feat: start listening notification

This commit is contained in:
snsd0805 2024-05-31 01:31:57 +08:00
parent 01024d2f0d
commit 5afd668379
Signed by: snsd0805
GPG Key ID: 569349933C77A854
2 changed files with 30 additions and 2 deletions

View File

@ -99,8 +99,12 @@ class ClusterCommunicationModule():
self.node_manager.status = 'none'
print("You have leaved the cluster.")
return False
elif command == '[START_LISTEN_TASK]':
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
print("The master has started listening for new task from Sepolia testnet...")
return True
def request(self, host): # master side
@ -136,6 +140,20 @@ class ClusterCommunicationModule():
print("1 worker disconnnected.")
return ans
def start_listen_task(self):
disconnected_counter = 0
for conn in self.worker_conns:
try:
conn.send('[START_LISTEN_TASK] {}'.encode())
data, args = conn.recv(1024).decode().split(' ')
except:
disconnected_counter += 1
self.worker_conns.remove(conn)
if disconnected_counter != 0:
print(f'{disconnected_counter} worker disconnected.')
else:
print(f'All other {len(self.worker_conns)} workers are waiting...')
def exit(self):
if self.node_manager.status == 'master':
for conn in self.worker_conns:

View File

@ -3,6 +3,8 @@ from src.communication import ServiceExplorationModule, ClusterCommunicationModu
import torch
import time
import docker
from web3 import Web3
from src.scheduler import Scheduler
class NodeManager():
def __init__(self, host, port):
@ -25,6 +27,9 @@ class NodeManager():
# docker client
self.docker_client = docker.from_env()
# web3 provider
self.w3 = Web3(Web3.HTTPProvider('https://eth-sepolia.g.alchemy.com/v2/'+ALCHEMY_KEY))
time.sleep(2)
def get_GPU_info(self):
@ -59,7 +64,7 @@ class NodeManager():
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Cluster info', 'function': 'cluster_info'},
{'explanation': 'Start working', 'function': 'start_work'},
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
{'explanation': 'Exit', 'function': 'exit'},
]
except:
@ -67,6 +72,11 @@ class NodeManager():
else:
print("No other nodes in your subnet.")
def start_listen_task(self):
print("Waiting for the new task from Sepolia testnet...")
self.cluster_communication_module.start_listen_task()
def cluster_info(self):
info = self.cluster_communication_module.cluster_info()
print(f"\nThere are {len(info)+1} nodes in this cluster.")