diff --git a/src/communication.py b/src/communication.py index 07ff4b8..046d14c 100644 --- a/src/communication.py +++ b/src/communication.py @@ -112,6 +112,12 @@ class ClusterCommunicationModule(): elif command == '[START_LISTEN_TASK]': self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode()) print("The master has started listening for new task from Sepolia testnet...") + elif command == '[RUN_CONTAINER]': + image = json.loads(args)['image'] + print(f"[RUN_CONTAINER] {image}") + time.sleep(3) + + self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode()) return True @@ -181,36 +187,45 @@ class ClusterCommunicationModule(): def run_container(self, image_name): def master_run(image_name): - print("Master run") + print("[Master] run") for i in range(5): print(i) time.sleep(1) - print("Master finished") + print("[Master] finished") def send_and_wait(conn_index, conn, image_name): - print(f"Worker {conn_index} run") - for i in range(7): - print(f"[WORKER {conn_index}] {i}") - time.sleep(1) - print(f"Worker {conn_index} finished") + try: + # build command + data = {'image': image_name} + command = '[RUN_CONTAINER] {}'.format(json.dumps(data)) + + # send + conn.send(command.encode()) + print(f"[WORKER {conn_index}] Send command {command}") + + # wait + data, args = conn.recv(1024).decode().split(' ') + print(f"[WORKER {conn_index}] finished") + + except: + print("[WARN] connection {conn_index} disconnected.") + # build threads threads = [] master_t = threading.Thread(target=master_run, args=(image_name, )) threads.append(master_t) for index, conn in enumerate(self.worker_conns): - data = {'image': image_name} - command = '[RUN_CONTAINER] {}'.format(json.dumps(data)) - print(command) - t = threading.Thread(target=send_and_wait, args=(index, conn, image_name)) threads.append(t) + # start threads & wait for thread in threads: thread.start() for thread in threads: thread.join() + # all finished print("[INFO] All workers finished.")