Compare commits
No commits in common. "68123809bbd9e5588130d9674c37d36b2e4e0a8b" and "a947f86c9109c7d2413b1ed4673014572a171ec3" have entirely different histories.
68123809bb
...
a947f86c91
29
main.py
29
main.py
@ -1,8 +1,11 @@
|
|||||||
import select
|
|
||||||
import sys
|
|
||||||
from src.parser import get_args
|
from src.parser import get_args
|
||||||
from src.node_manager import NodeManager
|
from src.node_manager import NodeManager
|
||||||
|
|
||||||
|
actions = [
|
||||||
|
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
||||||
|
{'explanation': 'Exit', 'function': 'exit'},
|
||||||
|
]
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
args = get_args()
|
args = get_args()
|
||||||
host, port = args.host, args.port
|
host, port = args.host, args.port
|
||||||
@ -11,34 +14,24 @@ if __name__ == '__main__':
|
|||||||
manager.start_service()
|
manager.start_service()
|
||||||
|
|
||||||
|
|
||||||
is_run = True
|
while True:
|
||||||
while is_run:
|
|
||||||
explanation = '=' * 30 + '\n'
|
explanation = '=' * 30 + '\n'
|
||||||
for index, action in enumerate(manager.actions):
|
for index, action in enumerate(actions):
|
||||||
explanation += f'{index+1}) {action["explanation"]}\n'
|
explanation += f'{index+1}) {action["explanation"]}\n'
|
||||||
explanation += '=' * 30
|
explanation += '=' * 30
|
||||||
explanation += '\n> '
|
explanation += '\n> '
|
||||||
print(explanation, end='')
|
|
||||||
|
|
||||||
while True:
|
action = input(explanation)
|
||||||
if sys.stdin.closed:
|
|
||||||
sys.stdin = open(0)
|
|
||||||
|
|
||||||
read_list, _, _ = select.select([sys.stdin], [], [], 1)
|
|
||||||
|
|
||||||
if read_list:
|
|
||||||
action = sys.stdin.readline()
|
|
||||||
try:
|
try:
|
||||||
action = manager.actions[int(action)-1]
|
action = actions[int(action)-1]
|
||||||
if action['function'] == 'exit':
|
if action['function'] == 'exit':
|
||||||
manager.exit()
|
break
|
||||||
is_run = False
|
|
||||||
else:
|
else:
|
||||||
func = getattr(manager, action['function'])
|
func = getattr(manager, action['function'])
|
||||||
func()
|
func()
|
||||||
break
|
|
||||||
except:
|
except:
|
||||||
break
|
None
|
||||||
|
|
||||||
print("\n\n\n")
|
print("\n\n\n")
|
||||||
print("Stopped")
|
print("Stopped")
|
||||||
|
|||||||
@ -1,136 +1,28 @@
|
|||||||
import socket
|
import socket
|
||||||
import json
|
|
||||||
from variables import actions
|
|
||||||
|
|
||||||
|
'''
|
||||||
class ClusterCommunicationModule():
|
class ClusterCommunicationModule():
|
||||||
def __init__(self, host, port, node_manager):
|
def __init__(self, host, port):
|
||||||
# TCP server
|
'''
|
||||||
# In our implementation, the master node link to the worker's TCP server.
|
|
||||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
|
|
||||||
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.IP = socket.gethostbyname(socket.gethostname())
|
|
||||||
|
|
||||||
self.node_manager = node_manager
|
|
||||||
self.worker_conns = []
|
|
||||||
|
|
||||||
def listen(self):
|
|
||||||
self.sock.bind((self.host, self.port))
|
|
||||||
self.sock.listen(1)
|
|
||||||
print(f"Communication Server(TCP) listening on {self.host}:{self.port}...")
|
|
||||||
|
|
||||||
while True:
|
|
||||||
conn, addr = self.sock.accept()
|
|
||||||
|
|
||||||
# first request, we should approve the request
|
|
||||||
data = conn.recv(1024)
|
|
||||||
data = data.decode()
|
|
||||||
|
|
||||||
if data == '[REQUEST]': # worker side
|
|
||||||
cont = self.process_request(conn, addr)
|
|
||||||
if cont:
|
|
||||||
while True:
|
|
||||||
# Start receive command
|
|
||||||
data = self.client_sock.recv(1024)
|
|
||||||
data = data.decode()
|
|
||||||
|
|
||||||
cont = self.handle_command(data)
|
|
||||||
if not cont:
|
|
||||||
break
|
|
||||||
elif data == '[CHECK]': # master side
|
|
||||||
self.worker_conns.append(conn)
|
|
||||||
continue
|
|
||||||
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
def process_request(self, conn, addr): # worker side
|
|
||||||
if self.node_manager.status == 'none':
|
|
||||||
conn.send('[ACCEPT]'.encode())
|
|
||||||
conn.close()
|
|
||||||
self.client_sock.connect((addr[0], self.port))
|
|
||||||
self.client_sock.send('[CHECK]'.encode())
|
|
||||||
|
|
||||||
# remove 'add node'
|
|
||||||
self.node_manager.actions = self.node_manager.actions[1:]
|
|
||||||
|
|
||||||
self.node_manager.status = 'worker'
|
|
||||||
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
conn.send('[REJECT]'.encode())
|
|
||||||
print(f'You just reject the cluster invitation from {addr} now.\nPlease type in Enter to continue')
|
|
||||||
return False
|
|
||||||
|
|
||||||
def handle_command(self, data):
|
|
||||||
command, args = data.split()
|
|
||||||
if command == '[INFO]':
|
|
||||||
data = {'host': self.IP, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
|
|
||||||
self.client_sock.send(json.dumps(data).encode())
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def request(self, host): # master side
|
|
||||||
self.client_sock.connect((host, self.port))
|
|
||||||
self.client_sock.send('[REQUEST]'.encode())
|
|
||||||
data = self.client_sock.recv(1024)
|
|
||||||
data = data.decode()
|
|
||||||
|
|
||||||
# close client_sock, and waiting for the worker connect to our self.sock
|
|
||||||
self.client_sock.close()
|
|
||||||
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
|
|
||||||
if data == '[REJECT]':
|
|
||||||
print(f"{host} reject.")
|
|
||||||
return False
|
|
||||||
elif data == '[ACCEPT]':
|
|
||||||
self.node_manager.status = 'master'
|
|
||||||
|
|
||||||
print(f"{host} accept.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def cluster_info(self):
|
|
||||||
ans = []
|
|
||||||
for conn in self.worker_conns:
|
|
||||||
try:
|
|
||||||
conn.send('[INFO] {}'.encode())
|
|
||||||
data = conn.recv(1024)
|
|
||||||
data = json.loads(data.decode())
|
|
||||||
ans.append(data)
|
|
||||||
except:
|
|
||||||
self.worker_conns.remove(conn)
|
|
||||||
return ans
|
|
||||||
|
|
||||||
def exit(self):
|
|
||||||
self.sock.close()
|
|
||||||
self.client_sock.close()
|
|
||||||
for conn in self.worker_conns:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceExplorationModule():
|
class ServiceExplorationModule():
|
||||||
def __init__(self, host, port, node_manager):
|
def __init__(self, host, port):
|
||||||
# UDP server
|
# UDP server
|
||||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
|
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.IP = socket.gethostbyname(socket.gethostname())
|
self.IP = socket.gethostbyname(socket.gethostname())
|
||||||
|
self.is_available = True
|
||||||
self.node_manager = node_manager
|
|
||||||
|
|
||||||
def listen(self):
|
def listen(self):
|
||||||
self.sock.bind((self.host, self.port))
|
self.sock.bind((self.host, self.port))
|
||||||
print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
# print(f"Exploration Server(UDP) listening on {self.host}:{self.port}...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data, addr = self.sock.recvfrom(1024)
|
data, addr = self.sock.recvfrom(1024)
|
||||||
|
|
||||||
if self.node_manager.status == 'none':
|
if self.is_available:
|
||||||
if data.decode() == '[EXPLORE]':
|
if data.decode() == '[EXPLORE]':
|
||||||
self.sock.sendto(self.IP.encode(), addr)
|
self.sock.sendto(self.IP.encode(), addr)
|
||||||
|
|
||||||
@ -140,7 +32,7 @@ class ServiceExplorationModule():
|
|||||||
client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
client_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
|
||||||
client_sock.settimeout(1)
|
client_sock.settimeout(3)
|
||||||
client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))
|
client_sock.sendto('[EXPLORE]'.encode(), ('255.255.255.255', self.port))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
@ -153,5 +45,5 @@ class ServiceExplorationModule():
|
|||||||
break
|
break
|
||||||
return available_host
|
return available_host
|
||||||
|
|
||||||
def exit(self):
|
def change_available(self, new_status):
|
||||||
self.sock.close()
|
self.is_available = new_status
|
||||||
|
|||||||
@ -1,33 +1,16 @@
|
|||||||
import threading
|
import threading
|
||||||
from src.communication import ServiceExplorationModule, ClusterCommunicationModule
|
from src.communication import ServiceExplorationModule
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
class NodeManager():
|
class NodeManager():
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
self.status = 'none'
|
self.status = 'none'
|
||||||
self.actions = [
|
|
||||||
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
|
||||||
{'explanation': 'Exit', 'function': 'exit'},
|
|
||||||
]
|
|
||||||
self.GPU = 'RTX 4090'
|
|
||||||
self.GPU_num = 1
|
|
||||||
|
|
||||||
# start Cluster Communication Module
|
|
||||||
# let the nodes in the cluster can communicate
|
|
||||||
self.cluster_communication_module = ClusterCommunicationModule(host, port, self)
|
|
||||||
|
|
||||||
# start Service Exploration Module
|
# start Service Exploration Module
|
||||||
# let all client can know which IP address has our service so that it can link to.
|
# let all client can know which IP address has our service so that it can link to.
|
||||||
self.service_exploration_module = ServiceExplorationModule(host, port+1, self)
|
self.service_exploration_module = ServiceExplorationModule(host, port+1)
|
||||||
|
time.sleep(1)
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
def start_service(self):
|
def start_service(self):
|
||||||
communication_thread = threading.Thread(target=self.cluster_communication_module.listen)
|
|
||||||
communication_thread.daemon = True
|
|
||||||
communication_thread.start()
|
|
||||||
|
|
||||||
explore_service_thread = threading.Thread(target=self.service_exploration_module.listen)
|
explore_service_thread = threading.Thread(target=self.service_exploration_module.listen)
|
||||||
explore_service_thread.daemon = True
|
explore_service_thread.daemon = True
|
||||||
explore_service_thread.start()
|
explore_service_thread.start()
|
||||||
@ -42,29 +25,9 @@ class NodeManager():
|
|||||||
choose = input(msg)
|
choose = input(msg)
|
||||||
try:
|
try:
|
||||||
choose = int(choose)-1
|
choose = int(choose)-1
|
||||||
accept = self.cluster_communication_module.request(hosts[choose])
|
print(f"Link to {hosts[choose]}")
|
||||||
if accept:
|
|
||||||
exit_func = self.actions[-1]
|
|
||||||
self.actions = self.actions[:-1]
|
|
||||||
info_func = {'explanation': 'cluster info', 'function': 'cluster_info'}
|
|
||||||
if info_func not in self.actions:
|
|
||||||
self.actions.append(info_func)
|
|
||||||
self.actions.append(exit_func)
|
|
||||||
except:
|
except:
|
||||||
print("=== FAIL ===")
|
print("=== FAIL ===")
|
||||||
else:
|
else:
|
||||||
print("No other nodes in your subnet.")
|
print("No other nodes in your subnet.")
|
||||||
|
|
||||||
def cluster_info(self):
|
|
||||||
info = self.cluster_communication_module.cluster_info()
|
|
||||||
print(f"\nThere are {len(info)+1} nodes in this cluster.")
|
|
||||||
print("Cluster Info:")
|
|
||||||
print(f" {self.service_exploration_module.IP}(local) -> {self.GPU} * {self.GPU_num}")
|
|
||||||
for host in info:
|
|
||||||
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
|
|
||||||
|
|
||||||
def exit(self):
|
|
||||||
self.cluster_communication_module.exit()
|
|
||||||
self.service_exploration_module.exit()
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user