Compare commits

..

2 Commits

2 changed files with 32 additions and 12 deletions

View File

@ -112,6 +112,12 @@ class ClusterCommunicationModule():
elif command == '[START_LISTEN_TASK]': elif command == '[START_LISTEN_TASK]':
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode()) self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
print("The master has started listening for new task from Sepolia testnet...") print("The master has started listening for new task from Sepolia testnet...")
elif command == '[RUN_CONTAINER]':
image = json.loads(args)['image']
print(f"[RUN_CONTAINER] {image}")
time.sleep(3)
self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())
return True return True
@ -181,35 +187,45 @@ class ClusterCommunicationModule():
def run_container(self, image_name): def run_container(self, image_name):
def master_run(image_name): def master_run(image_name):
print("Master run") print("[Master] run")
for i in range(5): for i in range(5):
print(i) print(i)
time.sleep(1) time.sleep(1)
print("Master finished") print("[Master] finished")
def send_and_wait(conn_index, conn, image_name): def send_and_wait(conn_index, conn, image_name):
print(f"Worker {conn_index} run") try:
for i in range(7): # build command
print(f"[WORKER {conn_index}] {i}") data = {'image': image_name}
print(f"Worker {conn_index} finished") command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
# send
conn.send(command.encode())
print(f"[WORKER {conn_index}] Send command {command}")
# wait
data, args = conn.recv(1024).decode().split(' ')
print(f"[WORKER {conn_index}] finished")
except:
print("[WARN] connection {conn_index} disconnected.")
# build threads
threads = [] threads = []
master_t = threading.Thread(target=master_run, args=(image_name, )) master_t = threading.Thread(target=master_run, args=(image_name, ))
master_t.join()
threads.append(master_t) threads.append(master_t)
for index, conn in enumerate(self.worker_conns): for index, conn in enumerate(self.worker_conns):
data = {'image': image_name}
command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
print(command)
t = threading.Thread(target=send_and_wait, args=(index, conn, image_name)) t = threading.Thread(target=send_and_wait, args=(index, conn, image_name))
t.join()
threads.append(t) threads.append(t)
# start threads & wait
for thread in threads: for thread in threads:
thread.start() thread.start()
for thread in threads:
thread.join()
# all finished
print("[INFO] All workers finished.") print("[INFO] All workers finished.")

View File

@ -85,6 +85,7 @@ class NodeManager():
print(f"And we have load your wallet private key {WALLET_KEY} (address={self.wallet.address})") print(f"And we have load your wallet private key {WALLET_KEY} (address={self.wallet.address})")
print() print()
if self.w3.is_connected(): if self.w3.is_connected():
'''
print("[INFO] Connected Successfully.") print("[INFO] Connected Successfully.")
print() print()
@ -116,6 +117,9 @@ class NodeManager():
print("\n[INFO] You Receive a new task:") print("\n[INFO] You Receive a new task:")
print(f" - Download Image: {data_image}") print(f" - Download Image: {data_image}")
print(f" - Training Image: {train_image}") print(f" - Training Image: {train_image}")
'''
data_image = "test/test"
# Start Downloading # Start Downloading
self.cluster_communication_module.run_container(data_image) self.cluster_communication_module.run_container(data_image)