feat: communication about docker image information

This commit is contained in:
Ting-Jun Wang 2024-06-01 00:47:33 +08:00
parent 3184669630
commit 55211c3856
Signed by: snsd0805
GPG Key ID: 48D331A3D6160354
2 changed files with 74 additions and 11 deletions

View File

@ -1,4 +1,6 @@
from os.path import isdir
import socket
import os
import json
import threading
import docker
@ -113,10 +115,14 @@ class ClusterCommunicationModule():
self.client_sock.send('[START_LISTEN_TASK_CHECK] {}'.encode())
print("The master has started listening for new task from Sepolia testnet...")
elif command == '[RUN_CONTAINER]':
image = json.loads(args)['image']
print(f"[RUN_CONTAINER] {image}")
time.sleep(3)
args = json.loads(args)
image = args['image']
train = args['train']
print(f"[RUN_CONTAINER] {image}")
self.run_container(image, train, args)
print(f"[RUN_CONTAINER] {image}")
self.client_sock.send('[RUN_CONTAINER_SUCCESS] {}'.encode())
return True
@ -185,18 +191,75 @@ class ClusterCommunicationModule():
for conn in self.worker_conns:
conn.close()
def run_container(self, image_name, train, train_args=None):
    '''
    Run `image_name` as a detached Docker container, print its logs as they
    are streamed, wait for it to exit and return its exit status.

    image_name: image reference handed to the Docker client.
    train: falsy -> plain run with only the dataset directory mounted;
           truthy -> training run (GPU, `train-net` network, dataset and
           output directories mounted, rendezvous environment variables).
    train_args: only read when `train` is truthy
        - index: rank of this node; also used in the container name
        - node_num: total number of participating nodes

    Returns the `StatusCode` reported by `container.wait()`.
    Raises KeyError when `train` is truthy and `train_args` lacks
    `index` or `node_num`.
    '''
    # A `{}` default would be one dict shared by every call.
    if train_args is None:
        train_args = {}

    # Docker interprets a non-absolute volume source as a *named volume*,
    # not as a host path, so the directories created here must be passed as
    # absolute paths to actually be the ones mounted into the container.
    host_dirs = {}
    for dirname in ('dataset_dir', 'output'):
        host_path = os.path.abspath(dirname)
        if not os.path.isdir(host_path):
            os.makedirs(host_path, exist_ok=True)
            print(f"Create ./{dirname} dir.")
        host_dirs[dirname] = host_path

    containers = self.node_manager.docker_client.containers
    if not train:
        container = containers.run(
            image_name,
            volumes={
                host_dirs['dataset_dir']: {'bind': '/dataset', 'mode': 'rw'},
            },
            detach=True,
        )
    else:
        node_rank = train_args['index']
        container = containers.run(
            image_name,
            volumes={
                host_dirs['dataset_dir']: {'bind': '/dataset', 'mode': 'rw'},
                host_dirs['output']: {'bind': '/output', 'mode': 'rw'},
            },
            network='train-net',
            runtime='nvidia',
            device_requests=[
                docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
            ],
            name=f'train-{node_rank}',
            # The SDK keyword is `environment`; `env` is rejected as an
            # unexpected keyword argument.
            environment={
                'GPU_NUM': self.node_manager.GPU_num,
                'NODE_NUM': train_args['node_num'],
                'NODE_RANK': node_rank,
                # Rank 0's container name doubles as the rendezvous host.
                'MASTER_IP': 'train-0',
                'MASTER_PORT': 21046,
            },
            detach=True,
        )

    print(container.short_id)
    for line in container.logs(stream=True):
        # Container output is arbitrary bytes; never let a bad byte abort us.
        print(line.strip().decode(errors='replace'))

    result = container.wait()
    status_code = result['StatusCode']
    print(status_code, type(status_code))
    return status_code
def scatter_container(self, image_name, train=False):
def master_run(image_name):
print("[Master] run")
for i in range(5):
print(i)
time.sleep(1)
train_args = {
'index': 0,
'node_num': len(self.worker_conns)+1
}
self.run_container(image_name, train, train_args)
print("[Master] finished")
def send_and_wait(conn_index, conn, image_name):
try:
# build command
data = {'image': image_name}
data = {
'image': image_name,
'train': train,
'index': conn_index+1,
'node_num': len(self.worker_conns)+1
}
command = '[RUN_CONTAINER] {}'.format(json.dumps(data))
# send
@ -226,7 +289,7 @@ class ClusterCommunicationModule():
thread.join()
# all finished
print("[INFO] All workers finished.")
print("\n[INFO] All workers finished.")
'''

View File

@ -119,10 +119,10 @@ class NodeManager():
print(f" - Training Image: {train_image}")
'''
data_image = "test/test"
data_image = "snsd0805/cifar10-dataset:v1"
# Start Downloading
self.cluster_communication_module.run_container(data_image)
self.cluster_communication_module.scatter_container(data_image)
else: