Compare commits

..

No commits in common. "68123809bbd9e5588130d9674c37d36b2e4e0a8b" and "a947f86c9109c7d2413b1ed4673014572a171ec3" have entirely different histories.

3 changed files with 31 additions and 183 deletions

29
main.py
View File

@ -1,8 +1,11 @@
import select
import sys
from src.parser import get_args from src.parser import get_args
from src.node_manager import NodeManager from src.node_manager import NodeManager
actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Exit', 'function': 'exit'},
]
if __name__ == '__main__': if __name__ == '__main__':
args = get_args() args = get_args()
host, port = args.host, args.port host, port = args.host, args.port
@ -11,34 +14,24 @@ if __name__ == '__main__':
manager.start_service() manager.start_service()
is_run = True while True:
while is_run:
explanation = '=' * 30 + '\n' explanation = '=' * 30 + '\n'
for index, action in enumerate(manager.actions): for index, action in enumerate(actions):
explanation += f'{index+1}) {action["explanation"]}\n' explanation += f'{index+1}) {action["explanation"]}\n'
explanation += '=' * 30 explanation += '=' * 30
explanation += '\n> ' explanation += '\n> '
print(explanation, end='')
while True: action = input(explanation)
if sys.stdin.closed:
sys.stdin = open(0)
read_list, _, _ = select.select([sys.stdin], [], [], 1)
if read_list:
action = sys.stdin.readline()
try: try:
action = manager.actions[int(action)-1] action = actions[int(action)-1]
if action['function'] == 'exit': if action['function'] == 'exit':
manager.exit() break
is_run = False
else: else:
func = getattr(manager, action['function']) func = getattr(manager, action['function'])
func() func()
break
except: except:
break None
print("\n\n\n") print("\n\n\n")
print("Stopped") print("Stopped")

View File

@ -1,136 +1,28 @@
import socket import socket
import json
from variables import actions
'''
class ClusterCommunicationModule(): class ClusterCommunicationModule():
def __init__(self, host, port, node_manager): def __init__(self, host, port):
# TCP server '''
# In our implementation, the master node link to the worker's TCP server.
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host = host
self.port = port
self.IP = socket.gethostbyname(socket.gethostname())
self.node_manager = node_manager
self.worker_conns = []
def listen(self):
self.sock.bind((self.host, self.port))
self.sock.listen(1)
print(f"Communication Server(TCP) listening on {self.host}:{self.port}...")
while True:
conn, addr = self.sock.accept()
# first request, we should approve the request
data = conn.recv(1024)
data = data.decode()
if data == '[REQUEST]': # worker side
cont = self.process_request(conn, addr)
if cont:
while True:
# Start receive command
data = self.client_sock.recv(1024)
data = data.decode()
cont = self.handle_command(data)
if not cont:
break
elif data == '[CHECK]': # master side
self.worker_conns.append(conn)
continue
conn.close()
def process_request(self, conn, addr): # worker side
if self.node_manager.status == 'none':
conn.send('[ACCEPT]'.encode())
conn.close()
self.client_sock.connect((addr[0], self.port))
self.client_sock.send('[CHECK]'.encode())
# remove 'add node'
self.node_manager.actions = self.node_manager.actions[1:]
self.node_manager.status = 'worker'
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
return True
else:
conn.send('[REJECT]'.encode())
print(f'You just reject the cluster invitation from {addr} now.\nPlease type in Enter to continue')
return False
def handle_command(self, data):
command, args = data.split()
if command == '[INFO]':
data = {'host': self.IP, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
self.client_sock.send(json.dumps(data).encode())
return True
def request(self, host): # master side
self.client_sock.connect((host, self.port))
self.client_sock.send('[REQUEST]'.encode())
data = self.client_sock.recv(1024)
data = data.decode()
# close client_sock, and waiting for the worker connect to our self.sock
self.client_sock.close()
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if data == '[REJECT]':
print(f"{host} reject.")
return False
elif data == '[ACCEPT]':
self.node_manager.status = 'master'
print(f"{host} accept.")
return True
def cluster_info(self):
ans = []
for conn in self.worker_conns:
try:
conn.send('[INFO] {}'.encode())
data = conn.recv(1024)
data = json.loads(data.decode())
ans.append(data)
except:
self.worker_conns.remove(conn)
return ans
def exit(self):
self.sock.close()
self.client_sock.close()
for conn in self.worker_conns:
conn.close()
class ServiceExplorationModule(): class ServiceExplorationModule():
def __init__(self, host, port, node_manager): def __init__(self, host, port):
# UDP server # UDP server
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host self.host = host
self.port = port self.port = port
self.IP = socket.gethostbyname(socket.gethostname()) self.IP = socket.gethostbyname(socket.gethostname())
self.is_available = True
self.node_manager = node_manager
def listen(self): def listen(self):
self.sock.bind((self.host, self.port)) self.sock.bind((self.host, self.port))
print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...") # print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
while True: while True:
data, addr = self.sock.recvfrom(1024) data, addr = self.sock.recvfrom(1024)
if self.node_manager.status == 'none': if self.is_available:
if data.decode() == '[EXPLORE]': if data.decode() == '[EXPLORE]':
self.sock.sendto(self.IP.encode(), addr) self.sock.sendto(self.IP.encode(), addr)
@ -140,7 +32,7 @@ class ServiceExplorationModule():
client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_sock.settimeout(1) client_sock.settimeout(3)
client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port)) client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))
while True: while True:
@ -153,5 +45,5 @@ class ServiceExplorationModule():
break break
return available_host return available_host
def exit(self): def change_available(self, new_status):
self.sock.close() self.is_available = new_status

View File

@ -1,33 +1,16 @@
import threading import threading
from src.communication import ServiceExplorationModule, ClusterCommunicationModule from src.communication import ServiceExplorationModule
import time import time
class NodeManager(): class NodeManager():
def __init__(self, host, port): def __init__(self, host, port):
self.status = 'none' self.status = 'none'
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Exit', 'function': 'exit'},
]
self.GPU = 'RTX 4090'
self.GPU_num = 1
# start Cluster Communication Module
# let the nodes in the cluster can communicate
self.cluster_communication_module = ClusterCommunicationModule(host, port, self)
# start Service Exploration Module # start Service Exploration Module
# let all client can know which IP address has our service so that it can link to. # let all client can know which IP address has our service so that it can link to.
self.service_exploration_module = ServiceExplorationModule(host, port+1, self) self.service_exploration_module = ServiceExplorationModule(host, port+1)
time.sleep(1)
time.sleep(2)
def start_service(self): def start_service(self):
communication_thread = threading.Thread(target=self.cluster_communication_module.listen)
communication_thread.daemon = True
communication_thread.start()
explore_service_thread = threading.Thread(target=self.service_exploration_module.listen) explore_service_thread = threading.Thread(target=self.service_exploration_module.listen)
explore_service_thread.daemon = True explore_service_thread.daemon = True
explore_service_thread.start() explore_service_thread.start()
@ -42,29 +25,9 @@ class NodeManager():
choose = input(msg) choose = input(msg)
try: try:
choose = int(choose)-1 choose = int(choose)-1
accept = self.cluster_communication_module.request(hosts[choose]) print(f"Link to {hosts[choose]}")
if accept:
exit_func = self.actions[-1]
self.actions = self.actions[:-1]
info_func = {'explanation': 'cluster info', 'function': 'cluster_info'}
if info_func not in self.actions:
self.actions.append(info_func)
self.actions.append(exit_func)
except: except:
print("=== FAIL ===") print("=== FAIL ===")
else: else:
print("No other nodes in your subnet.") print("No other nodes in your subnet.")
def cluster_info(self):
info = self.cluster_communication_module.cluster_info()
print(f"\nThere are {len(info)+1} nodes in this cluster.")
print("Cluster Info:")
print(f" {self.service_exploration_module.IP}(local) -> {self.GPU} * {self.GPU_num}")
for host in info:
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
def exit(self):
self.cluster_communication_module.exit()
self.service_exploration_module.exit()