From ba2b281b685ebbfd86f01d05918fba617bcb6d88 Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Fri, 31 May 2024 03:17:10 +0800 Subject: [PATCH] feat: include web3 for register the cluster --- constant.py | 9 +++++++++ src/Scheduler.abi | 1 + src/node_manager.py | 25 +++++++++++++++++++++++-- src/scheduler.py | 24 ++++++++++++++---------- 4 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 src/Scheduler.abi diff --git a/constant.py b/constant.py index c79737c..356cf14 100644 --- a/constant.py +++ b/constant.py @@ -7,3 +7,12 @@ WEB3_PROVIDER_KEY = "" # Please write down your wallet key to sign a transaction with our contract. WALLET_KEY = "" +# Don't change +SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d" +SCHEDULER_ABI_FILE = "./src/Scheduler.abi" + +GPU_NAME2ID = { + 'NVIDIA GeForce RTX 4090': 0, + 'NVIDIA GeForce RTX 3060': 1, +} + diff --git a/src/Scheduler.abi b/src/Scheduler.abi new file mode 100644 index 0000000..9ae6e3b --- /dev/null +++ b/src/Scheduler.abi @@ -0,0 +1 @@ +[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"gpuId","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"RegisterCluster","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"taskIndex","type":"uint256"},{"indexed":false,"internalType":"string","name":"dataImage","type":"string"},{"indexed":false,"internalType":"string","name":"trainImage","type":"string"}],"name":"StartRun","type":"event"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"clusters","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"clusterIndex","type":"uint256"}],"name":"getClusterInfo","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getClusters","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getNumberOfClusters","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getTasks","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"internalType":"struct Scheduler.Task[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerCluster","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerTaskWithConditions","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"tasks","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"taskIndex","type":"uint256"},{"internalType":"uint256","name":"newStatus","type":"uint256"}],"name":"updateStatus","outputs":[],"stateMutability":"nonpayable","type":"function"}] \ No newline at end of file diff --git a/src/node_manager.py b/src/node_manager.py index a99a3f4..4e03f30 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -12,6 +12,7 @@ class NodeManager(): self.status = 'none' self.actions = [ {'explanation': 'Add another node into our cluster', 'function': 'add_node'}, + {'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'}, {'explanation': 'Exit', 'function': 'exit'}, ] self.get_GPU_info() @@ -31,6 +32,8 @@ class NodeManager(): # web3 provider # if this is master, it should init a Web object. self.w3 = None + self.scheduler = None + self.wallet = None time.sleep(2) @@ -76,17 +79,32 @@ class NodeManager(): def start_listen_task(self): self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY)) + self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE) + self.wallet = self.w3.eth.account.from_key(WALLET_KEY) print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.") print(f"And we have load your wallet private key {WALLET_KEY}") print() if self.w3.is_connected(): print("Connected Successfully.") print() - print("Waiting for the new task from Sepolia testnet...") + + # Register the cluster + gpu_num = self.cluster_info() + gpu_id = GPU_NAME2ID[self.GPU] + print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...") + receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num) + event = self.scheduler.get_cluster_event(receipt) + print(f"\nOK. this is our cluster event on the blockchain {event[0]['args']}") + + for cluster in self.scheduler.get_clusters(): + print(cluster) + + # start waiting + print("\nWaiting for the new task from Sepolia testnet...") self.cluster_communication_module.start_listen_task() else: - print("Connected Failed.") + print("[ERROR] Connected Failed.") print("Please check for your provider key & wallet key") @@ -95,8 +113,11 @@ class NodeManager(): print(f"\nThere are {len(info)+1} nodes in this cluster.") print("Cluster Info:") print(f" {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}") + GPU_num = self.GPU_num for host in info: + GPU_num += host['GPU_num'] print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}") + return GPU_num def exit(self): self.cluster_communication_module.exit() diff --git a/src/scheduler.py b/src/scheduler.py index 9185869..5e9bcae 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -8,7 +8,7 @@ class Scheduler(): self.contract = self.w3.eth.contract(address=address, abi=abi) print(self.contract.functions) - def signedAndSendTransaction(self, account, function): + def signed_and_send_transaction(self, account, function): tx = function.build_transaction({ 'from': account.address, 'nonce': self.w3.eth.get_transaction_count(account.address) @@ -17,24 +17,24 @@ class Scheduler(): tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction) tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash) - _hash = tx_receipt['transactionHash'] - return _hash + return tx_receipt - def registerCluster(self, account, gpu_id: int, gpu_num: int): + def register_cluster(self, account, gpu_id: int, gpu_num: int): function = self.contract.functions.registerCluster(gpu_id, gpu_num) - return self.signedAndSendTransaction(account, function) + return self.signed_and_send_transaction(account, function) - def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int): + def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int): function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size) - return self.signedAndSendTransaction(account, function) + return self.signed_and_send_transaction(account, function) - def getClusters(self): + def get_clusters(self): return self.contract.functions.getClusters().call() - def getTasks(self): + def get_tasks(self): return self.contract.functions.getTasks().call() - def getStartRunEvent(self): + ''' + def get_start_run_event(self): event_filter = self.contract.events.StartRun.create_filter(fromBlock=0) events = event_filter.get_new_entries() print(events) @@ -43,4 +43,8 @@ class Scheduler(): event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0) events = event_filter.get_new_entries() print(events) + ''' + def get_cluster_event(self, receipt): + event = self.contract.events.RegisterCluster().process_receipt(receipt) + return event