From d35ee68ccb37894fa7e00cffa0c4f206615abc6f Mon Sep 17 00:00:00 2001 From: snsd0805 Date: Fri, 31 May 2024 22:49:37 +0800 Subject: [PATCH] feat: check multi-thread --- src/communication.py | 43 ++++++++++++++++++++++++++++++++++++++++++- src/node_manager.py | 1 + 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/communication.py b/src/communication.py index 2fbe81e..131d915 100644 --- a/src/communication.py +++ b/src/communication.py @@ -1,8 +1,11 @@ import socket import json +import threading import docker import time +from docker.models.containers import Image + class ClusterCommunicationModule(): def __init__(self, host, port, node_manager): # TCP server @@ -175,7 +178,45 @@ class ClusterCommunicationModule(): self.client_sock.close() for conn in self.worker_conns: conn.close() - + + def run_container(self, image_name): + def master_run(image_name): + print("Master run") + for i in range(5): + print(i) + time.sleep(1) + print("Master finished") + + def send_and_wait(conn_index, conn, image_name): + print(f"Worker {conn_index} run") + for i in range(7): + print(f"[WORKER {conn_index}] {i}") + print(f"Worker {conn_index} finished") + + threads = [] + master_t = threading.Thread(target=master_run, args=(image_name, )) + master_t.join() + threads.append(master_t) + + for index, conn in enumerate(self.worker_conns): + data = {'image': image_name} + command = '[RUN_CONTAINER] {}'.format(json.dumps(data)) + print(command) + + t = threading.Thread(target=send_and_wait, args=(index, conn, image_name)) + t.join() + threads.append(t) + + for thread in threads: + thread.start() + + print("[INFO] All workers finished.") + + + ''' + conn.send('[RUN_CONTAINER] '.encode()) + data, args = conn.recv(1024).decode().split(' ') + ''' class ServiceExplorationModule(): def __init__(self, host, port, node_manager): diff --git a/src/node_manager.py b/src/node_manager.py index 1e1f755..29b30b2 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -118,6 +118,7 @@ class NodeManager(): print(f" - Training Image: {train_image}") # Start Downloading + self.cluster_communication_module.run_container(data_image) else: