From 5afd66837950c63af0035e3b3f314fdaf68cab2b Mon Sep 17 00:00:00 2001 From: snsd0805 Date: Fri, 31 May 2024 01:31:57 +0800 Subject: [PATCH 1/6] feat: start listening notification --- src/communication.py | 20 +++++++++++++++++++- src/node_manager.py | 12 +++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/communication.py b/src/communication.py index a7434ba..2aa9d7e 100644 --- a/src/communication.py +++ b/src/communication.py @@ -99,8 +99,12 @@ class ClusterCommunicationModule(): self.node_manager.status = 'none' print("You have leaved the cluster.") - return False + elif command == '[START_LISTEN_TASK]': + self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode()) + print("The master has started listening for new task from Sepolia testnet...") + + return True def request(self, host): # master side @@ -136,6 +140,20 @@ class ClusterCommunicationModule(): print("1 worker disconnnected.") return ans + def start_listen_task(self): + disconnected_counter = 0 + for conn in self.worker_conns: + try: + conn.send('[START_LISTEN_TASK] {}'.encode()) + data, args = conn.recv(1024).decode().split(' ') + except: + disconnected_counter += 1 + self.worker_conns.remove(conn) + if disconnected_counter != 0: + print(f'{disconnected_counter} worker disconnected.') + else: + print(f'All other {len(self.worker_conns)} workers are waiting...') + def exit(self): if self.node_manager.status == 'master': for conn in self.worker_conns: diff --git a/src/node_manager.py b/src/node_manager.py index 6f2bdc9..328e00a 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -3,6 +3,8 @@ from src.communication import ServiceExplorationModule, ClusterCommunicationModu import torch import time import docker +from web3 import Web3 +from src.scheduler import Scheduler class NodeManager(): def __init__(self, host, port): @@ -25,6 +27,9 @@ class NodeManager(): # docker client self.docker_client = docker.from_env() + # web3 provider + self.w3 = Web3(Web3.HTTPProvider('https://eth-sepolia.g.alchemy.com/v2/'+ALCHEMY_KEY)) + time.sleep(2) def get_GPU_info(self): @@ -59,7 +64,7 @@ class NodeManager(): self.actions = [ {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, {'explanation': 'Cluster info', 'function': 'cluster_info'}, - {'explanation': 'Start working', 'function': 'start_work'}, + {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'}, {'explanation': 'Exit', 'function': 'exit'}, ] except: @@ -67,6 +72,11 @@ class NodeManager(): else: print("No other nodes in your subnet.") + def start_listen_task(self): + print("Waiting for the new task from Sepolia testnet...") + self.cluster_communication_module.start_listen_task() + + def cluster_info(self): info = self.cluster_communication_module.cluster_info() print(f"\nThere are {len(info)+1} nodes in this cluster.") From 87646d966189947bcd1e8b819551aad4b77e6110 Mon Sep 17 00:00:00 2001 From: snsd0805 Date: Fri, 31 May 2024 01:51:29 +0800 Subject: [PATCH 2/6] feat: add web3 config --- constant.py | 9 +++++++++ src/node_manager.py | 3 ++- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 constant.py diff --git a/constant.py b/constant.py new file mode 100644 index 0000000..c79737c --- /dev/null +++ b/constant.py @@ -0,0 +1,9 @@ +# web3 provider information +# include provider HTTPS endpoint URL & its API Key. +# Alchemy is a good choice for Web3 provider +WEB3_PROVIDER_URL = "https://eth-sepolia.g.alchemy.com/v2/" +WEB3_PROVIDER_KEY = "" + +# Please write down your wallet key to sign a transaction with our contract. +WALLET_KEY = "" + diff --git a/src/node_manager.py b/src/node_manager.py index 328e00a..ce9e8ac 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -5,6 +5,7 @@ import time import docker from web3 import Web3 from src.scheduler import Scheduler +from constant import * class NodeManager(): def __init__(self, host, port): @@ -28,7 +29,7 @@ class NodeManager(): self.docker_client = docker.from_env() # web3 provider - self.w3 = Web3(Web3.HTTPProvider('https://eth-sepolia.g.alchemy.com/v2/'+ALCHEMY_KEY)) + self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY)) time.sleep(2) From 12caa7358ff95047256250f26e7bfb62670eab6b Mon Sep 17 00:00:00 2001 From: snsd0805 Date: Fri, 31 May 2024 01:58:04 +0800 Subject: [PATCH 3/6] feat: load web3 provider in the master node --- src/node_manager.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/node_manager.py b/src/node_manager.py index ce9e8ac..a99a3f4 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -29,7 +29,8 @@ class NodeManager(): self.docker_client = docker.from_env() # web3 provider - self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY)) + # if this is master, it should init a Web object. + self.w3 = None time.sleep(2) @@ -74,9 +75,20 @@ class NodeManager(): print("No other nodes in your subnet.") def start_listen_task(self): - print("Waiting for the new task from Sepolia testnet...") - self.cluster_communication_module.start_listen_task() + self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY)) + print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.") + print(f"And we have load your wallet private key {WALLET_KEY}") + print() + if self.w3.is_connected(): + print("Connected Successfully.") + print() + print("Waiting for the new task from Sepolia testnet...") + self.cluster_communication_module.start_listen_task() + else: + print("Connected Failed.") + print("Please check for your provider key & wallet key") + def cluster_info(self): info = self.cluster_communication_module.cluster_info() From ba2b281b685ebbfd86f01d05918fba617bcb6d88 Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Fri, 31 May 2024 03:17:10 +0800 Subject: [PATCH 4/6] feat: include web3 for register the cluster --- constant.py | 9 +++++++++ src/Scheduler.abi | 1 + src/node_manager.py | 25 +++++++++++++++++++++++-- src/scheduler.py | 24 ++++++++++++++---------- 4 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 src/Scheduler.abi diff --git a/constant.py b/constant.py index c79737c..356cf14 100644 --- a/constant.py +++ b/constant.py @@ -7,3 +7,12 @@ WEB3_PROVIDER_KEY = "" # Please write down your wallet key to sign a transaction with our contract. WALLET_KEY = "" +# Don't change +SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d" +SCHEDULER_ABI_FILE = "./src/Scheduler.abi" + +GPU_NAME2ID = { + 'NVIDIA GeForce RTX 4090': 0, + 'NVIDIA GeForce RTX 3060': 1, +} + diff --git a/src/Scheduler.abi b/src/Scheduler.abi new file mode 100644 index 0000000..9ae6e3b --- /dev/null +++ b/src/Scheduler.abi @@ -0,0 +1 @@ +[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"gpuId","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"RegisterCluster","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"taskIndex","type":"uint256"},{"indexed":false,"internalType":"string","name":"dataImage","type":"string"},{"indexed":false,"internalType":"string","name":"trainImage","type":"string"}],"name":"StartRun","type":"event"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"clusters","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"clusterIndex","type":"uint256"}],"name":"getClusterInfo","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getClusters","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getNumberOfClusters","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getTasks","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"internalType":"struct Scheduler.Task[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerCluster","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerTaskWithConditions","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"tasks","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"taskIndex","type":"uint256"},{"internalType":"uint256","name":"newStatus","type":"uint256"}],"name":"updateStatus","outputs":[],"stateMutability":"nonpayable","type":"function"}] \ No newline at end of file diff --git a/src/node_manager.py b/src/node_manager.py index a99a3f4..4e03f30 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -12,6 +12,7 @@ class NodeManager(): self.status = 'none' self.actions = [ {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, + {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'}, {'explanation': 'Exit', 'function': 'exit'}, ] self.get_GPU_info() @@ -31,6 +32,8 @@ class NodeManager(): # web3 provider # if this is master, it should init a Web object. self.w3 = None + self.scheduler = None + self.wallet = None time.sleep(2) @@ -76,17 +79,32 @@ class NodeManager(): def start_listen_task(self): self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY)) + self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE) + self.wallet = self.w3.eth.account.from_key(WALLET_KEY) print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.") print(f"And we have load your wallet private key {WALLET_KEY}") print() if self.w3.is_connected(): print("Connected Successfully.") print() - print("Waiting for the new task from Sepolia testnet...") + + # Register the cluster + gpu_num = self.cluster_info() + gpu_id = GPU_NAME2ID[self.GPU] + print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...") + receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num) + event = self.scheduler.get_cluster_event(receipt) + print(f"\nOK. this is our cluster event on the blockchain {event[0]['args']}") + + for cluster in self.scheduler.get_clusters(): + print(cluster) + + # start waiting + print("\nWaiting for the new task from Sepolia testnet...") self.cluster_communication_module.start_listen_task() else: - print("Connected Failed.") + print("[ERROR] Connected Failed.") print("Please check for your provider key & wallet key") @@ -95,8 +113,11 @@ class NodeManager(): print(f"\nThere are {len(info)+1} nodes in this cluster.") print("Cluster Info:") print(f" {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}") + GPU_num = self.GPU_num for host in info: + GPU_num += host['GPU_num'] print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}") + return GPU_num def exit(self): self.cluster_communication_module.exit() diff --git a/src/scheduler.py b/src/scheduler.py index 9185869..5e9bcae 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -8,7 +8,7 @@ class Scheduler(): self.contract = self.w3.eth.contract(address=address, abi=abi) print(self.contract.functions) - def signedAndSendTransaction(self, account, function): + def signed_and_send_transaction(self, account, function): tx = function.build_transaction({ 'from': account.address, 'nonce': self.w3.eth.get_transaction_count(account.address) @@ -17,24 +17,24 @@ class Scheduler(): tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction) tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash) - _hash = tx_receipt['transactionHash'] - return _hash + return tx_receipt - def registerCluster(self, account, gpu_id: int, gpu_num: int): + def register_cluster(self, account, gpu_id: int, gpu_num: int): function = self.contract.functions.registerCluster(gpu_id, gpu_num) - return self.signedAndSendTransaction(account, function) + return self.signed_and_send_transaction(account, function) - def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int): + def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int): function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size) - return self.signedAndSendTransaction(account, function) + return self.signed_and_send_transaction(account, function) - def getClusters(self): + def get_clusters(self): return self.contract.functions.getClusters().call() - def getTasks(self): + def get_tasks(self): return self.contract.functions.getTasks().call() - def getStartRunEvent(self): + ''' + def get_start_run_event(self): event_filter = self.contract.events.StartRun.create_filter(fromBlock=0) events = event_filter.get_new_entries() print(events) @@ -43,4 +43,8 @@ class Scheduler(): event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0) events = event_filter.get_new_entries() print(events) + ''' + def get_cluster_event(self, receipt): + event = self.contract.events.RegisterCluster().process_receipt(receipt) + return event From 7fa71768e1f134a65c8f4541b0d465af4ce1e42a Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Fri, 31 May 2024 03:20:10 +0800 Subject: [PATCH 5/6] docs: .gitignore --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6f0ec0a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*/__pycache__ +__pycache__ +output/ +dataset_dir/ +env/ From feb25fc8946b3c0381991585e740ba128394b8d4 Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Fri, 31 May 2024 03:46:18 +0800 Subject: [PATCH 6/6] feat: listening the new task from blockchain --- src/node_manager.py | 4 +++- src/scheduler.py | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/node_manager.py b/src/node_manager.py index 4e03f30..3178135 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -82,7 +82,7 @@ class NodeManager(): self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE) self.wallet = self.w3.eth.account.from_key(WALLET_KEY) print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.") - print(f"And we have load your wallet private key {WALLET_KEY}") + print(f"And we have load your wallet private key {WALLET_KEY} (address={self.wallet.address})") print() if self.w3.is_connected(): print("Connected Successfully.") @@ -102,6 +102,8 @@ class NodeManager(): # start waiting print("\nWaiting for the new task from Sepolia testnet...") self.cluster_communication_module.start_listen_task() + next_task = self.scheduler.listen_task(self.wallet.address) + print(next_task) else: print("[ERROR] Connected Failed.") diff --git a/src/scheduler.py b/src/scheduler.py index 5e9bcae..551b57d 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -48,3 +48,12 @@ class Scheduler(): def get_cluster_event(self, receipt): event = self.contract.events.RegisterCluster().process_receipt(receipt) return event + + def listen_task(self, address): + event_filter = self.contract.events.StartRun.create_filter(fromBlock='latest') + while True: + for event in event_filter.get_new_entries(): + if event['args']['provider'] == address: + return event + +