Compare commits
No commits in common. "master" and "socket" have entirely different histories.
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,5 +0,0 @@
|
|||||||
*/__pycache__
|
|
||||||
__pycache__
|
|
||||||
output/
|
|
||||||
dataset_dir/
|
|
||||||
env/
|
|
||||||
18
constant.py
18
constant.py
@ -1,18 +0,0 @@
|
|||||||
# ---------------------------------------------------------------------------
# Web3 provider
# ---------------------------------------------------------------------------
# HTTPS endpoint URL of the provider and the API key that goes with it.
# Alchemy is a good choice of Web3 provider.
WEB3_PROVIDER_URL = "https://eth-sepolia.g.alchemy.com/v2/"
WEB3_PROVIDER_KEY = ""

# ---------------------------------------------------------------------------
# Wallet
# ---------------------------------------------------------------------------
# Fill in your wallet key; it is used to sign transactions with our contract.
WALLET_KEY = ""

# ---------------------------------------------------------------------------
# Fixed project settings -- do not change anything below this line.
# ---------------------------------------------------------------------------
SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d"
SCHEDULER_ABI_FILE = "./src/Scheduler.abi"

# GPU device name -> integer GPU id.
GPU_NAME2ID = {
    'NVIDIA GeForce RTX 4090': 0,
    'NVIDIA GeForce RTX 3060': 1,
}
|
|
||||||
|
|
||||||
52
main.py
52
main.py
@ -1,47 +1,21 @@
|
|||||||
import select
|
import argparse
|
||||||
import sys
|
import threading
|
||||||
|
from src.communication import ServiceExplorationModule
|
||||||
from src.parser import get_args
|
from src.parser import get_args
|
||||||
from src.node_manager import NodeManager
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
args = get_args()
|
args = get_args()
|
||||||
|
|
||||||
|
socket_type = args.type
|
||||||
host, port = args.host, args.port
|
host, port = args.host, args.port
|
||||||
|
|
||||||
manager = NodeManager(host, port)
|
# start Service Exploration Module
|
||||||
manager.start_service()
|
# let all client can know which IP address has our service so that it can link to.
|
||||||
|
service_explore = ServiceExplorationModule(host, port+1)
|
||||||
|
explore_service_thread = threading.Thread(target=service_explore.listen)
|
||||||
|
explore_service_thread.start()
|
||||||
|
|
||||||
|
if socket_type == 'client':
|
||||||
is_run = True
|
hosts = service_explore.explore()
|
||||||
while is_run:
|
print(hosts)
|
||||||
explanation = '=' * 30 + '\n'
|
|
||||||
for index, action in enumerate(manager.actions):
|
|
||||||
explanation += f'{index+1}) {action["explanation"]}\n'
|
|
||||||
explanation += '=' * 30
|
|
||||||
explanation += '\n> '
|
|
||||||
print(explanation, end='')
|
|
||||||
|
|
||||||
while True:
|
|
||||||
if sys.stdin.closed:
|
|
||||||
sys.stdin = open(0)
|
|
||||||
|
|
||||||
read_list, _, _ = select.select([sys.stdin], [], [], 1)
|
|
||||||
|
|
||||||
if read_list:
|
|
||||||
action = sys.stdin.readline()
|
|
||||||
try:
|
|
||||||
action = manager.actions[int(action)-1]
|
|
||||||
if action['function'] == 'exit':
|
|
||||||
manager.exit()
|
|
||||||
is_run = False
|
|
||||||
else:
|
|
||||||
func = getattr(manager, action['function'])
|
|
||||||
func()
|
|
||||||
break
|
|
||||||
except:
|
|
||||||
break
|
|
||||||
|
|
||||||
print("\n\n\n")
|
|
||||||
print("Stopped")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,3 +1 @@
|
|||||||
web3==6.18.0
|
web3==6.18.0
|
||||||
docker
|
|
||||||
|
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@ -1,335 +1,26 @@
|
|||||||
from os.path import isdir
|
|
||||||
import socket
|
import socket
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import threading
|
|
||||||
import docker
|
|
||||||
import time
|
|
||||||
|
|
||||||
from docker.models.containers import Image
|
|
||||||
|
|
||||||
class ClusterCommunicationModule():
    """TCP control channel between the nodes of one cluster.

    ``listen()`` accepts connections on ``(host, port)``.  A master invites a
    worker by sending ``[REQUEST]`` (see ``request``).  A worker that accepts
    connects back to the master's listener, sends ``[CHECK]``, receives the
    Docker Swarm join token and then serves the master's commands on that
    connection (see ``handle_command``).

    Wire messages look like ``"[COMMAND] <payload>"``.  The payload is JSON
    (often just ``{}``) and may itself contain spaces, so messages are always
    split at the FIRST space only.
    """

    def __init__(self, host, port, node_manager):
        """Create the sockets; nothing is bound or connected yet.

        Args:
            host: local IPv4 address to bind to and to advertise to Docker.
            port: TCP port used by every node for this protocol.
            node_manager: owning manager; its ``status``, ``actions``,
                ``docker_client``, ``GPU`` and ``GPU_num`` attributes are
                read/written by this module.
        """
        # TCP server
        # In our implementation, the worker connects back to the master's
        # TCP server once it has accepted the invitation.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Outgoing connection (master -> worker invitation, worker -> master
        # command channel).  Replaced by a fresh socket after every close.
        self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.host = host
        self.port = port

        self.node_manager = node_manager
        # master side: one accepted connection per worker in the cluster
        self.worker_conns = []

    @staticmethod
    def _split_message(message):
        """Split ``"[COMMAND] payload"`` into ``(command, payload)``.

        Only the first space separates the two parts, so JSON payloads that
        contain spaces stay intact.  A message without a space yields an
        empty payload.
        """
        command, _, payload = message.partition(' ')
        return command, payload

    @classmethod
    def _recv_message(cls, conn):
        """Receive one message from *conn* and return ``(command, payload)``.

        Raises:
            ConnectionError: if the peer closed the connection (empty read).
        """
        raw = conn.recv(1024)
        if not raw:
            raise ConnectionError("peer closed the connection")
        return cls._split_message(raw.decode())

    def listen(self):
        """Bind the server socket and serve incoming connections forever.

        ``[REQUEST]`` is handled as a worker (invitation from a master) and
        ``[CHECK]`` as a master (a worker joining our cluster).  Any other
        first message just gets its connection closed.
        """
        self.sock.bind((self.host, self.port))
        self.sock.listen(1)
        print(f"Communication Server(TCP) listening on {self.host}:{self.port}...")

        while True:
            conn, addr = self.sock.accept()

            # first request, we should approve the request
            data = conn.recv(1024).decode()

            if data == '[REQUEST]':  # worker side
                if self.process_request(conn, addr):
                    while True:
                        # Start receiving commands from the master
                        command = self.client_sock.recv(1024).decode()

                        if not self.handle_command(command):
                            # a closed socket cannot be reused: make a new one
                            self.client_sock.close()
                            self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                            break
            elif data == '[CHECK]':  # master side
                docker_client = self.node_manager.docker_client

                # build docker swarm
                if docker_client.swarm.attrs == {}:
                    print("Build new docker swarm...")
                    docker_client.swarm.init(
                        advertise_addr=self.host,
                        listen_addr=f"{self.host}:2377",
                        force_new_cluster=True,
                    )
                    print("Create new overlay network")
                    docker_client.networks.create(name='train-net', driver='overlay', attachable=True)

                # send docker swarm token to the worker
                token = docker_client.swarm.attrs['JoinTokens']['Worker']
                conn.send(f'[DOCKER_TOKEN] {token}'.encode())
                print(f"Send token: {token} to the worker.")
                print("Please Enter to continue...")

                # keep the connection open: it is the command channel
                self.worker_conns.append(conn)
                continue

            conn.close()

    def process_request(self, conn, addr):  # worker side
        """Answer a master's invitation received on *conn*.

        The invitation is accepted only while this node's status is
        ``'none'``.  On acceptance the node connects back to the master,
        joins its Docker Swarm and becomes a worker.

        Returns:
            True if the invitation was accepted, False if it was rejected.
        """
        if self.node_manager.status != 'none':
            conn.send('[REJECT]'.encode())
            print(f'You just reject the cluster invitation from {addr} now.\nPlease type in Enter to continue')
            return False

        conn.send('[ACCEPT]'.encode())
        conn.close()
        self.client_sock.connect((addr[0], self.port))
        self.client_sock.send('[CHECK]'.encode())

        # join docker swarm cluster; the reply is "[DOCKER_TOKEN] <token>"
        token = self.client_sock.recv(1024).decode().split(' ')[-1]
        print("Receive Docker Swarm Join_Token=", token)
        status = self.node_manager.docker_client.swarm.join(
            remote_addrs=[f'{addr[0]}:2377'],
            join_token=token,
            advertise_addr=self.host,
            listen_addr=self.host,
            data_path_addr=self.host,
        )

        if not status:
            print("Some Errors!")

        # a worker can only leave; everything else is driven by the master
        self.node_manager.actions = [
            {'explanation': 'Exit', 'function': 'exit'},
        ]

        self.node_manager.status = 'worker'
        print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
        return True

    def handle_command(self, data):
        """Execute one command sent by the master (worker side).

        Args:
            data: decoded message, ``"[COMMAND] <json payload>"``.  An empty
                string means the master closed the connection.

        Returns:
            True to keep serving commands, False when the command channel
            must be torn down (``[STOP]`` or a closed connection).
        """
        command, args = self._split_message(data)

        if not command:
            # recv() returned b'': the master went away without [STOP]
            print("[WARN] connection to the master was closed.")
            return False

        if command == '[INFO]':
            info = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
            self.client_sock.send(json.dumps(info).encode())
        elif command == '[STOP]':
            print("Receive STOP signal")
            info = {'host': self.host}
            self.client_sock.send(f'[STOP_CHECK] {json.dumps(info)}'.encode())
            self.node_manager.docker_client.swarm.leave()
            self.node_manager.status = 'none'
            self.node_manager.actions = [
                {'explanation': 'Add another node into our cluster', 'function': 'add_node'},
                {'explanation': 'Exit', 'function': 'exit'},
            ]

            print("You have leaved the cluster.")
            print("Please Enter to continued")

            return False
        elif command == '[START_LISTEN_TASK]':
            self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
            print("The master has started listening for new task from Sepolia testnet...")
        elif command == '[RUN_CONTAINER]':
            args = json.loads(args)
            image = args['image']
            train = args['train']

            print(f"[RUN_CONTAINER] {image}")
            self.run_container(image, train, args)

            print(f"[RUN_CONTAINER SUCCESS] {image}")
            self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())

        return True

    def request(self, host):  # master side
        """Invite the node at *host* to join our cluster.

        Returns:
            True if the node answered ``[ACCEPT]`` (this node then becomes
            the master), False otherwise.
        """
        self.client_sock.connect((host, self.port))
        self.client_sock.send('[REQUEST]'.encode())
        data = self.client_sock.recv(1024).decode()

        # close client_sock, and wait for the worker to connect to our self.sock
        self.client_sock.close()
        self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        if data == '[ACCEPT]':
            self.node_manager.status = 'master'
            print(f"{host} accept.")
            return True

        if data == '[REJECT]':
            print(f"{host} reject.")
        return False

    def cluster_info(self):
        """Ask every worker for its GPU information (master side).

        Workers that fail to answer are dropped from ``worker_conns``.

        Returns:
            A list with one ``{'host', 'GPU', 'GPU_num'}`` dict per worker
            that answered.
        """
        ans = []
        # iterate over a copy: dead connections are removed from the list
        for conn in list(self.worker_conns):
            try:
                conn.send('[INFO] {}'.encode())
                data = conn.recv(1024)
                ans.append(json.loads(data.decode()))
            except (OSError, ValueError):
                # socket error, or an empty/garbled reply that is not JSON
                self.worker_conns.remove(conn)
                print("1 worker disconnnected.")
        return ans

    def start_listen_task(self):
        """Tell every worker that the master starts waiting for tasks.

        Workers that fail to acknowledge are dropped from ``worker_conns``.
        """
        disconnected_counter = 0
        # iterate over a copy: dead connections are removed from the list
        for conn in list(self.worker_conns):
            try:
                conn.send('[START_LISTEN_TASK] {}'.encode())
                self._recv_message(conn)
            except (OSError, ValueError):
                disconnected_counter += 1
                self.worker_conns.remove(conn)

        if disconnected_counter != 0:
            print(f'{disconnected_counter} worker disconnected.')
        else:
            print(f'All other {len(self.worker_conns)} workers are waiting...')

    def exit(self):
        """Shut the module down.

        A master first sends ``[STOP]`` to every worker; a worker leaves the
        Docker Swarm.  All sockets are closed afterwards.
        """
        if self.node_manager.status == 'master':
            for conn in self.worker_conns:
                try:
                    conn.send('[STOP] {}'.encode())
                    _, args = self._recv_message(conn)
                except (OSError, ValueError):
                    print('[WARN] a worker did not acknowledge the STOP signal.')
                else:
                    # args is the worker's '{"host": ...}' acknowledgement
                    print(f'{args} has stopped.')

        if self.node_manager.status == 'worker':
            self.node_manager.docker_client.swarm.leave()

        self.sock.close()
        self.client_sock.close()
        for conn in self.worker_conns:
            conn.close()

    def run_container(self, image_name, train, train_args=None):
        """Run *image_name* in Docker, stream its logs and remove it.

        Args:
            image_name: Docker image to run.
            train: False for a plain run with the dataset volume only; True
                for a GPU training run on the ``train-net`` network.
            train_args: only read when *train* is true; needs the keys
                - index: rank of this node (used for name and NODE_RANK)
                - node_num: number of nodes in the cluster
        """
        if train_args is None:
            train_args = {}

        for directory in ('./dataset_dir', './output'):
            if not os.path.isdir(directory):
                os.mkdir(directory)
                print(f"Create {directory} dir.")

        containers = self.node_manager.docker_client.containers
        # NOTE(review): 'dataset_dir' / 'output' are passed as volume keys
        # without './' -- confirm whether Docker treats them as the local
        # directories created above or as named volumes.
        if not train:
            container = containers.run(
                image_name,
                volumes={'dataset_dir': {'bind': '/dataset', 'mode': 'rw'}},
                detach=True
            )
        else:
            container = containers.run(
                image_name,
                volumes={
                    'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
                    'output': {'bind': '/output', 'mode': 'rw'},
                },
                network='train-net',
                runtime='nvidia',
                device_requests=[
                    docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
                ],
                name=f'train-{train_args["index"]}',
                environment={
                    'GPU_NUM': self.node_manager.GPU_num,
                    'NODE_NUM': train_args['node_num'],
                    'NODE_RANK': train_args['index'],
                    # workers reach the master's container by its name
                    'MASTER_IP': 'train-0' if self.node_manager.status == 'worker' else '127.0.0.1',
                    'MASTER_PORT': 21046,
                },
                detach=True
            )

        print(container.short_id)
        for line in container.logs(stream=True):
            print(line.strip().decode())

        result = container.wait()
        status_code = result['StatusCode']
        if status_code != 0:
            print(f'[ERROR] some error occur in the docker container, error_code={status_code}')
        container.remove()

    def scatter_container(self, image_name, train):
        """Run *image_name* on the master and on every worker in parallel.

        One thread runs the container locally (rank 0); one thread per
        worker sends ``[RUN_CONTAINER]`` and waits for the reply.  Returns
        once all threads have finished.
        """
        node_num = len(self.worker_conns) + 1

        def master_run(image_name):
            print("[Master] run")
            train_args = {
                'index': 0,
                'node_num': node_num
            }
            self.run_container(image_name, train, train_args)
            print("[Master] finished")

        def send_and_wait(conn_index, conn, image_name):
            try:
                # build command
                data = {
                    'image': image_name,
                    'train': train,
                    'index': conn_index + 1,  # rank 0 is the master
                    'node_num': node_num
                }
                command = '[RUN_CONTAINER] {}'.format(json.dumps(data))

                # send
                conn.send(command.encode())
                print(f"[WORKER {conn_index}] Send command {command}")

                # wait until the worker reports back
                self._recv_message(conn)
                print(f"[WORKER {conn_index}] finished")

            except (OSError, ValueError):
                print(f"[WARN] connection {conn_index} disconnected.")

        # build threads
        threads = [threading.Thread(target=master_run, args=(image_name, ))]
        for index, conn in enumerate(self.worker_conns):
            threads.append(threading.Thread(target=send_and_wait, args=(index, conn, image_name)))

        # start threads & wait
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # all finished
        print("\n[INFO] All workers finished.")
|
|
||||||
|
|
||||||
class ServiceExplorationModule():
|
class ServiceExplorationModule():
|
||||||
def __init__(self, host, port, node_manager):
|
def __init__(self, host, port):
|
||||||
# UDP server
|
# UDP server
|
||||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
|
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
|
self.IP = socket.gethostbyname(socket.gethostname())
|
||||||
self.node_manager = node_manager
|
self.is_available = True
|
||||||
|
|
||||||
def listen(self):
|
def listen(self):
|
||||||
self.sock.bind(('0.0.0.0', self.port))
|
self.sock.bind((self.host, self.port))
|
||||||
print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data, addr = self.sock.recvfrom(1024)
|
data, addr = self.sock.recvfrom(1024)
|
||||||
|
|
||||||
if self.node_manager.status == 'none':
|
if self.is_available:
|
||||||
if data.decode() == '[EXPLORE]':
|
if data.decode() == '[EXPLORE]':
|
||||||
self.sock.sendto(self.host.encode(), addr)
|
print(f"Receive exploration broadcast from {addr}")
|
||||||
|
self.sock.sendto(self.IP.encode(), addr)
|
||||||
|
|
||||||
def explore(self):
|
def explore(self):
|
||||||
available_host = []
|
available_host = []
|
||||||
@ -337,18 +28,15 @@ class ServiceExplorationModule():
|
|||||||
client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
|
||||||
client_sock.settimeout(1)
|
client_sock.settimeout(3)
|
||||||
client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))
|
client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
data, addr = client_sock.recvfrom(1024)
|
data, addr = client_sock.recvfrom(1024)
|
||||||
if addr[0] != self.host:
|
if addr[0] != self.IP:
|
||||||
available_host.append(addr[0])
|
available_host.append(addr)
|
||||||
except:
|
except:
|
||||||
# if socket timeout
|
# if socket timeout
|
||||||
break
|
break
|
||||||
return available_host
|
return available_host
|
||||||
|
|
||||||
def exit(self):
|
|
||||||
self.sock.close()
|
|
||||||
|
|||||||
@ -1,154 +0,0 @@
|
|||||||
import threading
|
|
||||||
from src.communication import ServiceExplorationModule, ClusterCommunicationModule
|
|
||||||
import torch
|
|
||||||
import time
|
|
||||||
import docker
|
|
||||||
from web3 import Web3
|
|
||||||
from src.scheduler import Scheduler
|
|
||||||
from constant import *
|
|
||||||
|
|
||||||
class NodeManager():
    """State and user-facing actions of one node.

    Attributes:
        status: ``'none'``, ``'master'`` or ``'worker'``.
        actions: menu entries; each is a dict with an ``'explanation'`` text
            and the name of the method to call in ``'function'``.
    """

    def __init__(self, host, port):
        """Detect the local GPUs and create the communication modules.

        Args:
            host: local IPv4 address handed to both communication modules.
            port: TCP port of the cluster module; the exploration module
                uses ``port + 1``.
        """
        self.status = 'none'
        self.actions = [
            {'explanation': 'Add another node into our cluster', 'function': 'add_node'},
            {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
            {'explanation': 'Exit', 'function': 'exit'},
        ]
        self.get_GPU_info()
        print(f"You have {self.GPU} * {self.GPU_num}")

        # start Cluster Communication Module
        # lets the nodes in the cluster communicate
        self.cluster_communication_module = ClusterCommunicationModule(host, port, self)

        # start Service Exploration Module
        # lets every client find out which IP addresses run our service
        self.service_exploration_module = ServiceExplorationModule(host, port+1, self)

        # docker client
        self.docker_client = docker.from_env()

        # web3 provider
        # only initialised when this node starts listening for tasks
        self.w3 = None
        self.scheduler = None
        self.wallet = None

        time.sleep(2)

    @staticmethod
    def _parse_choice(text, count):
        """Turn a 1-based menu answer into a 0-based index.

        Returns:
            The index, or None when *text* is not an integer in
            ``1..count``.  Rejecting 0 and negative numbers matters: they
            would otherwise index the list from its end.
        """
        try:
            index = int(text) - 1
        except ValueError:
            return None
        return index if 0 <= index < count else None

    def get_GPU_info(self):
        """Store the GPU count in ``GPU_num`` and the device name in ``GPU``.

        Raises:
            AssertionError: if there is no CUDA device, or the devices are
                not all of the same type.
        """
        self.GPU_num = torch.cuda.device_count()
        assert self.GPU_num > 0, "Your computer doesn't have GPU resource"

        self.GPU = torch.cuda.get_device_name(0)
        for i in range(self.GPU_num):
            assert torch.cuda.get_device_name(i) == self.GPU, "Please provide same type of GPUs."

    def start_service(self):
        """Start both listeners in daemon threads."""
        listeners = (
            self.cluster_communication_module.listen,
            self.service_exploration_module.listen,
        )
        for listener in listeners:
            thread = threading.Thread(target=listener)
            # daemon: the listeners loop forever and must not block exit
            thread.daemon = True
            thread.start()

    def add_node(self):
        """Discover nodes, let the user pick one and invite it."""
        hosts = self.service_exploration_module.explore()
        if not hosts:
            print("No other nodes in your subnet.")
            return

        msg = "These are the nodes you can request for join into our cluster: \n"
        msg += '\n'.join([f'{index+1}) {host}' for index, host in enumerate(hosts)])
        msg += '\n> '

        choose = self._parse_choice(input(msg), len(hosts))
        if choose is None:
            print("=== FAIL ===")
            return

        try:
            accept = self.cluster_communication_module.request(hosts[choose])
        except (OSError, ValueError):
            # connection problem, or an undecodable reply
            print("=== FAIL ===")
            return

        if accept:
            self.actions = [
                {'explanation': 'Add another node into our cluster', 'function': 'add_node'},
                {'explanation': 'Cluster info', 'function': 'cluster_info'},
                {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
                {'explanation': 'Exit', 'function': 'exit'},
            ]

    def start_listen_task(self):
        """Register the cluster on chain, wait for one task and run it.

        The task's data image is run on every node first, then its training
        image; finally the task status is updated on chain.
        """
        self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY))
        self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
        self.wallet = self.w3.eth.account.from_key(WALLET_KEY)
        # Never echo the API key or the wallet private key to the console.
        print(f"We have use {WEB3_PROVIDER_URL} as the web3 provider.")
        print(f"And we have load your wallet (address={self.wallet.address})")
        print()

        if not self.w3.is_connected():
            print("[ERROR] Connected Failed.")
            print("Please check for your provider key & wallet key")
            return

        print("[INFO] Connected Successfully.")
        print()

        # Register the cluster
        gpu_num = self.cluster_info()
        gpu_id = GPU_NAME2ID.get(self.GPU)
        if gpu_id is None:
            print(f"[ERROR] GPU '{self.GPU}' is not supported (no entry in GPU_NAME2ID).")
            return
        print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...")
        receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num)
        event = self.scheduler.get_cluster_event(receipt)
        print("\n[INFO] Register our cluster succefully. \nThis is our cluster event on the blockchain: ")
        print(f"    {event[0]['args']}")

        # start waiting
        self.cluster_communication_module.start_listen_task()
        print("\nWaiting for the new task from Sepolia testnet...")
        print("Ctrl+C to stop the waiting...")
        try:
            next_task = self.scheduler.listen_task(self.wallet.address)
        except KeyboardInterrupt:
            print("[INFO] stop the waiting")
            return
        except Exception as err:
            print(f"[ERROR] waiting for a task failed: {err}")
            print("[INFO] stop the waiting")
            return

        # get task info
        task_args = next_task['args']
        task_index = task_args['taskIndex']
        data_image = task_args['dataImage']
        train_image = task_args['trainImage']

        print("\n[INFO] You Receive a new task:")
        print(f"    - Task Index: {task_index}")
        print(f"    - Download Image: {data_image}")
        print(f"    - Training Image: {train_image}")

        # Start Downloading
        self.cluster_communication_module.scatter_container(data_image, train=False)

        # start training
        self.cluster_communication_module.scatter_container(train_image, train=True)

        print("Update the task's status...")
        # NOTE(review): 4 is presumably the contract's "finished" status -- confirm
        self.scheduler.update_status(self.wallet, task_index, 4)
        print("Finished.")

    def cluster_info(self):
        """Print every node's GPUs and return the cluster's total GPU count."""
        info = self.cluster_communication_module.cluster_info()
        print(f"\nThere are {len(info)+1} nodes in this cluster.")
        print("Cluster Info:")
        print(f"    {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}")
        for host in info:
            print(f"    {host['host']} -> {host['GPU']} * {host['GPU_num']}")
        return self.GPU_num + sum(host['GPU_num'] for host in info)

    def exit(self):
        """Shut down both communication modules."""
        self.cluster_communication_module.exit()
        self.service_exploration_module.exit()
|
|
||||||
|
|
||||||
@ -2,7 +2,8 @@ import argparse
|
|||||||
|
|
||||||
def get_args():
|
def get_args():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('host', type=str, help="server's IPv4 address")
|
parser.add_argument('type', type=str, help="'client' or 'server'", default='server')
|
||||||
|
parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0')
|
||||||
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|||||||
@ -6,8 +6,9 @@ class Scheduler():
|
|||||||
self.w3 = w3
|
self.w3 = w3
|
||||||
abi = load_abi(abi_file)
|
abi = load_abi(abi_file)
|
||||||
self.contract = self.w3.eth.contract(address=address, abi=abi)
|
self.contract = self.w3.eth.contract(address=address, abi=abi)
|
||||||
|
print(self.contract.functions)
|
||||||
|
|
||||||
def signed_and_send_transaction(self, account, function):
|
def signedAndSendTransaction(self, account, function):
|
||||||
tx = function.build_transaction({
|
tx = function.build_transaction({
|
||||||
'from': account.address,
|
'from': account.address,
|
||||||
'nonce': self.w3.eth.get_transaction_count(account.address)
|
'nonce': self.w3.eth.get_transaction_count(account.address)
|
||||||
@ -16,24 +17,24 @@ class Scheduler():
|
|||||||
|
|
||||||
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
|
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
|
||||||
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
|
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
|
||||||
return tx_receipt
|
_hash = tx_receipt['transactionHash']
|
||||||
|
return _hash
|
||||||
|
|
||||||
def register_cluster(self, account, gpu_id: int, gpu_num: int):
|
def registerCluster(self, account, gpu_id: int, gpu_num: int):
|
||||||
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
|
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
|
||||||
return self.signed_and_send_transaction(account, function)
|
return self.signedAndSendTransaction(account, function)
|
||||||
|
|
||||||
def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
|
def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
|
||||||
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
|
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
|
||||||
return self.signed_and_send_transaction(account, function)
|
return self.signedAndSendTransaction(account, function)
|
||||||
|
|
||||||
def get_clusters(self):
|
def getClusters(self):
|
||||||
return self.contract.functions.getClusters().call()
|
return self.contract.functions.getClusters().call()
|
||||||
|
|
||||||
def get_tasks(self):
|
def getTasks(self):
|
||||||
return self.contract.functions.getTasks().call()
|
return self.contract.functions.getTasks().call()
|
||||||
|
|
||||||
'''
|
def getStartRunEvent(self):
|
||||||
def get_start_run_event(self):
|
|
||||||
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
|
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
|
||||||
events = event_filter.get_new_entries()
|
events = event_filter.get_new_entries()
|
||||||
print(events)
|
print(events)
|
||||||
@ -42,22 +43,4 @@ class Scheduler():
|
|||||||
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
|
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
|
||||||
events = event_filter.get_new_entries()
|
events = event_filter.get_new_entries()
|
||||||
print(events)
|
print(events)
|
||||||
'''
|
|
||||||
|
|
||||||
def get_cluster_event(self, receipt):
|
|
||||||
event = self.contract.events.RegisterCluster().process_receipt(receipt)
|
|
||||||
return event
|
|
||||||
|
|
||||||
def listen_task(self, address):
|
|
||||||
event_filter = self.contract.events.StartRun.create_filter(fromBlock='latest')
|
|
||||||
while True:
|
|
||||||
for event in event_filter.get_new_entries():
|
|
||||||
if event['args']['provider'] == address:
|
|
||||||
return event
|
|
||||||
|
|
||||||
def update_status(self, account, task_index, new_status):
|
|
||||||
function = self.contract.functions.updateStatus(task_index, new_status)
|
|
||||||
return self.signed_and_send_transaction(account, function)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
58
test.py
58
test.py
@ -1,40 +1,34 @@
|
|||||||
import docker
|
from web3 import Web3
|
||||||
import os
|
from src.scheduler import Scheduler
|
||||||
|
|
||||||
|
SCHEDULER_ADDR = "0x544eAe853EA3774A8857573C6423E6Db95b79258"
|
||||||
|
SCHEDULER_ABI_FILE = "../gpu-contract/abi/Scheduler.abi"
|
||||||
|
|
||||||
if not os.path.isdir('./dataset_dir'):
|
PROVIDER1_KEY = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
|
||||||
os.mkdir('./dataset_dir')
|
PROVIDER2_KEY = "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
|
||||||
|
PROVIDER3_KEY = "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"
|
||||||
|
|
||||||
docker_client = docker.from_env()
|
CLIENT_KEY = "0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6"
|
||||||
container = docker_client.containers.run(
|
|
||||||
'snsd0805/cifar100-train:v3',
|
|
||||||
volumes={
|
|
||||||
'dataset_dir': {'bind': '/dataset', 'mode': 'rw'},
|
|
||||||
'output': {'bind': '/output', 'mode': 'rw'},
|
|
||||||
},
|
|
||||||
network='train-net',
|
|
||||||
runtime='nvidia',
|
|
||||||
device_requests=[
|
|
||||||
docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
|
|
||||||
],
|
|
||||||
name='train-0',
|
|
||||||
environment={
|
|
||||||
'GPU_NUM': 1,
|
|
||||||
'NODE_NUM': 1,
|
|
||||||
'NODE_RANK': 0,
|
|
||||||
'MASTER_IP': 'train-0',
|
|
||||||
'MASTER_PORT': 21046,
|
|
||||||
},
|
|
||||||
detach=True
|
|
||||||
)
|
|
||||||
|
|
||||||
|
w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))
|
||||||
|
|
||||||
print(container.short_id)
|
if __name__ == '__main__':
|
||||||
for line in container.logs(stream=True):
|
if w3.is_connected():
|
||||||
print(line.strip().decode())
|
scheduler = Scheduler(w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
|
||||||
|
|
||||||
result = container.wait()
|
provider1 = w3.eth.account.from_key(PROVIDER1_KEY)
|
||||||
status_code = result['StatusCode']
|
provider2 = w3.eth.account.from_key(PROVIDER2_KEY)
|
||||||
print(status_code, type(status_code))
|
provider3 = w3.eth.account.from_key(PROVIDER3_KEY)
|
||||||
|
client = w3.eth.account.from_key(CLIENT_KEY)
|
||||||
|
|
||||||
|
print(scheduler.getClusters())
|
||||||
|
scheduler.registerCluster(provider1, 1, 4)
|
||||||
|
scheduler.registerCluster(provider2, 2, 2)
|
||||||
|
scheduler.registerCluster(provider3, 3, 1)
|
||||||
|
scheduler.registerTaskWithConditions(client, "https://data.com", "http://train.tw", 3, 1)
|
||||||
|
print(scheduler.getClusters())
|
||||||
|
print(scheduler.getTasks())
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("cannot connected to the chain")
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user