feat: include web3 for register the cluster

This commit is contained in:
Ting-Jun Wang 2024-05-31 03:17:10 +08:00
parent 2b1c8890a9
commit ba2b281b68
Signed by: snsd0805
GPG Key ID: 48D331A3D6160354
4 changed files with 47 additions and 12 deletions

View File

@ -7,3 +7,12 @@ WEB3_PROVIDER_KEY = ""
# Please write down your wallet key to sign a transaction with our contract.
WALLET_KEY = ""
# Don't change
SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d"
SCHEDULER_ABI_FILE = "./src/Scheduler.abi"
GPU_NAME2ID = {
'NVIDIA GeForce RTX 4090': 0,
'NVIDIA GeForce RTX 3060': 1,
}

1
src/Scheduler.abi Normal file
View File

@ -0,0 +1 @@
[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"gpuId","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"RegisterCluster","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"taskIndex","type":"uint256"},{"indexed":false,"internalType":"string","name":"dataImage","type":"string"},{"indexed":false,"internalType":"string","name":"trainImage","type":"string"}],"name":"StartRun","type":"event"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"clusters","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"clusterIndex","type":"uint256"}],"name":"getClusterInfo","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getClusters","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getNumberOfClusters","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getTasks","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"internalType":"struct Scheduler.Task[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerCluster","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerTaskWithConditions","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"tasks","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"taskIndex","type":"uint256"},{"internalType":"uint256","name":"newStatus","type":"uint256"}],"name":"updateStatus","outputs":[],"stateMutability":"nonpayable","type":"function"}]

View File

@ -12,6 +12,7 @@ class NodeManager():
self.status = 'none'
self.actions = [
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
{'explanation': 'Exit', 'function': 'exit'},
]
self.get_GPU_info()
@ -31,6 +32,8 @@ class NodeManager():
# web3 provider
# if this is master, it should init a Web object.
self.w3 = None
self.scheduler = None
self.wallet = None
time.sleep(2)
@ -76,17 +79,32 @@ class NodeManager():
def start_listen_task(self):
self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY))
self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
self.wallet = self.w3.eth.account.from_key(WALLET_KEY)
print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.")
print(f"And we have load your wallet private key {WALLET_KEY}")
print()
if self.w3.is_connected():
print("Connected Successfully.")
print()
print("Waiting for the new task from Sepolia testnet...")
# Register the cluster
gpu_num = self.cluster_info()
gpu_id = GPU_NAME2ID[self.GPU]
print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...")
receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num)
event = self.scheduler.get_cluster_event(receipt)
print(f"\nOK. this is our cluster event on the blockchain {event[0]['args']}")
for cluster in self.scheduler.get_clusters():
print(cluster)
# start waiting
print("\nWaiting for the new task from Sepolia testnet...")
self.cluster_communication_module.start_listen_task()
else:
print("Connected Failed.")
print("[ERROR] Connected Failed.")
print("Please check for your provider key & wallet key")
@ -95,8 +113,11 @@ class NodeManager():
print(f"\nThere are {len(info)+1} nodes in this cluster.")
print("Cluster Info:")
print(f" {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}")
GPU_num = self.GPU_num
for host in info:
GPU_num += host['GPU_num']
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
return GPU_num
def exit(self):
self.cluster_communication_module.exit()

View File

@ -8,7 +8,7 @@ class Scheduler():
self.contract = self.w3.eth.contract(address=address, abi=abi)
print(self.contract.functions)
def signedAndSendTransaction(self, account, function):
def signed_and_send_transaction(self, account, function):
tx = function.build_transaction({
'from': account.address,
'nonce': self.w3.eth.get_transaction_count(account.address)
@ -17,24 +17,24 @@ class Scheduler():
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
_hash = tx_receipt['transactionHash']
return _hash
return tx_receipt
def registerCluster(self, account, gpu_id: int, gpu_num: int):
def register_cluster(self, account, gpu_id: int, gpu_num: int):
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
return self.signedAndSendTransaction(account, function)
return self.signed_and_send_transaction(account, function)
def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
return self.signedAndSendTransaction(account, function)
return self.signed_and_send_transaction(account, function)
def getClusters(self):
def get_clusters(self):
return self.contract.functions.getClusters().call()
def getTasks(self):
def get_tasks(self):
return self.contract.functions.getTasks().call()
def getStartRunEvent(self):
'''
def get_start_run_event(self):
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
events = event_filter.get_new_entries()
print(events)
@ -43,4 +43,8 @@ class Scheduler():
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
events = event_filter.get_new_entries()
print(events)
'''
def get_cluster_event(self, receipt):
event = self.contract.events.RegisterCluster().process_receipt(receipt)
return event