From 01e673c07c700aeeb9235a365b5ee53cd2da9bd6 Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Sat, 18 May 2024 17:17:44 +0800 Subject: [PATCH] feat: finish the communication between master and workers --- main.py | 34 ++++++++----- src/communication.py | 115 +++++++++++++++++++++++++++++++++++++------ src/node_manager.py | 33 ++++++++++--- 3 files changed, 148 insertions(+), 34 deletions(-) diff --git a/main.py b/main.py index 49a0afb..a735265 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,5 @@ +import select +import sys from src.parser import get_args from src.node_manager import NodeManager @@ -14,24 +16,34 @@ if __name__ == '__main__': manager.start_service() - while True: + is_run = True + while is_run: explanation = '=' * 30 + '\n' for index, action in enumerate(actions): explanation += f'{index+1}) {action["explanation"]}\n' explanation += '=' * 30 explanation += '\n> ' + print(explanation, end='') - action = input(explanation) + while True: + if sys.stdin.closed: + sys.stdin = open(0) - try: - action = actions[int(action)-1] - if action['function'] == 'exit': - break - else: - func = getattr(manager, action['function']) - func() - except: - None + read_list, _, _ = select.select([sys.stdin], [], [], 1) + + if read_list: + action = sys.stdin.readline() + try: + action = actions[int(action)-1] + if action['function'] == 'exit': + manager.exit() + is_run = False + else: + func = getattr(manager, action['function']) + func() + break + except: + break print("\n\n\n") print("Stopped") diff --git a/src/communication.py b/src/communication.py index 9420b67..a169f2e 100644 --- a/src/communication.py +++ b/src/communication.py @@ -1,28 +1,113 @@ import socket -''' class ClusterCommunicationModule(): - def __init__(self, host, port): -''' + def __init__(self, host, port, node_manager): + # TCP server + # In our implementation, the master node link to the worker's TCP server. + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -class ServiceExplorationModule(): - def __init__(self, host, port): - # UDP server - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.host = host self.port = port - self.IP = socket.gethostbyname(socket.gethostname()) - self.is_available = True + + + self.node_manager = node_manager + self.worker_conns = [] def listen(self): self.sock.bind((self.host, self.port)) - # print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...") + self.sock.listen(1) + print(f"Communication Server(TCP) listening on {self.host}:{self.port}...") + + while True: + conn, addr = self.sock.accept() + + # first request, we should approve the request + data = conn.recv(1024) + data = data.decode() + + if data == '[REQUEST]': # worker side + cont = self.process_request(conn, addr) + if cont: + while True: + # Start receive command + data = self.client_sock.recv(1024) + data = data.decode() + + cont = self.handle_command(data) + if not cont: + break + elif data == '[CHECK]': # master side + self.worker_conns.append(conn) + conn.send('TEST'.encode()) + continue + + + conn.close() + + def process_request(self, conn, addr): # worker side + if self.node_manager.status == 'none': + conn.send('[ACCEPT]'.encode()) + conn.close() + self.client_sock.connect((addr[0], self.port)) + self.client_sock.send('[CHECK]'.encode()) + + self.node_manager.status = 'worker' + print(f'You are in {addr} cluster now.\nPlease type in Enter to continue') + return True + else: + conn.send('[REJECT]'.encode()) + print(f'You just reject the cluster invitation from {addr} now.\nPlease type in Enter to continue') + return False + + def handle_command(self, data): + return True + + def request(self, host): # master side + self.client_sock.connect((host, self.port)) + self.client_sock.send('[REQUEST]'.encode()) + data = self.client_sock.recv(1024) + data = data.decode() + + # close client_sock, and waiting for the worker connect to our self.sock + self.client_sock.close() + self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + if data == '[REJECT]': + print(f"{host} reject.") + elif data == '[ACCEPT]': + self.node_manager.status = 'master' + print(f"{host} accept.") + + def exit(self): + self.sock.close() + self.client_sock.close() + for conn in self.worker_conns: + conn.close() + + +class ServiceExplorationModule(): + def __init__(self, host, port, node_manager): + # UDP server + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + self.host = host + self.port = port + self.IP = socket.gethostbyname(socket.gethostname()) + + self.node_manager = node_manager + + def listen(self): + self.sock.bind((self.host, self.port)) + print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...") while True: data, addr = self.sock.recvfrom(1024) - if self.is_available: + if self.node_manager.status == 'none': if data.decode() == '[EXPLORE]': self.sock.sendto(self.IP.encode(), addr) @@ -32,7 +117,7 @@ class ServiceExplorationModule(): client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - client_sock.settimeout(3) + client_sock.settimeout(1) client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port)) while True: @@ -45,5 +130,5 @@ class ServiceExplorationModule(): break return available_host - def change_available(self, new_status): - self.is_available = new_status + def exit(self): + self.sock.close() diff --git a/src/node_manager.py b/src/node_manager.py index 3928677..4669658 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -1,16 +1,26 @@ import threading -from src.communication import ServiceExplorationModule +from src.communication import ServiceExplorationModule, ClusterCommunicationModule import time class NodeManager(): def __init__(self, host, port): self.status = 'none' + + # start Cluster Communication Module + # let the nodes in the cluster can communicate + self.cluster_communication_module = ClusterCommunicationModule(host, port, self) + # start Service Exploration Module # let all client can know which IP address has our service so that it can link to. - self.service_exploration_module = ServiceExplorationModule(host, port+1) - time.sleep(1) + self.service_exploration_module = ServiceExplorationModule(host, port+1, self) + + time.sleep(2) def start_service(self): + communication_thread = threading.Thread(target=self.cluster_communication_module.listen) + communication_thread.daemon = True + communication_thread.start() + explore_service_thread = threading.Thread(target=self.service_exploration_module.listen) explore_service_thread.daemon = True explore_service_thread.start() @@ -23,11 +33,18 @@ class NodeManager(): msg += '\n> ' choose = input(msg) - try: - choose = int(choose)-1 - print(f"Link to {hosts[choose]}") - except: - print("=== FAIL ===") + # try: + choose = int(choose)-1 + print("START REQUEST") + self.cluster_communication_module.request(hosts[choose]) + print(f"Link to {hosts[choose]}") + # except: + # print("=== FAIL ===") else: print("No other nodes in your subnet.") + def exit(self): + self.cluster_communication_module.exit() + self.service_exploration_module.exit() + +