Compare commits
No commits in common. "4f035b6d5336e103b8f0e853d87657b75d09a95e" and "7bf0f39d3936d2cd87ea7dbaa50bcad4cdf922c6" have entirely different histories.
4f035b6d53
...
7bf0f39d39
@ -1,3 +1 @@
|
|||||||
web3==6.18.0
|
web3==6.18.0
|
||||||
docker
|
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,5 @@
|
|||||||
import socket
|
import socket
|
||||||
import json
|
import json
|
||||||
import docker
|
|
||||||
import time
|
|
||||||
|
|
||||||
class ClusterCommunicationModule():
|
class ClusterCommunicationModule():
|
||||||
def __init__(self, host, port, node_manager):
|
def __init__(self, host, port, node_manager):
|
||||||
@ -39,21 +37,8 @@ class ClusterCommunicationModule():
|
|||||||
|
|
||||||
cont = self.handle_command(data)
|
cont = self.handle_command(data)
|
||||||
if not cont:
|
if not cont:
|
||||||
self.client_sock.close()
|
|
||||||
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
break
|
break
|
||||||
elif data == '[CHECK]': # master side
|
elif data == '[CHECK]': # master side
|
||||||
# build docker swarm
|
|
||||||
if self.node_manager.docker_client.swarm.attrs == {}:
|
|
||||||
print("Build new docker swarm...")
|
|
||||||
self.node_manager.docker_client.swarm.init(advertise_addr=self.host, listen_addr=f"{self.host}:2377", force_new_cluster=True)
|
|
||||||
|
|
||||||
# send docker swarm token to the worker
|
|
||||||
token = self.node_manager.docker_client.swarm.attrs['JoinTokens']['Worker']
|
|
||||||
conn.send(f'[DOCKER_TOKEN] {token}'.encode())
|
|
||||||
print(f"Send token: {token} to the worker.")
|
|
||||||
print("Please Enter to continue...")
|
|
||||||
|
|
||||||
self.worker_conns.append(conn)
|
self.worker_conns.append(conn)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -66,17 +51,8 @@ class ClusterCommunicationModule():
|
|||||||
self.client_sock.connect((addr[0], self.port))
|
self.client_sock.connect((addr[0], self.port))
|
||||||
self.client_sock.send('[CHECK]'.encode())
|
self.client_sock.send('[CHECK]'.encode())
|
||||||
|
|
||||||
# join docker swarm cluster
|
# remove 'add node'
|
||||||
token = self.client_sock.recv(1024).decode().split(' ')[-1]
|
self.node_manager.actions = self.node_manager.actions[1:]
|
||||||
print("Receive Docker Swarm Join_Token=", token)
|
|
||||||
status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{addr[0]}:2377'], join_token=token)
|
|
||||||
|
|
||||||
if not status:
|
|
||||||
print("Some Errors!")
|
|
||||||
|
|
||||||
self.node_manager.actions = [
|
|
||||||
{'explanation': 'Exit', 'function': 'exit'},
|
|
||||||
]
|
|
||||||
|
|
||||||
self.node_manager.status = 'worker'
|
self.node_manager.status = 'worker'
|
||||||
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
|
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
|
||||||
@ -87,21 +63,12 @@ class ClusterCommunicationModule():
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def handle_command(self, data):
|
def handle_command(self, data):
|
||||||
command, args = data.split(' ')
|
command, args = data.split()
|
||||||
if command == '[INFO]':
|
if command == '[INFO]':
|
||||||
data = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
|
data = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
|
||||||
self.client_sock.send(json.dumps(data).encode())
|
self.client_sock.send(json.dumps(data).encode())
|
||||||
elif command == '[STOP]':
|
|
||||||
print("Receive STOP signal")
|
return True
|
||||||
data = {'host': self.host}
|
|
||||||
self.client_sock.send(f'[STOP_CHECK] {json.dumps(data)}'.encode())
|
|
||||||
self.node_manager.docker_client.swarm.leave()
|
|
||||||
self.node_manager.status = 'none'
|
|
||||||
|
|
||||||
print("You have leaved the cluster.")
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def request(self, host): # master side
|
def request(self, host): # master side
|
||||||
self.client_sock.connect((host, self.port))
|
self.client_sock.connect((host, self.port))
|
||||||
@ -116,37 +83,25 @@ class ClusterCommunicationModule():
|
|||||||
if data == '[REJECT]':
|
if data == '[REJECT]':
|
||||||
print(f"{host} reject.")
|
print(f"{host} reject.")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
elif data == '[ACCEPT]':
|
elif data == '[ACCEPT]':
|
||||||
self.node_manager.status = 'master'
|
self.node_manager.status = 'master'
|
||||||
|
|
||||||
print(f"{host} accept.")
|
print(f"{host} accept.")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def cluster_info(self):
|
def cluster_info(self):
|
||||||
ans = []
|
ans = []
|
||||||
for conn in self.worker_conns:
|
for conn in self.worker_conns:
|
||||||
try:
|
try:
|
||||||
conn.send('[INFO] {}'.encode())
|
conn.send('[INFO] {}'.encode())
|
||||||
data = conn.recv(1024)
|
data = conn.recv(1024)
|
||||||
data = json.loads(data.decode())
|
data = json.loads(data.decode())
|
||||||
ans.append(data)
|
ans.append(data)
|
||||||
except:
|
except:
|
||||||
self.worker_conns.remove(conn)
|
self.worker_conns.remove(conn)
|
||||||
print("1 worker disconnnected.")
|
|
||||||
return ans
|
return ans
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
if self.node_manager.status == 'master':
|
|
||||||
for conn in self.worker_conns:
|
|
||||||
conn.send('[STOP] {}'.encode())
|
|
||||||
check, args = conn.recv(1024).decode().split(' ')
|
|
||||||
print(f'{args} has stopped.')
|
|
||||||
self.node_manager.docker_client.swarm.leave(force=True)
|
|
||||||
|
|
||||||
if self.node_manager.status == 'worker':
|
|
||||||
self.node_manager.docker_client.swarm.leave()
|
|
||||||
|
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
self.client_sock.close()
|
self.client_sock.close()
|
||||||
for conn in self.worker_conns:
|
for conn in self.worker_conns:
|
||||||
@ -172,7 +127,6 @@ class ServiceExplorationModule():
|
|||||||
while True:
|
while True:
|
||||||
data, addr = self.sock.recvfrom(1024)
|
data, addr = self.sock.recvfrom(1024)
|
||||||
|
|
||||||
print(self.node_manager.status)
|
|
||||||
if self.node_manager.status == 'none':
|
if self.node_manager.status == 'none':
|
||||||
if data.decode() == '[EXPLORE]':
|
if data.decode() == '[EXPLORE]':
|
||||||
self.sock.sendto(self.host.encode(), addr)
|
self.sock.sendto(self.host.encode(), addr)
|
||||||
|
|||||||
@ -2,7 +2,6 @@ import threading
|
|||||||
from src.communication import ServiceExplorationModule, ClusterCommunicationModule
|
from src.communication import ServiceExplorationModule, ClusterCommunicationModule
|
||||||
import torch
|
import torch
|
||||||
import time
|
import time
|
||||||
import docker
|
|
||||||
|
|
||||||
class NodeManager():
|
class NodeManager():
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
@ -22,9 +21,6 @@ class NodeManager():
|
|||||||
# let all client can know which IP address has our service so that it can link to.
|
# let all client can know which IP address has our service so that it can link to.
|
||||||
self.service_exploration_module = ServiceExplorationModule(host, port+1, self)
|
self.service_exploration_module = ServiceExplorationModule(host, port+1, self)
|
||||||
|
|
||||||
# docker client
|
|
||||||
self.docker_client = docker.from_env()
|
|
||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
def get_GPU_info(self):
|
def get_GPU_info(self):
|
||||||
@ -56,12 +52,12 @@ class NodeManager():
|
|||||||
choose = int(choose)-1
|
choose = int(choose)-1
|
||||||
accept = self.cluster_communication_module.request(hosts[choose])
|
accept = self.cluster_communication_module.request(hosts[choose])
|
||||||
if accept:
|
if accept:
|
||||||
self.actions = [
|
exit_func = self.actions[-1]
|
||||||
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
self.actions = self.actions[:-1]
|
||||||
{'explanation': 'Cluster info', 'function': 'cluster_info'},
|
info_func = {'explanation': 'cluster info', 'function': 'cluster_info'}
|
||||||
{'explanation': 'Start working', 'function': 'start_work'},
|
if info_func not in self.actions:
|
||||||
{'explanation': 'Exit', 'function': 'exit'},
|
self.actions.append(info_func)
|
||||||
]
|
self.actions.append(exit_func)
|
||||||
except:
|
except:
|
||||||
print("=== FAIL ===")
|
print("=== FAIL ===")
|
||||||
else:
|
else:
|
||||||
@ -78,4 +74,5 @@ class NodeManager():
|
|||||||
def exit(self):
|
def exit(self):
|
||||||
self.cluster_communication_module.exit()
|
self.cluster_communication_module.exit()
|
||||||
self.service_exploration_module.exit()
|
self.service_exploration_module.exit()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,7 @@ import argparse
|
|||||||
|
|
||||||
def get_args():
|
def get_args():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('host', type=str, help="server's IPv4 address")
|
parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0')
|
||||||
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user