From a947f86c9109c7d2413b1ed4673014572a171ec3 Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Sat, 18 May 2024 03:02:31 +0800 Subject: [PATCH] feat: extract action from node manager --- main.py | 45 +++++++++++++++++++++++++++++++------------- src/communication.py | 13 ++++++++++--- src/node_manager.py | 33 ++++++++++++++++++++++++++++++++ src/parser.py | 1 - 4 files changed, 75 insertions(+), 17 deletions(-) create mode 100644 src/node_manager.py diff --git a/main.py b/main.py index 9058904..49a0afb 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,40 @@ -import argparse -import threading -from src.communication import ServiceExplorationModule from src.parser import get_args +from src.node_manager import NodeManager + +actions = [ + {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, + {'explanation': 'Exit', 'function': 'exit'}, + ] if __name__ == '__main__': args = get_args() - - socket_type = args.type host, port = args.host, args.port - # start Service Exploration Module - # let all client can know which IP address has our service so that it can link to. - service_explore = ServiceExplorationModule(host, port+1) - explore_service_thread = threading.Thread(target=service_explore.listen) - explore_service_thread.start() + manager = NodeManager(host, port) + manager.start_service() - if socket_type == 'client': - hosts = service_explore.explore() - print(hosts) + + while True: + explanation = '=' * 30 + '\n' + for index, action in enumerate(actions): + explanation += f'{index+1}) {action["explanation"]}\n' + explanation += '=' * 30 + explanation += '\n> ' + + action = input(explanation) + + try: + action = actions[int(action)-1] + if action['function'] == 'exit': + break + else: + func = getattr(manager, action['function']) + func() + except: + None + + print("\n\n\n") + print("Stopped") + + diff --git a/src/communication.py b/src/communication.py index ac8d00b..9420b67 100644 --- a/src/communication.py +++ b/src/communication.py @@ -1,5 +1,10 @@ import socket +''' +class ClusterCommunicationModule(): + def __init__(self, host, port): +''' + class ServiceExplorationModule(): def __init__(self, host, port): # UDP server @@ -12,14 +17,13 @@ class ServiceExplorationModule(): def listen(self): self.sock.bind((self.host, self.port)) - print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...") + # print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...") while True: data, addr = self.sock.recvfrom(1024) if self.is_available: if data.decode() == '[EXPLORE]': - print(f"Receive exploration broadcast from {addr}") self.sock.sendto(self.IP.encode(), addr) def explore(self): @@ -35,8 +39,11 @@ class ServiceExplorationModule(): try: data, addr = client_sock.recvfrom(1024) if addr[0] != self.IP: - available_host.append(addr) + available_host.append(addr[0]) except: # if socket timeout break return available_host + + def change_available(self, new_status): + self.is_available = new_status diff --git a/src/node_manager.py b/src/node_manager.py new file mode 100644 index 0000000..3928677 --- /dev/null +++ b/src/node_manager.py @@ -0,0 +1,33 @@ +import threading +from src.communication import ServiceExplorationModule +import time + +class NodeManager(): + def __init__(self, host, port): + self.status = 'none' + # start Service Exploration Module + # let all client can know which IP address has our service so that it can link to. + self.service_exploration_module = ServiceExplorationModule(host, port+1) + time.sleep(1) + + def start_service(self): + explore_service_thread = threading.Thread(target=self.service_exploration_module.listen) + explore_service_thread.daemon = True + explore_service_thread.start() + + def add_node(self): + hosts = self.service_exploration_module.explore() + if len(hosts) != 0: + msg = "These are the nodes you can request for join into our cluster: \n" + msg += '\n'.join([f'{index+1}) {host}' for index, host in enumerate(hosts)]) + msg += '\n> ' + + choose = input(msg) + try: + choose = int(choose)-1 + print(f"Link to {hosts[choose]}") + except: + print("=== FAIL ===") + else: + print("No other nodes in your subnet.") + diff --git a/src/parser.py b/src/parser.py index c158307..2487f2e 100644 --- a/src/parser.py +++ b/src/parser.py @@ -2,7 +2,6 @@ import argparse def get_args(): parser = argparse.ArgumentParser() - parser.add_argument('type', type=str, help="'client' or 'server'", default='server') parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0') parser.add_argument('--port', type=int, help="server's listening port", default=8888)