diff --git a/main.py b/main.py index a735265..1c4b882 100644 --- a/main.py +++ b/main.py @@ -3,11 +3,6 @@ import sys from src.parser import get_args from src.node_manager import NodeManager -actions = [ - {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, - {'explanation': 'Exit', 'function': 'exit'}, - ] - if __name__ == '__main__': args = get_args() host, port = args.host, args.port @@ -19,7 +14,7 @@ if __name__ == '__main__': is_run = True while is_run: explanation = '=' * 30 + '\n' - for index, action in enumerate(actions): + for index, action in enumerate(manager.actions): explanation += f'{index+1}) {action["explanation"]}\n' explanation += '=' * 30 explanation += '\n> ' @@ -34,7 +29,7 @@ if __name__ == '__main__': if read_list: action = sys.stdin.readline() try: - action = actions[int(action)-1] + action = manager.actions[int(action)-1] if action['function'] == 'exit': manager.exit() is_run = False diff --git a/src/communication.py b/src/communication.py index a169f2e..3f136aa 100644 --- a/src/communication.py +++ b/src/communication.py @@ -1,4 +1,6 @@ import socket +import json +from variables import actions class ClusterCommunicationModule(): def __init__(self, host, port, node_manager): @@ -10,7 +12,7 @@ class ClusterCommunicationModule(): self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.host = host self.port = port - + self.IP = socket.gethostbyname(socket.gethostname()) self.node_manager = node_manager self.worker_conns = [] @@ -40,10 +42,8 @@ class ClusterCommunicationModule(): break elif data == '[CHECK]': # master side self.worker_conns.append(conn) - conn.send('TEST'.encode()) continue - conn.close() def process_request(self, conn, addr): # worker side @@ -53,6 +53,9 @@ class ClusterCommunicationModule(): self.client_sock.connect((addr[0], self.port)) self.client_sock.send('[CHECK]'.encode()) + # remove 'add node' + self.node_manager.actions = self.node_manager.actions[1:] + self.node_manager.status = 'worker' print(f'You are in {addr} cluster now.\nPlease type in Enter to continue') return True @@ -62,6 +65,11 @@ class ClusterCommunicationModule(): return False def handle_command(self, data): + command, args = data.split() + if command == '[INFO]': + data = {'host': self.IP, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num} + self.client_sock.send(json.dumps(data).encode()) + return True def request(self, host): # master side @@ -76,9 +84,24 @@ class ClusterCommunicationModule(): if data == '[REJECT]': print(f"{host} reject.") + return False elif data == '[ACCEPT]': self.node_manager.status = 'master' + print(f"{host} accept.") + return True + + def cluster_info(self): + ans = [] + for conn in self.worker_conns: + try: + conn.send('[INFO] {}'.encode()) + data = conn.recv(1024) + data = json.loads(data.decode()) + ans.append(data) + except: + self.worker_conns.remove(conn) + return ans def exit(self): self.sock.close() diff --git a/src/node_manager.py b/src/node_manager.py index 4669658..1421f9c 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -2,9 +2,16 @@ import threading from src.communication import ServiceExplorationModule, ClusterCommunicationModule import time + class NodeManager(): def __init__(self, host, port): self.status = 'none' + self.actions = [ + {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, + {'explanation': 'Exit', 'function': 'exit'}, + ] + self.GPU = 'RTX 4090' + self.GPU_num = 1 # start Cluster Communication Module # let the nodes in the cluster can communicate @@ -33,15 +40,28 @@ class NodeManager(): msg += '\n> ' choose = input(msg) - # try: - choose = int(choose)-1 - print("START REQUEST") - self.cluster_communication_module.request(hosts[choose]) - print(f"Link to {hosts[choose]}") - # except: - # print("=== FAIL ===") + try: + choose = int(choose)-1 + accept = self.cluster_communication_module.request(hosts[choose]) + if accept: + exit_func = self.actions[-1] + self.actions = self.actions[:-1] + info_func = {'explanation': 'cluster info', 'function': 'cluster_info'} + if info_func not in self.actions: + self.actions.append(info_func) + self.actions.append(exit_func) + except: + print("=== FAIL ===") else: print("No other nodes in your subnet.") + + def cluster_info(self): + info = self.cluster_communication_module.cluster_info() + print(f"\nThere are {len(info)+1} nodes in this cluster.") + print("Cluster Info:") + print(f" {self.service_exploration_module.IP}(local) -> {self.GPU} * {self.GPU_num}") + for host in info: + print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}") def exit(self): self.cluster_communication_module.exit()