feat: communication about image scatter
This commit is contained in:
parent
7f650d0673
commit
3184669630
@ -112,6 +112,12 @@ class ClusterCommunicationModule():
|
|||||||
elif command == '[START_LISTEN_TASK]':
|
elif command == '[START_LISTEN_TASK]':
|
||||||
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
|
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
|
||||||
print("The master has started listening for new task from Sepolia testnet...")
|
print("The master has started listening for new task from Sepolia testnet...")
|
||||||
|
elif command == '[RUN_CONTAINER]':
|
||||||
|
image = json.loads(args)['image']
|
||||||
|
print(f"[RUN_CONTAINER] {image}")
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -181,36 +187,45 @@ class ClusterCommunicationModule():
|
|||||||
|
|
||||||
def run_container(self, image_name):
|
def run_container(self, image_name):
|
||||||
def master_run(image_name):
|
def master_run(image_name):
|
||||||
print("Master run")
|
print("[Master] run")
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
print(i)
|
print(i)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
print("Master finished")
|
print("[Master] finished")
|
||||||
|
|
||||||
def send_and_wait(conn_index, conn, image_name):
|
def send_and_wait(conn_index, conn, image_name):
|
||||||
print(f"Worker {conn_index} run")
|
try:
|
||||||
for i in range(7):
|
# build command
|
||||||
print(f"[WORKER {conn_index}] {i}")
|
data = {'image': image_name}
|
||||||
time.sleep(1)
|
command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
|
||||||
print(f"Worker {conn_index} finished")
|
|
||||||
|
|
||||||
|
# send
|
||||||
|
conn.send(command.encode())
|
||||||
|
print(f"[WORKER {conn_index}] Send command {command}")
|
||||||
|
|
||||||
|
# wait
|
||||||
|
data, args = conn.recv(1024).decode().split(' ')
|
||||||
|
print(f"[WORKER {conn_index}] finished")
|
||||||
|
|
||||||
|
except:
|
||||||
|
print("[WARN] connection {conn_index} disconnected.")
|
||||||
|
|
||||||
|
# build threads
|
||||||
threads = []
|
threads = []
|
||||||
master_t = threading.Thread(target=master_run, args=(image_name, ))
|
master_t = threading.Thread(target=master_run, args=(image_name, ))
|
||||||
threads.append(master_t)
|
threads.append(master_t)
|
||||||
|
|
||||||
for index, conn in enumerate(self.worker_conns):
|
for index, conn in enumerate(self.worker_conns):
|
||||||
data = {'image': image_name}
|
|
||||||
command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
|
|
||||||
print(command)
|
|
||||||
|
|
||||||
t = threading.Thread(target=send_and_wait, args=(index, conn, image_name))
|
t = threading.Thread(target=send_and_wait, args=(index, conn, image_name))
|
||||||
threads.append(t)
|
threads.append(t)
|
||||||
|
|
||||||
|
# start threads & wait
|
||||||
for thread in threads:
|
for thread in threads:
|
||||||
thread.start()
|
thread.start()
|
||||||
for thread in threads:
|
for thread in threads:
|
||||||
thread.join()
|
thread.join()
|
||||||
|
|
||||||
|
# all finished
|
||||||
print("[INFO] All workers finished.")
|
print("[INFO] All workers finished.")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user