From 5afd66837950c63af0035e3b3f314fdaf68cab2b Mon Sep 17 00:00:00 2001 From: snsd0805 Date: Fri, 31 May 2024 01:31:57 +0800 Subject: [PATCH] feat: start listening notification --- src/communication.py | 20 +++++++++++++++++++- src/node_manager.py | 12 +++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/communication.py b/src/communication.py index a7434ba..2aa9d7e 100644 --- a/src/communication.py +++ b/src/communication.py @@ -99,8 +99,12 @@ class ClusterCommunicationModule(): self.node_manager.status = 'none' print("You have leaved the cluster.") - return False + elif command == '[START_LISTEN_TASK]': + self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode()) + print("The master has started listening for new task from Sepolia testnet...") + + return True def request(self, host): # master side @@ -136,6 +140,20 @@ class ClusterCommunicationModule(): print("1 worker disconnnected.") return ans + def start_listen_task(self): + disconnected_counter = 0 + for conn in self.worker_conns: + try: + conn.send('[START_LISTEN_TASK] {}'.encode()) + data, args = conn.recv(1024).decode().split(' ') + except: + disconnected_counter += 1 + self.worker_conns.remove(conn) + if disconnected_counter != 0: + print(f'{disconnected_counter} worker disconnected.') + else: + print(f'All other {len(self.worker_conns)} workers are waiting...') + def exit(self): if self.node_manager.status == 'master': for conn in self.worker_conns: diff --git a/src/node_manager.py b/src/node_manager.py index 6f2bdc9..328e00a 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -3,6 +3,8 @@ from src.communication import ServiceExplorationModule, ClusterCommunicationModu import torch import time import docker +from web3 import Web3 +from src.scheduler import Scheduler class NodeManager(): def __init__(self, host, port): @@ -25,6 +27,9 @@ class NodeManager(): # docker client self.docker_client = docker.from_env() + # web3 provider + self.w3 = Web3(Web3.HTTPProvider('https://eth-sepolia.g.alchemy.com/v2/'+ALCHEMY_KEY)) + time.sleep(2) def get_GPU_info(self): @@ -59,7 +64,7 @@ class NodeManager(): self.actions = [ {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, {'explanation': 'Cluster info', 'function': 'cluster_info'}, - {'explanation': 'Start working', 'function': 'start_work'}, + {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'}, {'explanation': 'Exit', 'function': 'exit'}, ] except: @@ -67,6 +72,11 @@ class NodeManager(): else: print("No other nodes in your subnet.") + def start_listen_task(self): + print("Waiting for the new task from Sepolia testnet...") + self.cluster_communication_module.start_listen_task() + + def cluster_info(self): info = self.cluster_communication_module.cluster_info() print(f"\nThere are {len(info)+1} nodes in this cluster.")