Compare commits

..

11 Commits

4 changed files with 66 additions and 15 deletions

View File

@ -1 +1,3 @@
web3==6.18.0 web3==6.18.0
docker

View File

@ -1,5 +1,7 @@
import socket import socket
import json import json
import docker
import time
class ClusterCommunicationModule(): class ClusterCommunicationModule():
def __init__(self, host, port, node_manager): def __init__(self, host, port, node_manager):
@ -37,8 +39,21 @@ class ClusterCommunicationModule():
cont = self.handle_command(data) cont = self.handle_command(data)
if not cont: if not cont:
self.client_sock.close()
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
break break
elif data == '[CHECK]': # master side elif data == '[CHECK]': # master side
# build docker swarm
if self.node_manager.docker_client.swarm.attrs == {}:
print("Build new docker swarm...")
self.node_manager.docker_client.swarm.init(advertise_addr=self.host, listen_addr=f"{self.host}:2377", force_new_cluster=True)
# send docker swarm token to the worker
token = self.node_manager.docker_client.swarm.attrs['JoinTokens']['Worker']
conn.send(f'[DOCKER_TOKEN] {token}'.encode())
print(f"Send token: {token} to the worker.")
print("Please Enter to continue...")
self.worker_conns.append(conn) self.worker_conns.append(conn)
continue continue
@ -51,8 +66,17 @@ class ClusterCommunicationModule():
self.client_sock.connect((addr[0], self.port)) self.client_sock.connect((addr[0], self.port))
self.client_sock.send('[CHECK]'.encode()) self.client_sock.send('[CHECK]'.encode())
# remove 'add node' # join docker swarm cluster
self.node_manager.actions = self.node_manager.actions[1:] token = self.client_sock.recv(1024).decode().split(' ')[-1]
print("Receive Docker Swarm Join_Token=", token)
status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{addr[0]}:2377'], join_token=token)
if not status:
print("Some Errors!")
self.node_manager.actions = [
{'explanation': 'Exit', 'function': 'exit'},
]
self.node_manager.status = 'worker' self.node_manager.status = 'worker'
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue') print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
@ -63,12 +87,21 @@ class ClusterCommunicationModule():
return False return False
def handle_command(self, data): def handle_command(self, data):
command, args = data.split() command, args = data.split(' ')
if command == '[INFO]': if command == '[INFO]':
data = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num} data = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
self.client_sock.send(json.dumps(data).encode()) self.client_sock.send(json.dumps(data).encode())
elif command == '[STOP]':
return True print("Receive STOP signal")
data = {'host': self.host}
self.client_sock.send(f'[STOP_CHECK] {json.dumps(data)}'.encode())
self.node_manager.docker_client.swarm.leave()
self.node_manager.status = 'none'
print("You have leaved the cluster.")
return False
def request(self, host): # master side def request(self, host): # master side
self.client_sock.connect((host, self.port)) self.client_sock.connect((host, self.port))
@ -83,25 +116,37 @@ class ClusterCommunicationModule():
if data == '[REJECT]': if data == '[REJECT]':
print(f"{host} reject.") print(f"{host} reject.")
return False return False
elif data == '[ACCEPT]': elif data == '[ACCEPT]':
self.node_manager.status = 'master' self.node_manager.status = 'master'
print(f"{host} accept.") print(f"{host} accept.")
return True return True
def cluster_info(self): def cluster_info(self):
ans = [] ans = []
for conn in self.worker_conns: for conn in self.worker_conns:
try: try:
conn.send('[INFO] {}'.encode()) conn.send('[INFO] {}'.encode())
data = conn.recv(1024) data = conn.recv(1024)
data = json.loads(data.decode()) data = json.loads(data.decode())
ans.append(data) ans.append(data)
except: except:
self.worker_conns.remove(conn) self.worker_conns.remove(conn)
print("1 worker disconnnected.")
return ans return ans
def exit(self): def exit(self):
if self.node_manager.status == 'master':
for conn in self.worker_conns:
conn.send('[STOP] {}'.encode())
check, args = conn.recv(1024).decode().split(' ')
print(f'{args} has stopped.')
self.node_manager.docker_client.swarm.leave(force=True)
if self.node_manager.status == 'worker':
self.node_manager.docker_client.swarm.leave()
self.sock.close() self.sock.close()
self.client_sock.close() self.client_sock.close()
for conn in self.worker_conns: for conn in self.worker_conns:
@ -127,6 +172,7 @@ class ServiceExplorationModule():
while True: while True:
data, addr = self.sock.recvfrom(1024) data, addr = self.sock.recvfrom(1024)
print(self.node_manager.status)
if self.node_manager.status == 'none': if self.node_manager.status == 'none':
if data.decode() == '[EXPLORE]': if data.decode() == '[EXPLORE]':
self.sock.sendto(self.host.encode(), addr) self.sock.sendto(self.host.encode(), addr)

View File

@ -2,6 +2,7 @@ import threading
from src.communication import ServiceExplorationModule, ClusterCommunicationModule from src.communication import ServiceExplorationModule, ClusterCommunicationModule
import torch import torch
import time import time
import docker
class NodeManager(): class NodeManager():
def __init__(self, host, port): def __init__(self, host, port):
@ -21,6 +22,9 @@ class NodeManager():
# let all client can know which IP address has our service so that it can link to. # let all client can know which IP address has our service so that it can link to.
self.service_exploration_module = ServiceExplorationModule(host, port+1, self) self.service_exploration_module = ServiceExplorationModule(host, port+1, self)
# docker client
self.docker_client = docker.from_env()
time.sleep(2) time.sleep(2)
def get_GPU_info(self): def get_GPU_info(self):
@ -52,12 +56,12 @@ class NodeManager():
choose = int(choose)-1 choose = int(choose)-1
accept = self.cluster_communication_module.request(hosts[choose]) accept = self.cluster_communication_module.request(hosts[choose])
if accept: if accept:
exit_func = self.actions[-1] self.actions = [
self.actions = self.actions[:-1] {'explanation': 'Add another node into our cluster', 'function': 'add_node'},
info_func = {'explanation': 'cluster info', 'function': 'cluster_info'} {'explanation': 'Cluster info', 'function': 'cluster_info'},
if info_func not in self.actions: {'explanation': 'Start working', 'function': 'start_work'},
self.actions.append(info_func) {'explanation': 'Exit', 'function': 'exit'},
self.actions.append(exit_func) ]
except: except:
print("=== FAIL ===") print("=== FAIL ===")
else: else:
@ -74,5 +78,4 @@ class NodeManager():
def exit(self): def exit(self):
self.cluster_communication_module.exit() self.cluster_communication_module.exit()
self.service_exploration_module.exit() self.service_exploration_module.exit()

View File

@ -2,7 +2,7 @@ import argparse
def get_args(): def get_args():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0') parser.add_argument('host', type=str, help="server's IPv4 address")
parser.add_argument('--port', type=int, help="server's listening port", default=8888) parser.add_argument('--port', type=int, help="server's listening port", default=8888)
args = parser.parse_args() args = parser.parse_args()