feat: add cluster_info function

This commit is contained in:
Ting-Jun Wang 2024-05-18 18:08:07 +08:00
parent 01e673c07c
commit 68123809bb
Signed by: snsd0805
GPG Key ID: 48D331A3D6160354
3 changed files with 55 additions and 17 deletions

View File

@ -3,11 +3,6 @@ import sys
from src.parser import get_args from src.parser import get_args
from src.node_manager import NodeManager from src.node_manager import NodeManager
actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Exit', 'function': 'exit'},
]
if __name__ == '__main__': if __name__ == '__main__':
args = get_args() args = get_args()
host, port = args.host, args.port host, port = args.host, args.port
@ -19,7 +14,7 @@ if __name__ == '__main__':
is_run = True is_run = True
while is_run: while is_run:
explanation = '=' * 30 + '\n' explanation = '=' * 30 + '\n'
for index, action in enumerate(actions): for index, action in enumerate(manager.actions):
explanation += f'{index+1}) {action["explanation"]}\n' explanation += f'{index+1}) {action["explanation"]}\n'
explanation += '=' * 30 explanation += '=' * 30
explanation += '\n> ' explanation += '\n> '
@ -34,7 +29,7 @@ if __name__ == '__main__':
if read_list: if read_list:
action = sys.stdin.readline() action = sys.stdin.readline()
try: try:
action = actions[int(action)-1] action = manager.actions[int(action)-1]
if action['function'] == 'exit': if action['function'] == 'exit':
manager.exit() manager.exit()
is_run = False is_run = False

View File

@ -1,4 +1,6 @@
import socket import socket
import json
from variables import actions
class ClusterCommunicationModule(): class ClusterCommunicationModule():
def __init__(self, host, port, node_manager): def __init__(self, host, port, node_manager):
@ -10,7 +12,7 @@ class ClusterCommunicationModule():
self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host = host self.host = host
self.port = port self.port = port
self.IP = socket.gethostbyname(socket.gethostname())
self.node_manager = node_manager self.node_manager = node_manager
self.worker_conns = [] self.worker_conns = []
@ -40,10 +42,8 @@ class ClusterCommunicationModule():
break break
elif data == '[CHECK]': # master side elif data == '[CHECK]': # master side
self.worker_conns.append(conn) self.worker_conns.append(conn)
conn.send('TEST'.encode())
continue continue
conn.close() conn.close()
def process_request(self, conn, addr): # worker side def process_request(self, conn, addr): # worker side
@ -53,6 +53,9 @@ class ClusterCommunicationModule():
self.client_sock.connect((addr[0], self.port)) self.client_sock.connect((addr[0], self.port))
self.client_sock.send('[CHECK]'.encode()) self.client_sock.send('[CHECK]'.encode())
# remove 'add node'
self.node_manager.actions = self.node_manager.actions[1:]
self.node_manager.status = 'worker' self.node_manager.status = 'worker'
print(f'You are in {addr} cluster now.\nPlease type in Enter to continue') print(f'You are in {addr} cluster now.\nPlease type in Enter to continue')
return True return True
@ -62,6 +65,11 @@ class ClusterCommunicationModule():
return False return False
def handle_command(self, data): def handle_command(self, data):
command, args = data.split()
if command == '[INFO]':
data = {'host': self.IP, 'GPU': self.node_manager.GPU, 'GPU_num': self.node_manager.GPU_num}
self.client_sock.send(json.dumps(data).encode())
return True return True
def request(self, host): # master side def request(self, host): # master side
@ -76,9 +84,24 @@ class ClusterCommunicationModule():
if data == '[REJECT]': if data == '[REJECT]':
print(f"{host} reject.") print(f"{host} reject.")
return False
elif data == '[ACCEPT]': elif data == '[ACCEPT]':
self.node_manager.status = 'master' self.node_manager.status = 'master'
print(f"{host} accept.") print(f"{host} accept.")
return True
def cluster_info(self):
ans = []
for conn in self.worker_conns:
try:
conn.send('[INFO] {}'.encode())
data = conn.recv(1024)
data = json.loads(data.decode())
ans.append(data)
except:
self.worker_conns.remove(conn)
return ans
def exit(self): def exit(self):
self.sock.close() self.sock.close()

View File

@ -2,9 +2,16 @@ import threading
from src.communication import ServiceExplorationModule, ClusterCommunicationModule from src.communication import ServiceExplorationModule, ClusterCommunicationModule
import time import time
class NodeManager(): class NodeManager():
def __init__(self, host, port): def __init__(self, host, port):
self.status = 'none' self.status = 'none'
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Exit', 'function': 'exit'},
]
self.GPU = 'RTX 4090'
self.GPU_num = 1
# start Cluster Communication Module # start Cluster Communication Module
# let the nodes in the cluster can communicate # let the nodes in the cluster can communicate
@ -33,16 +40,29 @@ class NodeManager():
msg += '\n> ' msg += '\n> '
choose = input(msg) choose = input(msg)
# try: try:
choose = int(choose)-1 choose = int(choose)-1
print("START REQUEST") accept = self.cluster_communication_module.request(hosts[choose])
self.cluster_communication_module.request(hosts[choose]) if accept:
print(f"Link to {hosts[choose]}") exit_func = self.actions[-1]
# except: self.actions = self.actions[:-1]
# print("=== FAIL ===") info_func = {'explanation': 'cluster info', 'function': 'cluster_info'}
if info_func not in self.actions:
self.actions.append(info_func)
self.actions.append(exit_func)
except:
print("=== FAIL ===")
else: else:
print("No other nodes in your subnet.") print("No other nodes in your subnet.")
def cluster_info(self):
info = self.cluster_communication_module.cluster_info()
print(f"\nThere are {len(info)+1} nodes in this cluster.")
print("Cluster Info:")
print(f" {self.service_exploration_module.IP}(local) -> {self.GPU} * {self.GPU_num}")
for host in info:
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
def exit(self): def exit(self):
self.cluster_communication_module.exit() self.cluster_communication_module.exit()
self.service_exploration_module.exit() self.service_exploration_module.exit()