Compare commits
No commits in common. "8e9f6b47cf519c6ea6f6e53963ac52ef94bf4602" and "a96c6d968e17a1d871326cc4dec28c9685fa61aa" have entirely different histories.
8e9f6b47cf
...
a96c6d968e
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,5 +0,0 @@
|
|||||||
*/__pycache__
|
|
||||||
__pycache__
|
|
||||||
output/
|
|
||||||
dataset_dir/
|
|
||||||
env/
|
|
||||||
18
constant.py
18
constant.py
@ -1,18 +0,0 @@
|
|||||||
# web3 provider information
|
|
||||||
# include provider HTTPS endpoint URL & its API Key.
|
|
||||||
# Alchemy is a good choice for Web3 provider
|
|
||||||
WEB3_PROVIDER_URL = "https://eth-sepolia.g.alchemy.com/v2/"
|
|
||||||
WEB3_PROVIDER_KEY = ""
|
|
||||||
|
|
||||||
# Please write down your wallet key to sign a transaction with our contract.
|
|
||||||
WALLET_KEY = ""
|
|
||||||
|
|
||||||
# Don't change
|
|
||||||
SCHEDULER_ADDR = "0xe12dc15f646bC8Ba432254Ffe58595BC31eC2C4d"
|
|
||||||
SCHEDULER_ABI_FILE = "./src/Scheduler.abi"
|
|
||||||
|
|
||||||
GPU_NAME2ID = {
|
|
||||||
'NVIDIA GeForce RTX 4090': 0,
|
|
||||||
'NVIDIA GeForce RTX 3060': 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
@ -1 +0,0 @@
|
|||||||
[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"gpuId","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"RegisterCluster","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"provider","type":"address"},{"indexed":false,"internalType":"uint256","name":"taskIndex","type":"uint256"},{"indexed":false,"internalType":"string","name":"dataImage","type":"string"},{"indexed":false,"internalType":"string","name":"trainImage","type":"string"}],"name":"StartRun","type":"event"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"clusters","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"clusterIndex","type":"uint256"}],"name":"getClusterInfo","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getClusters","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"},{"internalType":"bool","name":"available","type":"bool"}],"internalType":"struct Scheduler.Cluster[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getNumberOfClusters","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getTasks","outputs":[{"components":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"internalType":"struct Scheduler.Task[]","name":"","type":"tuple[]"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerCluster","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"gpuId","type":"uint256"},{"internalType":"uint256","name":"clusterSize","type":"uint256"}],"name":"registerTaskWithConditions","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"tasks","outputs":[{"internalType":"address","name":"provider","type":"address"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint256","name":"clusterIndex","type":"uint256"},{"internalType":"string","name":"dataImage","type":"string"},{"internalType":"string","name":"trainImage","type":"string"},{"internalType":"uint256","name":"status","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"taskIndex","type":"uint256"},{"internalType":"uint256","name":"newStatus","type":"uint256"}],"name":"updateStatus","outputs":[],"stateMutability":"nonpayable","type":"function"}]
|
|
||||||
@ -106,11 +106,6 @@ class ClusterCommunicationModule():
|
|||||||
print("Please Enter to continued")
|
print("Please Enter to continued")
|
||||||
|
|
||||||
return False
|
return False
|
||||||
elif command == '[START_LISTEN_TASK]':
|
|
||||||
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
|
|
||||||
print("The master has started listening for new task from Sepolia testnet...")
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def request(self, host): # master side
|
def request(self, host): # master side
|
||||||
@ -146,20 +141,6 @@ class ClusterCommunicationModule():
|
|||||||
print("1 worker disconnnected.")
|
print("1 worker disconnnected.")
|
||||||
return ans
|
return ans
|
||||||
|
|
||||||
def start_listen_task(self):
|
|
||||||
disconnected_counter = 0
|
|
||||||
for conn in self.worker_conns:
|
|
||||||
try:
|
|
||||||
conn.send('[START_LISTEN_TASK] {}'.encode())
|
|
||||||
data, args = conn.recv(1024).decode().split(' ')
|
|
||||||
except:
|
|
||||||
disconnected_counter += 1
|
|
||||||
self.worker_conns.remove(conn)
|
|
||||||
if disconnected_counter != 0:
|
|
||||||
print(f'{disconnected_counter} worker disconnected.')
|
|
||||||
else:
|
|
||||||
print(f'All other {len(self.worker_conns)} workers are waiting...')
|
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
if self.node_manager.status == 'master':
|
if self.node_manager.status == 'master':
|
||||||
for conn in self.worker_conns:
|
for conn in self.worker_conns:
|
||||||
|
|||||||
@ -3,16 +3,12 @@ from src.communication import ServiceExplorationModule, ClusterCommunicationModu
|
|||||||
import torch
|
import torch
|
||||||
import time
|
import time
|
||||||
import docker
|
import docker
|
||||||
from web3 import Web3
|
|
||||||
from src.scheduler import Scheduler
|
|
||||||
from constant import *
|
|
||||||
|
|
||||||
class NodeManager():
|
class NodeManager():
|
||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
self.status = 'none'
|
self.status = 'none'
|
||||||
self.actions = [
|
self.actions = [
|
||||||
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
||||||
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
|
|
||||||
{'explanation': 'Exit', 'function': 'exit'},
|
{'explanation': 'Exit', 'function': 'exit'},
|
||||||
]
|
]
|
||||||
self.get_GPU_info()
|
self.get_GPU_info()
|
||||||
@ -29,12 +25,6 @@ class NodeManager():
|
|||||||
# docker client
|
# docker client
|
||||||
self.docker_client = docker.from_env()
|
self.docker_client = docker.from_env()
|
||||||
|
|
||||||
# web3 provider
|
|
||||||
# if this is master, it should init a Web object.
|
|
||||||
self.w3 = None
|
|
||||||
self.scheduler = None
|
|
||||||
self.wallet = None
|
|
||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
def get_GPU_info(self):
|
def get_GPU_info(self):
|
||||||
@ -69,7 +59,7 @@ class NodeManager():
|
|||||||
self.actions = [
|
self.actions = [
|
||||||
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
{'explanation': 'Add another node into our cluster', 'function': 'add_node'},
|
||||||
{'explanation': 'Cluster info', 'function': 'cluster_info'},
|
{'explanation': 'Cluster info', 'function': 'cluster_info'},
|
||||||
{'explanation': 'Start waiting for the new task', 'function': 'start_listen_task'},
|
{'explanation': 'Start working', 'function': 'start_work'},
|
||||||
{'explanation': 'Exit', 'function': 'exit'},
|
{'explanation': 'Exit', 'function': 'exit'},
|
||||||
]
|
]
|
||||||
except:
|
except:
|
||||||
@ -77,49 +67,13 @@ class NodeManager():
|
|||||||
else:
|
else:
|
||||||
print("No other nodes in your subnet.")
|
print("No other nodes in your subnet.")
|
||||||
|
|
||||||
def start_listen_task(self):
|
|
||||||
self.w3 = Web3(Web3.HTTPProvider(WEB3_PROVIDER_URL + WEB3_PROVIDER_KEY))
|
|
||||||
self.scheduler = Scheduler(self.w3, SCHEDULER_ADDR, SCHEDULER_ABI_FILE)
|
|
||||||
self.wallet = self.w3.eth.account.from_key(WALLET_KEY)
|
|
||||||
print(f"We have use {WEB3_PROVIDER_URL+WEB3_PROVIDER_KEY} as the web3 provider.")
|
|
||||||
print(f"And we have load your wallet private key {WALLET_KEY} (address={self.wallet.address})")
|
|
||||||
print()
|
|
||||||
if self.w3.is_connected():
|
|
||||||
print("Connected Successfully.")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Register the cluster
|
|
||||||
gpu_num = self.cluster_info()
|
|
||||||
gpu_id = GPU_NAME2ID[self.GPU]
|
|
||||||
print(f"\nWe will register this cluster({self.GPU} * {gpu_num})...")
|
|
||||||
receipt = self.scheduler.register_cluster(self.wallet, gpu_id, gpu_num)
|
|
||||||
event = self.scheduler.get_cluster_event(receipt)
|
|
||||||
print(f"\nOK. this is our cluster event on the blockchain {event[0]['args']}")
|
|
||||||
|
|
||||||
for cluster in self.scheduler.get_clusters():
|
|
||||||
print(cluster)
|
|
||||||
|
|
||||||
# start waiting
|
|
||||||
print("\nWaiting for the new task from Sepolia testnet...")
|
|
||||||
self.cluster_communication_module.start_listen_task()
|
|
||||||
next_task = self.scheduler.listen_task(self.wallet.address)
|
|
||||||
print(next_task)
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("[ERROR] Connected Failed.")
|
|
||||||
print("Please check for your provider key & wallet key")
|
|
||||||
|
|
||||||
|
|
||||||
def cluster_info(self):
|
def cluster_info(self):
|
||||||
info = self.cluster_communication_module.cluster_info()
|
info = self.cluster_communication_module.cluster_info()
|
||||||
print(f"\nThere are {len(info)+1} nodes in this cluster.")
|
print(f"\nThere are {len(info)+1} nodes in this cluster.")
|
||||||
print("Cluster Info:")
|
print("Cluster Info:")
|
||||||
print(f" {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}")
|
print(f" {self.service_exploration_module.host}(local) -> {self.GPU} * {self.GPU_num}")
|
||||||
GPU_num = self.GPU_num
|
|
||||||
for host in info:
|
for host in info:
|
||||||
GPU_num += host['GPU_num']
|
|
||||||
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
|
print(f" {host['host']} -> {host['GPU']} * {host['GPU_num']}")
|
||||||
return GPU_num
|
|
||||||
|
|
||||||
def exit(self):
|
def exit(self):
|
||||||
self.cluster_communication_module.exit()
|
self.cluster_communication_module.exit()
|
||||||
|
|||||||
@ -8,7 +8,7 @@ class Scheduler():
|
|||||||
self.contract = self.w3.eth.contract(address=address, abi=abi)
|
self.contract = self.w3.eth.contract(address=address, abi=abi)
|
||||||
print(self.contract.functions)
|
print(self.contract.functions)
|
||||||
|
|
||||||
def signed_and_send_transaction(self, account, function):
|
def signedAndSendTransaction(self, account, function):
|
||||||
tx = function.build_transaction({
|
tx = function.build_transaction({
|
||||||
'from': account.address,
|
'from': account.address,
|
||||||
'nonce': self.w3.eth.get_transaction_count(account.address)
|
'nonce': self.w3.eth.get_transaction_count(account.address)
|
||||||
@ -17,24 +17,24 @@ class Scheduler():
|
|||||||
|
|
||||||
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
|
tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
|
||||||
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
|
tx_receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
|
||||||
return tx_receipt
|
_hash = tx_receipt['transactionHash']
|
||||||
|
return _hash
|
||||||
|
|
||||||
def register_cluster(self, account, gpu_id: int, gpu_num: int):
|
def registerCluster(self, account, gpu_id: int, gpu_num: int):
|
||||||
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
|
function = self.contract.functions.registerCluster(gpu_id, gpu_num)
|
||||||
return self.signed_and_send_transaction(account, function)
|
return self.signedAndSendTransaction(account, function)
|
||||||
|
|
||||||
def register_task_with_conditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
|
def registerTaskWithConditions(self, account, data_image: str, train_image: str, gpu_id: int, cluster_size: int):
|
||||||
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
|
function = self.contract.functions.registerTaskWithConditions(data_image, train_image, gpu_id, cluster_size)
|
||||||
return self.signed_and_send_transaction(account, function)
|
return self.signedAndSendTransaction(account, function)
|
||||||
|
|
||||||
def get_clusters(self):
|
def getClusters(self):
|
||||||
return self.contract.functions.getClusters().call()
|
return self.contract.functions.getClusters().call()
|
||||||
|
|
||||||
def get_tasks(self):
|
def getTasks(self):
|
||||||
return self.contract.functions.getTasks().call()
|
return self.contract.functions.getTasks().call()
|
||||||
|
|
||||||
'''
|
def getStartRunEvent(self):
|
||||||
def get_start_run_event(self):
|
|
||||||
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
|
event_filter = self.contract.events.StartRun.create_filter(fromBlock=0)
|
||||||
events = event_filter.get_new_entries()
|
events = event_filter.get_new_entries()
|
||||||
print(events)
|
print(events)
|
||||||
@ -43,17 +43,4 @@ class Scheduler():
|
|||||||
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
|
event_filter = self.contract.events.RegisterCluster.create_filter(fromBlock=0)
|
||||||
events = event_filter.get_new_entries()
|
events = event_filter.get_new_entries()
|
||||||
print(events)
|
print(events)
|
||||||
'''
|
|
||||||
|
|
||||||
def get_cluster_event(self, receipt):
|
|
||||||
event = self.contract.events.RegisterCluster().process_receipt(receipt)
|
|
||||||
return event
|
|
||||||
|
|
||||||
def listen_task(self, address):
|
|
||||||
event_filter = self.contract.events.StartRun.create_filter(fromBlock='latest')
|
|
||||||
while True:
|
|
||||||
for event in event_filter.get_new_entries():
|
|
||||||
if event['args']['provider'] == address:
|
|
||||||
return event
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user