feat: extract action from node manager
This commit is contained in:
parent
4fa614776d
commit
a947f86c91
45
main.py
45
main.py
@ -1,21 +1,40 @@
|
|||||||
import argparse
|
|
||||||
import threading
|
|
||||||
from src.communication import ServiceExplorationModule
|
|
||||||
from src.parser import get_args
|
from src.parser import get_args
|
||||||
|
from src.node_manager import NodeManager
|
||||||
|
|
||||||
|
actions = [
|
||||||
|
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
||||||
|
{'explanation': 'Exit', 'function': 'exit'},
|
||||||
|
]
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
args = get_args()
|
args = get_args()
|
||||||
|
|
||||||
socket_type = args.type
|
|
||||||
host, port = args.host, args.port
|
host, port = args.host, args.port
|
||||||
|
|
||||||
# start Service Exploration Module
|
manager = NodeManager(host, port)
|
||||||
# let all client can know which IP address has our service so that it can link to.
|
manager.start_service()
|
||||||
service_explore = ServiceExplorationModule(host, port+1)
|
|
||||||
explore_service_thread = threading.Thread(target=service_explore.listen)
|
|
||||||
explore_service_thread.start()
|
while True:
|
||||||
|
explanation = '=' * 30 + '\n'
|
||||||
|
for index, action in enumerate(actions):
|
||||||
|
explanation += f'{index+1}) {action["explanation"]}\n'
|
||||||
|
explanation += '=' * 30
|
||||||
|
explanation += '\n> '
|
||||||
|
|
||||||
|
action = input(explanation)
|
||||||
|
|
||||||
|
try:
|
||||||
|
action = actions[int(action)-1]
|
||||||
|
if action['function'] == 'exit':
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
func = getattr(manager, action['function'])
|
||||||
|
func()
|
||||||
|
except:
|
||||||
|
None
|
||||||
|
|
||||||
|
print("\n\n\n")
|
||||||
|
print("Stopped")
|
||||||
|
|
||||||
|
|
||||||
if socket_type == 'client':
|
|
||||||
hosts = service_explore.explore()
|
|
||||||
print(hosts)
|
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,10 @@
|
|||||||
import socket
|
import socket
|
||||||
|
|
||||||
|
'''
|
||||||
|
class ClusterCommunicationModule():
|
||||||
|
def __init__(self, host, port):
|
||||||
|
'''
|
||||||
|
|
||||||
class ServiceExplorationModule():
|
class ServiceExplorationModule():
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
# UDP server
|
# UDP server
|
||||||
@ -12,14 +17,13 @@ class ServiceExplorationModule():
|
|||||||
|
|
||||||
def listen(self):
|
def listen(self):
|
||||||
self.sock.bind((self.host, self.port))
|
self.sock.bind((self.host, self.port))
|
||||||
print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
# print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data, addr = self.sock.recvfrom(1024)
|
data, addr = self.sock.recvfrom(1024)
|
||||||
|
|
||||||
if self.is_available:
|
if self.is_available:
|
||||||
if data.decode() == '[EXPLORE]':
|
if data.decode() == '[EXPLORE]':
|
||||||
print(f"Receive exploration broadcast from {addr}")
|
|
||||||
self.sock.sendto(self.IP.encode(), addr)
|
self.sock.sendto(self.IP.encode(), addr)
|
||||||
|
|
||||||
def explore(self):
|
def explore(self):
|
||||||
@ -35,8 +39,11 @@ class ServiceExplorationModule():
|
|||||||
try:
|
try:
|
||||||
data, addr = client_sock.recvfrom(1024)
|
data, addr = client_sock.recvfrom(1024)
|
||||||
if addr[0] != self.IP:
|
if addr[0] != self.IP:
|
||||||
available_host.append(addr)
|
available_host.append(addr[0])
|
||||||
except:
|
except:
|
||||||
# if socket timeout
|
# if socket timeout
|
||||||
break
|
break
|
||||||
return available_host
|
return available_host
|
||||||
|
|
||||||
|
def change_available(self, new_status):
|
||||||
|
self.is_available = new_status
|
||||||
|
|||||||
33
src/node_manager.py
Normal file
33
src/node_manager.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
import threading
|
||||||
|
from src.communication import ServiceExplorationModule
|
||||||
|
import time
|
||||||
|
|
||||||
|
class NodeManager():
|
||||||
|
def __init__(self, host, port):
|
||||||
|
self.status = 'none'
|
||||||
|
# start Service Exploration Module
|
||||||
|
# let all client can know which IP address has our service so that it can link to.
|
||||||
|
self.service_exploration_module = ServiceExplorationModule(host, port+1)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def start_service(self):
|
||||||
|
explore_service_thread = threading.Thread(target=self.service_exploration_module.listen)
|
||||||
|
explore_service_thread.daemon = True
|
||||||
|
explore_service_thread.start()
|
||||||
|
|
||||||
|
def add_node(self):
|
||||||
|
hosts = self.service_exploration_module.explore()
|
||||||
|
if len(hosts) != 0:
|
||||||
|
msg = "These are the nodes you can request for join into our cluster: \n"
|
||||||
|
msg += '\n'.join([f'{index+1}) {host}' for index, host in enumerate(hosts)])
|
||||||
|
msg += '\n> '
|
||||||
|
|
||||||
|
choose = input(msg)
|
||||||
|
try:
|
||||||
|
choose = int(choose)-1
|
||||||
|
print(f"Link to {hosts[choose]}")
|
||||||
|
except:
|
||||||
|
print("=== FAIL ===")
|
||||||
|
else:
|
||||||
|
print("No other nodes in your subnet.")
|
||||||
|
|
||||||
@ -2,7 +2,6 @@ import argparse
|
|||||||
|
|
||||||
def get_args():
|
def get_args():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('type', type=str, help="'client' or 'server'", default='server')
|
|
||||||
parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0')
|
parser.add_argument('--host', type=str, help="server's IPv4 address", default='0.0.0.0')
|
||||||
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
parser.add_argument('--port', type=int, help="server's listening port", default=8888)
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user