218 lines
7.9 KiB
Python
218 lines
7.9 KiB
Python
import socket
|
|
import json
|
|
import docker
|
|
import time
|
|
|
|
class ClusterCommunicationModule:
    """TCP control channel between a cluster master and its workers.

    ``listen()`` serves both roles on ``(host, port)``.  The flow visible in
    this class:

    1. A master calls ``request(worker_host)``: it connects to the worker's
       server, sends ``[REQUEST]`` and reads ``[ACCEPT]`` or ``[REJECT]``.
    2. An accepting worker (``process_request``) connects back to the
       master's server and sends ``[CHECK]``; the master answers with
       ``[DOCKER_TOKEN] <token>`` and keeps that connection in
       ``worker_conns`` as the command channel.
    3. The master sends ``[INFO]``, ``[START_LISTEN_TASK]`` and ``[STOP]``
       over that channel; the worker answers in ``handle_command``.

    Messages are text of the form ``"<COMMAND> <payload>"`` and are read with
    a single ``recv(1024)``, so each message is assumed to arrive in one piece.

    ``node_manager`` must provide ``status`` (this class sets 'none',
    'master' or 'worker'), ``actions``, ``docker_client``, ``GPU`` and
    ``GPU_num``.
    """

    def __init__(self, host, port, node_manager):
        # TCP server socket.  The master connects to the worker's server to
        # send the invitation, and the worker connects back to the master's.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Outgoing connection.  It is replaced by a fresh socket after every
        # use, because a closed socket object cannot be connected again.
        self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.host = host
        self.port = port
        self.node_manager = node_manager

        # Master side: open command channels to workers that reported in.
        self.worker_conns = []

    # ------------------------------------------------------------------ server

    def listen(self):
        """Serve incoming control connections forever (blocking).

        The first message on each accepted connection selects its role:

        * ``[REQUEST]`` -- a master invites this node (worker side).  If
          accepted, this call stays in the worker command loop until the
          master sends ``[STOP]`` or disconnects.
        * ``[CHECK]`` -- a worker that accepted our invitation reports in
          (master side).  The connection is kept open as its command channel.

        Any other first message just closes the connection.
        """
        self.sock.bind((self.host, self.port))
        self.sock.listen(1)
        print(f"Communication Server(TCP) listening on {self.host}:{self.port}...")

        while True:
            conn, addr = self.sock.accept()

            # Compare raw bytes.  Any host can connect here, and decoding
            # arbitrary bytes could raise UnicodeDecodeError and stop the
            # server.
            first = conn.recv(1024)

            if first == b'[REQUEST]':  # worker side
                if self.process_request(conn, addr):
                    self._serve_master_commands()
            elif first == b'[CHECK]':  # master side
                self._register_worker(conn)
                # Keep conn open: it is now this worker's command channel.
                continue

            conn.close()

    def _serve_master_commands(self):
        """Worker side: run master commands on client_sock until [STOP] or disconnect."""
        while True:
            try:
                raw = self.client_sock.recv(1024)
            except OSError:
                raw = b''
            # An empty read means the master closed the connection.  Without
            # this check, handle_command would receive '' on every iteration.
            if not raw:
                # NOTE(review): the node stays in the swarm and keeps status
                # 'worker' here; whether to leave automatically is a policy
                # decision.
                print("Lost connection to the master.")
                break
            if not self.handle_command(raw.decode()):
                break
        self._reset_client_sock()

    def _register_worker(self, conn):
        """Master side: ensure a swarm exists, send its worker join token, keep conn."""
        docker_client = self.node_manager.docker_client
        # No swarm information yet: initialise a new swarm advertised on self.host.
        if docker_client.swarm.attrs == {}:
            print("Build new docker swarm...")
            docker_client.swarm.init(advertise_addr=self.host, listen_addr=f"{self.host}:2377", force_new_cluster=True)

        token = docker_client.swarm.attrs['JoinTokens']['Worker']
        conn.send(f'[DOCKER_TOKEN] {token}'.encode())
        print(f"Send token: {token} to the worker.")
        print("Please Enter to continue...")

        self.worker_conns.append(conn)

    # ------------------------------------------------------------ worker side

    def process_request(self, conn, addr):  # worker side
        """Answer a master's [REQUEST] invitation received on conn.

        If this node's status is 'none' it replies [ACCEPT], connects back to
        the master's server at ``(addr[0], self.port)``, sends [CHECK], joins
        the Docker swarm with the token it receives and sets its status to
        'worker'.  Otherwise it replies [REJECT].

        Args:
            conn: the accepted connection that carried the request.
            addr: the master's ``(host, port)`` as returned by ``accept()``.

        Returns:
            True if the invitation was accepted, False if it was rejected.
        """
        if self.node_manager.status != 'none':
            conn.send('[REJECT]'.encode())
            print(f'You just reject the cluster invitation from {addr} now.\nPlease type in Enter to continue')
            return False

        conn.send('[ACCEPT]'.encode())
        conn.close()

        master_host = addr[0]
        self.client_sock.connect((master_host, self.port))
        self.client_sock.send('[CHECK]'.encode())

        # Expected reply: "[DOCKER_TOKEN] <token>"; the token is the last field.
        token = self.client_sock.recv(1024).decode().split(' ')[-1]
        print("Receive Docker Swarm Join_Token=", token)

        status = self.node_manager.docker_client.swarm.join(remote_addrs=[f'{master_host}:2377'], join_token=token)
        if not status:
            # NOTE(review): the failure is only reported; the node is still
            # marked as a worker below.
            print("Some Errors!")

        # Replace the node's action list with a single 'exit' entry.
        self.node_manager.actions = [
            {'explanation': 'Exit', 'function': 'exit'},
        ]

        self.node_manager.status = 'worker'
        print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
        return True

    def handle_command(self, data):
        """Worker side: execute one master command and reply on client_sock.

        Args:
            data: a decoded message ``"<COMMAND> <payload>"``.

        Returns:
            False after ``[STOP]`` (the command loop should end).  True
            otherwise, including for unrecognised or empty messages.
        """
        # partition() tolerates a missing payload and a payload that contains
        # spaces, unlike unpacking a split(' ') into exactly two parts.
        command, _, _args = data.partition(' ')

        if command == '[INFO]':
            info = {'host': self.host, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
            self.client_sock.send(json.dumps(info).encode())

        elif command == '[STOP]':
            print("Receive STOP signal")
            ack = {'host': self.host}
            self.client_sock.send(f'[STOP_CHECK] {json.dumps(ack)}'.encode())
            self.node_manager.docker_client.swarm.leave()
            self.node_manager.status = 'none'
            print("You have leaved the cluster.")
            return False

        elif command == '[START_LISTEN_TASK]':
            self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
            print("The master has started listening for new task from Sepolia testnet...")

        return True

    # ------------------------------------------------------------ master side

    def request(self, host):  # master side
        """Invite the node at ``host`` to join this cluster.

        On [ACCEPT] this node's status becomes 'master'.  The worker then
        connects back to ``self.sock``, so the outgoing socket is always
        discarded afterwards.

        Returns:
            True if the worker accepted.  False if it rejected or gave no
            recognisable answer.

        Raises:
            OSError: if the worker cannot be reached.  client_sock is still
                reset first.
        """
        try:
            self.client_sock.connect((host, self.port))
            self.client_sock.send('[REQUEST]'.encode())
            data = self.client_sock.recv(1024).decode()
        finally:
            # Reset in every case.  A socket whose connect() failed is in an
            # unspecified state, and a closed one cannot be reused.
            self._reset_client_sock()

        if data == '[REJECT]':
            print(f"{host} reject.")
            return False

        if data == '[ACCEPT]':
            self.node_manager.status = 'master'
            print(f"{host} accept.")
            return True

        # Empty or unexpected reply: report it and return a real boolean.
        print(f"{host} gave no valid answer.")
        return False

    def cluster_info(self):  # master side
        """Ask every connected worker for its [INFO] and return the replies.

        Workers that cannot be reached, or that do not answer with valid
        JSON, are dropped from worker_conns and their connections closed.

        Returns:
            list of the decoded JSON replies, in worker_conns order.
        """
        ans = []
        # Iterate over a snapshot.  Removing entries from the list being
        # iterated would skip the worker that follows a dead one.
        for conn in list(self.worker_conns):
            reply = self._exchange(conn, '[INFO] {}')
            if reply is not None:
                try:
                    ans.append(json.loads(reply))
                    continue
                except ValueError:  # json.JSONDecodeError is a ValueError
                    pass
            self._drop_worker(conn)
            print("1 worker disconnnected.")
        return ans

    def start_listen_task(self):  # master side
        """Send [START_LISTEN_TASK] to every worker and wait for each reply.

        Workers that cannot be reached or send no reply are dropped and
        counted.  A summary line is printed.
        """
        disconnected_counter = 0
        # Snapshot for the same reason as in cluster_info.
        for conn in list(self.worker_conns):
            if self._exchange(conn, '[START_LISTEN_TASK] {}') is None:
                disconnected_counter += 1
                self._drop_worker(conn)

        if disconnected_counter != 0:
            print(f'{disconnected_counter} worker disconnected.')
        else:
            print(f'All other {len(self.worker_conns)} workers are waiting...')

    def exit(self):
        """Leave the cluster and close every socket this module owns.

        Master: sends [STOP] to each worker, prints each acknowledgement,
        then force-leaves the swarm.  Worker: leaves the swarm.  Unreachable
        workers are reported but do not abort the shutdown.
        """
        if self.node_manager.status == 'master':
            for conn in self.worker_conns:
                reply = self._exchange(conn, '[STOP] {}')
                if reply is None:
                    print('A worker did not acknowledge [STOP].')
                    continue
                # The payload is JSON such as {"host": "..."}, which contains
                # spaces, so split off only the command token.
                _check, _, args = reply.partition(' ')
                print(f'{args} has stopped.')
            self.node_manager.docker_client.swarm.leave(force=True)

        if self.node_manager.status == 'worker':
            self.node_manager.docker_client.swarm.leave()

        self.sock.close()
        self.client_sock.close()
        for conn in self.worker_conns:
            conn.close()

    # ---------------------------------------------------------------- helpers

    def _reset_client_sock(self):
        """Close client_sock and replace it with a fresh, unconnected TCP socket."""
        self.client_sock.close()
        self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def _exchange(self, conn, message):
        """Send one message on conn and return the decoded reply.

        Returns None if the send or receive fails, the peer closed the
        connection (empty read), or the reply is not valid UTF-8.
        """
        try:
            conn.send(message.encode())
            reply = conn.recv(1024).decode()
        except (OSError, UnicodeDecodeError):
            return None
        return reply or None

    def _drop_worker(self, conn):
        """Remove an unresponsive worker connection from worker_conns and close it."""
        self.worker_conns.remove(conn)
        conn.close()
class ServiceExplorationModule:
    """UDP broadcast discovery of nodes that can be invited into a cluster.

    ``listen()`` replies to ``[EXPLORE]`` datagrams with this node's host
    while ``node_manager.status`` is 'none'.  ``explore()`` broadcasts
    ``[EXPLORE]`` and collects the addresses that answer.
    """

    def __init__(self, host, port, node_manager):
        # UDP server socket: broadcast enabled, address reusable.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.host = host
        self.port = port
        self.node_manager = node_manager

    def listen(self):
        """Answer discovery probes forever (blocking).

        Binds to all interfaces on ``self.port``.  A probe is answered with
        ``self.host`` only while ``node_manager.status == 'none'``; every
        other datagram is ignored.
        """
        self.sock.bind(('0.0.0.0', self.port))
        print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")

        while True:
            data, addr = self.sock.recvfrom(1024)
            # Compare raw bytes.  Anyone on the network can send to this port,
            # and decoding arbitrary bytes could raise UnicodeDecodeError and
            # stop the discovery server.
            if self.node_manager.status == 'none' and data == b'[EXPLORE]':
                self.sock.sendto(self.host.encode(), addr)

    def explore(self, timeout=1):
        """Broadcast a discovery probe and collect the hosts that answer.

        Args:
            timeout: seconds to wait for each further reply.  Collection ends
                at the first silence of this length.  Defaults to 1.

        Returns:
            list of responders' IP addresses (``addr[0]`` of each reply),
            excluding ``self.host``, in arrival order.
        """
        available_host = []

        # The `with` block closes the probe socket even if sendto raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client_sock:
            client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            client_sock.settimeout(timeout)
            client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))

            while True:
                try:
                    _data, addr = client_sock.recvfrom(1024)
                except OSError:
                    # socket.timeout (an OSError subclass) ends the reply
                    # window.  Other socket errors also end it, as a
                    # best-effort scan.
                    break
                if addr[0] != self.host:
                    available_host.append(addr[0])

        return available_host

    def exit(self):
        """Close the discovery server socket."""
        self.sock.close()