Compare commits

...

30 Commits

Author SHA1 Message Date
d05abb4211
Merge branch 'master' of ssh://gitea.snsd0804.com:10805/snsd0805/gpu-provider 2024-06-03 01:47:50 +08:00
8642711cfc
feat: update task status 2024-06-03 01:47:42 +08:00
361988c079
fix: uncommand and let the provider listen events 2024-06-02 23:17:57 +08:00
d7504af42d
feat: new contract abi(with erc20) 2024-06-02 23:17:34 +08:00
272b7b4f4d Merge branch 'scatter_image' of https://gitea.snsd0805.com/snsd0805/gpu-provider into scatter_image 2024-06-02 15:17:20 +08:00
9759f99d29 fix: overlay connection with ipv4 2024-06-02 15:16:44 +08:00
4d28432bf5
feat: do not remove the swarm master in master node 2024-06-01 14:44:52 +08:00
3cae025242
feat: add debug script for testing ipv6 resolve 2024-06-01 13:20:46 +08:00
27ff1c5636
fix: add advertise_addr when join the swarm, try to fix bugs in docker 2024-06-01 13:15:12 +08:00
bc476e398f
feat: remove container automatically 2024-06-01 13:06:05 +08:00
0438f9f7b1
fix: some docker typo 2024-06-01 01:29:54 +08:00
c1c946d01b
fix: dataset typo 2024-06-01 00:53:33 +08:00
3047b898ae
fix: typo 2024-06-01 00:48:30 +08:00
55211c3856
feat: communication about docker image information 2024-06-01 00:47:33 +08:00
3184669630
feat: communication about image scatter 2024-05-31 23:36:15 +08:00
7f650d0673
fix: comment out some blocks to debug & thread join() 2024-05-31 22:59:14 +08:00
d35ee68ccb
feat: check multi-thread 2024-05-31 22:49:37 +08:00
b24d96ba7c
fix: some ui commands 2024-05-31 22:25:29 +08:00
6093742092
feat: ctrc+C to stop listening. & move print 2024-05-31 21:52:52 +08:00
8e9f6b47cf
Merge branch 'listen-event' 2024-05-31 03:48:07 +08:00
a96c6d968e
Merge branch 'master' of ssh://gitea.snsd0805.com:10805/snsd0805/gpu-provider 2024-05-31 03:47:27 +08:00
feb25fc894
feat: listening the new task from blockchain 2024-05-31 03:46:18 +08:00
7fa71768e1
docs: .gitignore 2024-05-31 03:20:10 +08:00
ba2b281b68
feat: include web3 for register the cluster 2024-05-31 03:17:10 +08:00
2b1c8890a9 merge master branch (for the new command when the master leaved) 2024-05-31 02:01:37 +08:00
12caa7358f
feat: load web3 provider in the master node 2024-05-31 01:58:04 +08:00
87646d9661
feat: add web3 config 2024-05-31 01:51:29 +08:00
dd67d88da0 fix: update actions when the worker knows the master has leaved 2024-05-31 01:40:18 +08:00
5afd668379
feat: start listening notification 2024-05-31 01:31:57 +08:00
01024d2f0d
fix: delete debug code 2024-05-31 01:06:24 +08:00
7 changed files with 319 additions and 45 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*/__pycache__
__pycache__
output/
dataset_dir/
env/

18
constant.py Normal file
View File

@ -0,0 +1,18 @@
# Web3 provider information.
# WEB3_PROVIDER_URL is the provider's HTTPS endpoint and WEB3_PROVIDER_KEY is
# the API key that gets appended to it when connecting.
# Alchemy is a good choice for a Web3 provider.
import os as _os  # underscore alias so `from constant import *` does not export it

WEB3_PROVIDER_URL = "https://eth-sepolia.g.alchemy.com/v2/"
# Secrets may be supplied through environment variables so that they never
# have to be written into this tracked file. When the variable is unset the
# value falls back to the literal below (empty by default), exactly as before.
WEB3_PROVIDER_KEY = _os.environ.get("WEB3_PROVIDER_KEY", "")
# Private key of the wallet used to sign transactions sent to our contract.
WALLET_KEY = _os.environ.get("WALLET_KEY", "")

# Don't change
SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d"
SCHEDULER_ABI_FILE = "./src/Scheduler.abi"

# Maps the local GPU model name to the integer GPU id that is passed to the
# contract when the cluster is registered.
GPU_NAME2ID = {
    'NVIDIA GeForce RTX 4090': 0,
    'NVIDIA GeForce RTX 3060': 1,
}

1
src/Scheduler.abi Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,8 +1,13 @@
from os.path import isdir
import socket
import os
import json
import threading
import docker
import time
from docker.models.containers import Image
class ClusterCommunicationModule():
def __init__(self, host, port, node_manager):
# TCP server
@ -47,6 +52,8 @@ class ClusterCommunicationModule():
if self.node_manager.docker_client.swarm.attrs == {}:
print("Build new docker swarm...")
self.node_manager.docker_client.swarm.init(advertise_addr=self.host, listen_addr=f"{self.host}:2377", force_new_cluster=True)
print("Create new overlay network")
self.node_manager.docker_client.networks.create(name='train-net', driver='overlay', attachable=True)
# send docker swarm token to the worker
token = self.node_manager.docker_client.swarm.attrs['JoinTokens']['Worker']
@ -69,7 +76,13 @@ class ClusterCommunicationModule():
# join docker swarm cluster
token = self.client_sock.recv(1024).decode().split(' ')[-1]
print("Receive Docker Swarm Join_Token=", token)
status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{addr[0]}:2377'], join_token=token)
status = self.node_manager.docker_client.swarm.join(
remote_addrs=[f'{addr[0]}:2377'],
join_token=token,
advertise_addr=self.host,
listen_addr=self.host,
data_path_addr=self.host,
)
if not status:
print("Some Errors!")
@ -97,10 +110,30 @@ class ClusterCommunicationModule():
self.client_sock.send(f'[STOP_CHECK] {json.dumps(data)}'.encode())
self.node_manager.docker_client.swarm.leave()
self.node_manager.status = 'none'
self.node_manager.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Exit', 'function': 'exit'},
]
print("You have leaved the cluster.")
print("Please Enter to continued")
return False
elif command == '[START_LISTEN_TASK]':
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
print("The master has started listening for new task from Sepolia testnet...")
elif command == '[RUN_CONTAINER]':
args = json.loads(args)
image = args['image']
train = args['train']
print(f"[RUN_CONTAINER] {image}")
self.run_container(image, train, args)
print(f"[RUN_CONTAINER SUCCESS] {image}")
self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())
return True
def request(self, host): # master side
@ -136,13 +169,28 @@ class ClusterCommunicationModule():
print("1 worker disconnnected.")
return ans
def start_listen_task(self):
    """Tell every connected worker that the master starts listening for tasks.

    Sends ``[START_LISTEN_TASK] {}`` to each connection in
    ``self.worker_conns`` and waits for a single reply. A worker whose
    socket raises, or whose reply is not a two-token ``<COMMAND> <args>``
    message, is counted as disconnected and dropped from
    ``self.worker_conns``.
    """
    alive_conns = []
    disconnected_counter = 0
    for conn in self.worker_conns:
        try:
            conn.send('[START_LISTEN_TASK] {}'.encode())
            # The reply only acts as an acknowledgement; its content is unused.
            _reply, _reply_args = conn.recv(1024).decode().split(' ')
        except (OSError, ValueError):
            # OSError: socket failure. ValueError: empty/malformed reply
            # (also covers UnicodeDecodeError).
            disconnected_counter += 1
        else:
            alive_conns.append(conn)

    # Rebuild the list after the loop: removing entries while iterating the
    # same list skips the element that follows each removed one.
    # Slice assignment keeps the identity of the list object.
    self.worker_conns[:] = alive_conns

    if disconnected_counter != 0:
        print(f'{disconnected_counter} worker disconnected.')
    else:
        print(f'All other {len(self.worker_conns)} workers are waiting...')
def exit(self):
if self.node_manager.status == 'master':
for conn in self.worker_conns:
try:
conn.send('[STOP] {}'.encode())
check, args = conn.recv(1024).decode().split(' ')
except:
print(f'{args} has stopped.')
self.node_manager.docker_client.swarm.leave(force=True)
if self.node_manager.status == 'worker':
self.node_manager.docker_client.swarm.leave()
@ -152,6 +200,113 @@ class ClusterCommunicationModule():
for conn in self.worker_conns:
conn.close()
def run_container(self, image_name, train, train_args=None):
    '''
    Run one container of `image_name` on this node, stream its logs to
    stdout, wait for it to exit and remove it.

    image_name -- docker image to run
    train      -- False: plain run with only the dataset volume mounted
                  True:  GPU training run attached to the 'train-net'
                         network, named 'train-<index>'
    train_args -- dict, only read when `train` is true
        - index     rank of this node (also used in the container name)
        - node_num  total number of nodes taking part in the training
    '''
    # A literal `{}` default would be one dict shared by every call.
    if train_args is None:
        train_args = {}

    for directory in ('./dataset_dir', './output'):
        if not os.path.isdir(directory):
            os.mkdir(directory)
            print(f"Create {directory} dir.")

    # NOTE(review): the volume keys below are bare names, not the absolute
    # paths of the directories created above. The Docker SDK documents the
    # key as "host path or volume name", so these look like named volumes -
    # confirm that is the intent.
    containers = self.node_manager.docker_client.containers
    if not train:
        container = containers.run(
            image_name,
            volumes={'dataset_dir': {'bind': '/dataset', 'mode': 'rw'}},
            detach=True
        )
    else:
        container = containers.run(
            image_name,
            volumes={
                'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
                'output': {'bind': '/output', 'mode': 'rw'},
            },
            network='train-net',
            runtime='nvidia',
            device_requests=[
                docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
            ],
            name=f'train-{train_args["index"]}',
            environment={
                'GPU_NUM': self.node_manager.GPU_num,
                'NODE_NUM': train_args['node_num'],
                'NODE_RANK': train_args['index'],
                # Workers reach rank 0 through its container name; the
                # master itself uses the loopback address.
                'MASTER_IP': 'train-0' if self.node_manager.status == 'worker' else '127.0.0.1',
                'MASTER_PORT': 21046,
            },
            detach=True
        )

    try:
        print(container.short_id)
        for line in container.logs(stream=True):
            print(line.strip().decode())

        result = container.wait()
        status_code = result['StatusCode']
        if status_code != 0:
            print(f'[ERROR] some error occur in the docker container, error_code={status_code}')
    finally:
        # Always remove the container, even when streaming/waiting failed:
        # a leftover 'train-<index>' would make the next run fail on the
        # name conflict. force=True also covers a still-running container.
        container.remove(force=True)
def scatter_container(self, image_name, train):
    """Run `image_name` on the master and on every connected worker at once.

    One thread runs the container locally (rank 0) through
    ``self.run_container``; one further thread per entry of
    ``self.worker_conns`` sends a ``[RUN_CONTAINER]`` command and blocks
    until that worker replies. The method returns once all threads have
    finished.

    image_name -- docker image every node should run
    train      -- forwarded to the nodes; selects the training setup
    """
    # Workers plus this master.
    node_num = len(self.worker_conns) + 1

    def master_run(image_name):
        print("[Master] run")
        train_args = {
            'index': 0,
            'node_num': node_num
        }
        self.run_container(image_name, train, train_args)
        print("[Master] finished")

    def send_and_wait(conn_index, conn, image_name):
        try:
            # build command; rank 0 is the master, so workers start at 1
            data = {
                'image': image_name,
                'train': train,
                'index': conn_index+1,
                'node_num': node_num
            }
            command = '[RUN_CONTAINER] {}'.format(json.dumps(data))

            # send
            conn.send(command.encode())
            print(f"[WORKER {conn_index}] Send command {command}")

            # wait: the reply is only a completion signal
            _reply, _reply_args = conn.recv(1024).decode().split(' ')
            print(f"[WORKER {conn_index}] finished")
        except (OSError, ValueError):
            # OSError: socket failure. ValueError: empty/malformed reply.
            print(f"[WARN] connection {conn_index} disconnected.")

    # build threads
    threads = [threading.Thread(target=master_run, args=(image_name, ))]
    for index, conn in enumerate(self.worker_conns):
        threads.append(
            threading.Thread(target=send_and_wait, args=(index, conn, image_name))
        )

    # start threads & wait
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # all finished
    print("\n[INFO] All workers finished.")
class ServiceExplorationModule():
def __init__(self, host, port, node_manager):
@ -172,7 +327,6 @@ class ServiceExplorationModule():
while True:
data, addr = self.sock.recvfrom(1024)
print(self.node_manager.status)
if self.node_manager.status == 'none':
if data.decode() == '[EXPLORE]':
self.sock.sendto(self.host.encode(), addr)

View File

@ -3,12 +3,16 @@ from src.communication import ServiceExplorationModule, ClusterCommunicationModu
import torch
import time
import docker
from web3 import Web3
from src.scheduler import Scheduler
from constant import *
class NodeManager():
def __init__(self, host, port):
self.status = 'none'
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
{'explanation': 'Exit', 'function': 'exit'},
]
self.get_GPU_info()
@ -25,6 +29,12 @@ class NodeManager():
# docker client
self.docker_client = docker.from_env()
# web3 provider
# if this is master, it should init a Web object.
self.w3 = None
self.scheduler = None
self.wallet = None
time.sleep(2)
def get_GPU_info(self):
@ -59,7 +69,7 @@ class NodeManager():
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Cluster info', 'function': 'cluster_info'},
{'explanation': 'Start working', 'function': 'start_work'},
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
{'explanation': 'Exit', 'function': 'exit'},
]
except:
@ -67,13 +77,76 @@ class NodeManager():
else:
print("No other nodes in your subnet.")
def start_listen_task(self):
    """Register this cluster on-chain, wait for one task and run it.

    Steps, in order:
      1. connect to the Web3 provider and load the wallet from WALLET_KEY
      2. register the cluster (GPU id and total GPU count) with the scheduler
      3. notify the workers, then block until a task for this wallet arrives
      4. run the task's data image, then its training image, on all nodes
      5. update the task status on the contract

    Returns None in every case; failures are reported on stdout.
    """
    self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY))
    self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
    self.wallet = self.w3.eth.account.from_key(WALLET_KEY)

    # Never echo the API key or the wallet private key: console output ends
    # up in terminals, logs and screenshots. The public address is enough.
    print(f"We have use {WEB3_PROVIDER_URL} as the web3 provider.")
    print(f"And we have load your wallet (address={self.wallet.address})")
    print()

    if not self.w3.is_connected():
        print("[ERROR] Connected Failed.")
        print("Please check for your provider key & wallet key")
        return

    print("[INFO] Connected Successfully.")
    print()

    # Register the cluster
    if self.GPU not in GPU_NAME2ID:
        print(f"[ERROR] GPU '{self.GPU}' is not supported (not listed in GPU_NAME2ID).")
        return
    gpu_num = self.cluster_info()
    gpu_id = GPU_NAME2ID[self.GPU]
    print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...")
    receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num)
    event = self.scheduler.get_cluster_event(receipt)

    print("\n[INFO] Register our cluster succefully. \nThis is our cluster event on the blockchain: ")
    print(f" {event[0]['args']}")

    # start waiting
    self.cluster_communication_module.start_listen_task()

    print("\nWaiting for the new task from Sepolia testnet...")
    print("Ctrl+C to stop the waiting...")
    try:
        next_task = self.scheduler.listen_task(self.wallet.address)
    except KeyboardInterrupt:
        print("[INFO] stop the waiting")
        return
    except Exception as err:
        # A provider/network failure is not a user-requested stop; report
        # it as an error instead of hiding it behind the stop message.
        print(f"[ERROR] stop the waiting, listening failed: {err}")
        return

    # get task info
    task_index = next_task['args']['taskIndex']
    data_image = next_task['args']['dataImage']
    train_image = next_task['args']['trainImage']

    print("\n[INFO] You Receive a new task:")
    print(f" - Task Index: {task_index}")
    print(f" - Download Image: {data_image}")
    print(f" - Training Image: {train_image}")

    # Start Downloading
    self.cluster_communication_module.scatter_container(data_image, train=False)

    # start training
    self.cluster_communication_module.scatter_container(train_image, train=True)

    print("Update the task's status...")
    # NOTE(review): 4 is presumably the contract's "finished" status code -
    # confirm against the Scheduler contract.
    self.scheduler.update_status(self.wallet, task_index, 4)
    print("Finished.")
def cluster_info(self):
    """Print one line per node of the cluster and return the total GPU count.

    The worker entries come from ``self.cluster_communication_module``;
    each one is read through its 'host', 'GPU' and 'GPU_num' keys. The
    local node is listed first and is included in the returned total.
    """
    workers = self.cluster_communication_module.cluster_info()
    local_host = self.service_exploration_module.host

    print(f"\nThere are {len(workers)+1} nodes in this cluster.")
    print("Cluster Info:")
    print(f" {local_host}(local) -> {self.GPU} * {self.GPU_num}")
    for worker in workers:
        print(f" {worker['host']} -> {worker['GPU']} * {worker['GPU_num']}")

    # Local GPUs plus the GPUs reported by every worker.
    return self.GPU_num + sum(worker['GPU_num'] for worker in workers)
def exit(self):
self.cluster_communication_module.exit()

View File

@ -6,9 +6,8 @@ class Scheduler():
self.w3 = w3
abi = load_abi(abi_file)
self.contract = self.w3.eth.contract(address=address, abi=abi)
print(self.contract.functions)
def signedAndSendTransaction(self, account, function):
def signed_and_send_transaction(self, account, function):
tx = function.build_transaction({
'from': account.address,
'nonce': self.w3.eth.get_transaction_count(account.address)
@ -17,24 +16,24 @@ class Scheduler():
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
_hash = tx_receipt['transactionHash']
return _hash
return tx_receipt
def registerCluster(self, account, gpu_id: int, gpu_num: int):
def register_cluster(self, account, gpu_id: int, gpu_num: int):
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
return self.signedAndSendTransaction(account, function)
return self.signed_and_send_transaction(account, function)
def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
return self.signedAndSendTransaction(account, function)
return self.signed_and_send_transaction(account, function)
def getClusters(self):
def get_clusters(self):
return self.contract.functions.getClusters().call()
def getTasks(self):
def get_tasks(self):
return self.contract.functions.getTasks().call()
def getStartRunEvent(self):
'''
def get_start_run_event(self):
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
events = event_filter.get_new_entries()
print(events)
@ -43,4 +42,22 @@ class Scheduler():
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
events = event_filter.get_new_entries()
print(events)
'''
def get_cluster_event(self, receipt):
    """Return the RegisterCluster event entries decoded from `receipt`.

    `receipt` is handed unchanged to the contract's RegisterCluster event
    object, and whatever its ``process_receipt`` returns is passed back.
    """
    register_cluster_event = self.contract.events.RegisterCluster()
    return register_cluster_event.process_receipt(receipt)
def listen_task(self, address, poll_interval=2):
    """Block until a StartRun event whose provider is `address` appears.

    address       -- value compared against the event's 'provider' argument
    poll_interval -- seconds to sleep between two polls of the filter
                     (default 2); pass 0 to poll without waiting

    Returns the first matching event. Only events from the latest block
    onward are considered. The loop never ends by itself; the caller is
    expected to interrupt it (e.g. with Ctrl+C).
    """
    import time  # local import: this block only needs it for the poll delay

    event_filter = self.contract.events.StartRun.create_filter(fromBlock='latest')
    while True:
        for event in event_filter.get_new_entries():
            if event['args']['provider'] == address:
                return event
        # Without a pause this loop queries the RPC provider continuously,
        # burning CPU and the provider's request quota.
        time.sleep(poll_interval)
def update_status(self, account, task_index, new_status):
    """Send an updateStatus(task_index, new_status) transaction as `account`.

    Builds the contract call and hands it to
    ``self.signed_and_send_transaction``, whose result is returned.
    """
    return self.signed_and_send_transaction(
        account,
        self.contract.functions.updateStatus(task_index, new_status),
    )

58
test.py
View File

@ -1,34 +1,40 @@
from web3 import Web3
from src.scheduler import Scheduler
import docker
import os
SCHEDULER_ADDR = "0x544eAe853EA3774A8857573C6423E6Db95b79258"
SCHEDULER_ABI_FILE = "../gpu-contract/abi/Scheduler.abi"
PROVIDER1_KEY = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
PROVIDER2_KEY = "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
PROVIDER3_KEY = "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"
if not os.path.isdir('./dataset_dir'):
os.mkdir('./dataset_dir')
CLIENT_KEY = "0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6"
docker_client = docker.from_env()
container = docker_client.containers.run(
'snsd0805/cifar100-train:v3',
volumes={
'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
'output': {'bind': '/output', 'mode': 'rw'},
},
network='train-net',
runtime='nvidia',
device_requests=[
docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
],
name='train-0',
environment={
'GPU_NUM': 1,
'NODE_NUM': 1,
'NODE_RANK': 0,
'MASTER_IP': 'train-0',
'MASTER_PORT': 21046,
},
detach=True
)
w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))
if __name__ == '__main__':
if w3.is_connected():
scheduler = Scheduler(w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
print(container.short_id)
for line in container.logs(stream=True):
print(line.strip().decode())
provider1 = w3.eth.account.from_key(PROVIDER1_KEY)
provider2 = w3.eth.account.from_key(PROVIDER2_KEY)
provider3 = w3.eth.account.from_key(PROVIDER3_KEY)
client = w3.eth.account.from_key(CLIENT_KEY)
result = container.wait()
status_code = result['StatusCode']
print(status_code, type(status_code))
print(scheduler.getClusters())
scheduler.registerCluster(provider1, 1, 4)
scheduler.registerCluster(provider2, 2, 2)
scheduler.registerCluster(provider3, 3, 1)
scheduler.registerTaskWithConditions(client, "https://data.com", "http://train.tw", 3, 1)
print(scheduler.getClusters())
print(scheduler.getTasks())
else:
print("cannot connected to the chain")