Compare commits
No commits in common. "4d28432bf595fdd22d8d10c58afe3fcfa4113ef4" and "b24d96ba7c791379b88c0ee4e19e5de38119f83f" have entirely different histories.
4d28432bf5 ... b24d96ba7c
@@ -1,13 +1,8 @@
-from os.path import isdir
 import socket
-import os
 import json
-import threading
 import docker
 import time

-from docker.models.containers import Image
-
 class ClusterCommunicationModule():
     def __init__(self, host, port, node_manager):
         # TCP server
@@ -52,8 +47,6 @@ class ClusterCommunicationModule():
         if self.node_manager.docker_client.swarm.attrs == {}:
             print("Build new docker swarm...")
             self.node_manager.docker_client.swarm.init(advertise_addr=self.host, listen_addr=f"{self.host}:2377", force_new_cluster=True)
-            print("Create new overlay network")
-            self.node_manager.docker_client.networks.create(name='train-net', driver='overlay', attachable=True)

         # send docker swarm token to the worker
         token = self.node_manager.docker_client.swarm.attrs['JoinTokens']['Worker']
@@ -76,7 +69,7 @@
         # join docker swarm cluster
         token = self.client_sock.recv(1024).decode().split(' ')[-1]
         print("Receive Docker Swarm Join_Token=", token)
-        status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{addr[0]}:2377'], join_token=token, advertise_addr=f'{addr[0]}:2377')
+        status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{addr[0]}:2377'], join_token=token)

         if not status:
             print("Some Errors!")
@@ -116,16 +109,6 @@ class ClusterCommunicationModule():
         elif command == '[START_LISTEN_TASK]':
             self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
             print("The master has started listening for new task from Sepolia testnet...")
-        elif command == '[RUN_CONTAINER]':
-            args = json.loads(args)
-            image = args['image']
-            train = args['train']
-
-            print(f"[RUN_CONTAINER] {image}")
-            self.run_container(image, train, args)
-
-            print(f"[RUN_CONTAINER SUCCESS] {image}")
-            self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())

         return True

@@ -180,11 +163,10 @@ class ClusterCommunicationModule():
     def exit(self):
         if self.node_manager.status == 'master':
             for conn in self.worker_conns:
-                try:
-                    conn.send('[STOP] {}'.encode())
-                    check, args = conn.recv(1024).decode().split(' ')
-                except:
-                    print(f'{args} has stopped.')
+                conn.send('[STOP] {}'.encode())
+                check, args = conn.recv(1024).decode().split(' ')
+                print(f'{args} has stopped.')
+            self.node_manager.docker_client.swarm.leave(force=True)

         if self.node_manager.status == 'worker':
             self.node_manager.docker_client.swarm.leave()
@@ -193,114 +175,7 @@ class ClusterCommunicationModule():
         self.client_sock.close()
         for conn in self.worker_conns:
             conn.close()

-    def run_container(self, image_name, train, train_args={}):
-        '''
-        train_args
-            - index
-            - node_num
-        '''
-        if not os.path.isdir('./dataset_dir'):
-            os.mkdir('./dataset_dir')
-            print("Create ./dataset_dir dir.")
-        if not os.path.isdir('./output'):
-            os.mkdir('./output')
-            print("Create ./output dir.")
-
-        if not train:
-            container = self.node_manager.docker_client.containers.run(
-                image_name,
-                volumes={'dataset_dir': {'bind': '/dataset', 'mode': 'rw'}},
-                detach=True
-            )
-        else:
-            container = self.node_manager.docker_client.containers.run(
-                image_name,
-                volumes={
-                    'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
-                    'output': {'bind': '/output', 'mode': 'rw'},
-                },
-                network='train-net',
-                runtime='nvidia',
-                device_requests=[
-                    docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
-                ],
-                name=f'train-{train_args["index"]}',
-                environment={
-                    'GPU_NUM': self.node_manager.GPU_num,
-                    'NODE_NUM': train_args['node_num'],
-                    'NODE_RANK': train_args['index'],
-                    'MASTER_IP': 'train-0' if self.node_manager.status == 'worker' else '127.0.0.1',
-                    'MASTER_PORT': 21046,
-                },
-                detach=True
-            )
-
-        print(container.short_id)
-        for line in container.logs(stream=True):
-            print(line.strip().decode())
-
-        result = container.wait()
-        status_code = result['StatusCode']
-        if status_code != 0:
-            print(f'[ERROR] some error occur in the docker container, error_code={status_code}')
-        container.remove()
-
-    def scatter_container(self, image_name, train):
-        def master_run(image_name):
-            print("[Master] run")
-            train_args = {
-                'index': 0,
-                'node_num': len(self.worker_conns)+1
-            }
-            self.run_container(image_name, train, train_args)
-            print("[Master] finished")
-
-        def send_and_wait(conn_index, conn, image_name):
-            try:
-                # build command
-                data = {
-                    'image': image_name,
-                    'train': train,
-                    'index': conn_index+1,
-                    'node_num': len(self.worker_conns)+1
-                }
-                command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
-
-                # send
-                conn.send(command.encode())
-                print(f"[WORKER {conn_index}] Send command {command}")
-
-                # wait
-                data, args = conn.recv(1024).decode().split(' ')
-                print(f"[WORKER {conn_index}] finished")
-
-            except:
-                print("[WARN] connection {conn_index} disconnected.")
-
-        # build threads
-        threads = []
-        master_t = threading.Thread(target=master_run, args=(image_name, ))
-        threads.append(master_t)
-
-        for index, conn in enumerate(self.worker_conns):
-            t = threading.Thread(target=send_and_wait, args=(index, conn, image_name))
-            threads.append(t)
-
-        # start threads & wait
-        for thread in threads:
-            thread.start()
-        for thread in threads:
-            thread.join()
-
-        # all finished
-        print("\n[INFO] All workers finished.")
-
-
-        '''
-        conn.send('[RUN_CONTAINER] '.encode())
-        data, args = conn.recv(1024).decode().split(' ')
-        '''

 class ServiceExplorationModule():
     def __init__(self, host, port, node_manager):
@@ -85,7 +85,6 @@ class NodeManager():
         print(f"And we have load your wallet private key {WALLET_KEY} (address={self.wallet.address})")
         print()
         if self.w3.is_connected():
-            '''
             print("[INFO] Connected Successfully.")
             print()

@@ -117,16 +116,8 @@
             print("\n[INFO] You Receive a new task:")
             print(f" - Download Image: {data_image}")
             print(f" - Training Image: {train_image}")
-            '''
-
-            data_image = "snsd0805/cifar100-dataset:v1"
-            train_image = "snsd0805/cifar100-train:v3"

             # Start Downloading
-            # self.cluster_communication_module.scatter_container(data_image, train=False)
-
-            # start training
-            self.cluster_communication_module.scatter_container(train_image, train=True)


         else:
test.py (58 changed lines)
@@ -1,40 +1,34 @@
-import docker
-import os
+from web3 import Web3
+from src.scheduler import Scheduler

+SCHEDULER_ADDR = "0x544eAe853EA3774A8857573C6423E6Db95b79258"
+SCHEDULER_ABI_FILE = "../gpu-contract/abi/Scheduler.abi"

-if not os.path.isdir('./dataset_dir'):
-    os.mkdir('./dataset_dir')
+PROVIDER1_KEY = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
+PROVIDER2_KEY = "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
+PROVIDER3_KEY = "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"

-docker_client = docker.from_env()
-container = docker_client.containers.run(
-    'snsd0805/cifar100-train:v3',
-    volumes={
-        'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
-        'output': {'bind': '/output', 'mode': 'rw'},
-    },
-    network='train-net',
-    runtime='nvidia',
-    device_requests=[
-        docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
-    ],
-    name='train-0',
-    environment={
-        'GPU_NUM': 1,
-        'NODE_NUM': 1,
-        'NODE_RANK': 0,
-        'MASTER_IP': 'train-0',
-        'MASTER_PORT': 21046,
-    },
-    detach=True
-)
+CLIENT_KEY = "0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6"

+w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))

-print(container.short_id)
-for line in container.logs(stream=True):
-    print(line.strip().decode())
+if __name__ == '__main__':
+    if w3.is_connected():
+        scheduler = Scheduler(w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)

-result = container.wait()
-status_code = result['StatusCode']
-print(status_code, type(status_code))
+        provider1 = w3.eth.account.from_key(PROVIDER1_KEY)
+        provider2 = w3.eth.account.from_key(PROVIDER2_KEY)
+        provider3 = w3.eth.account.from_key(PROVIDER3_KEY)
+        client = w3.eth.account.from_key(CLIENT_KEY)

+        print(scheduler.getClusters())
+        scheduler.registerCluster(provider1, 1, 4)
+        scheduler.registerCluster(provider2, 2, 2)
+        scheduler.registerCluster(provider3, 3, 1)
+        scheduler.registerTaskWithConditions(client, "https://data.com", "http://train.tw", 3, 1)
+        print(scheduler.getClusters())
+        print(scheduler.getTasks())
+
+    else:
+        print("cannot connected to the chain")
