From 55211c385651b2af09303b164fef6fb4cea55d5c Mon Sep 17 00:00:00 2001 From: Ting-Jun Wang Date: Sat, 1 Jun 2024 00:47:33 +0800 Subject: [PATCH] feat: communication about docker image information --- src/communication.py | 81 +++++++++++++++++++++++++++++++++++++++----- src/node_manager.py | 4 +-- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/communication.py b/src/communication.py index 046d14c..c436e90 100644 --- a/src/communication.py +++ b/src/communication.py @@ -1,4 +1,6 @@ +from os.path import isdir import socket +import os import json import threading import docker @@ -113,10 +115,14 @@ class ClusterCommunicationModule(): self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode()) print("The master has started listening for new task from Sepolia testnet...") elif command == '[RUN_CONTAINER]': - image = json.loads(args)['image'] - print(f"[RUN_CONTAINER] {image}") - time.sleep(3) + args = json.loads(args) + image = args['image'] + train = args['train'] + print(f"[RUN_CONTAINER] {image}") + self.run_container(image, train, args) + + print(f"[RUN_CONTAINER] {image}") self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode()) return True @@ -185,18 +191,75 @@ class ClusterCommunicationModule(): for conn in self.worker_conns: conn.close() - def run_container(self, image_name): + def run_container(self, image_name, train, train_args={}) + ''' + train_args + - index + - node_num + ''' + if not os.path.isdir('./dataset_dir'): + os.mkdir('./dataset_dir') + print("Create ./dataset_dir dir.") + if not os.path.isdir('./output'): + os.mkdir('./output') + print("Create ./output dir.") + + if not train: + container = self.node_manager.docker_client.containers.run( + image_name, + volumes={'dataset_dir': {'bind': '/dataset', 'mode': 'rw'}}, + detach=True + ) + else: + container = self.node_manager.docker_client.containers.run( + image_name, + volumes={ + 'dataset_dir': {'bind': '/dataset', 'mode': 'rw'}, + 'output': {'bind': '/output', 'mode': 'rw'}, + }, + network='train-net', + runtime='nvidia', + device_requests=[ + docker.types.DeviceRequest(count=-1, capabilities=[['gpu']]) + ], + name=f'train-{train_args["index"]}', + env={ + 'GPU_NUM': self.node_manager.GPU_num, + 'NODE_NUM': train_args['node_num'], + 'NODE_RANK': train_args['index'], + 'MASTER_IP': 'train-0', + 'MASTER_PORT': 21046, + }, + detach=True + ) + + print(container.short_id) + for line in container.logs(stream=True): + print(line.strip().decode()) + + result = container.wait() + status_code = result['StatusCode'] + print(status_code, type(status_code)) + + def scatter_container(self, image_name, train=False): def master_run(image_name): print("[Master] run") - for i in range(5): - print(i) - time.sleep(1) + train_args = { + 'index': 0, + 'node_num': len(self.worker_conns)+1 + } + self.run_container(image_name, train, train_args) print("[Master] finished") def send_and_wait(conn_index, conn, image_name): try: # build command - data = {'image': image_name} + data = { + 'image': image_name, + 'train': train, + 'index': conn_index+1, + 'node_num': len(self.worker_conns)+1 + } command = '[RUN_CONTAINER] {}'.format(json.dumps(data)) # send @@ -226,7 +289,7 @@ class ClusterCommunicationModule(): thread.join() # all finished - print("[INFO] All workers finished.") + print("\n[INFO] All workers finished.") ''' diff --git a/src/node_manager.py b/src/node_manager.py index cea5e10..b6f8cc6 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -119,10 +119,10 @@ class NodeManager(): print(f" - Training Image: {train_image}") ''' - data_image = "test/test" + data_image = "snsd0805/cifar10-dataset:v1" # Start Downloading - self.cluster_communication_module.run_container(data_image) + self.cluster_communication_module.scatter_container(data_image) else: