[PyTorch] DDP(Distributed Data Parallel) 셋팅하기

2022. 12. 27. 15:37Developers 공간 [Basic]/Software Basic

728x90

이번엔 Distributed 병렬 처리를 pytorch를 활용해 구현하는 방법에 대해 다뤄보고자 합니다.

먼저 용어에 대해서 확실히 정리하고 나가자면 아래와 같습니다.

  • Concurrent Computing (동시) : single 코어 혹은 multi 코어 에서 "동시에 처리되는 것으로 보이는 것"으로, hardware 구조로 인한 것이라기보다 software 구조로 인한 논리적 연산입니다. Parallel보다는 조금더 general한 개념입니다.
  • Parallel Computing (병렬) : multi 코어에서 "동시에 처리되는 것"으로 hardware 구조에 의한 물리적 연산입니다.
  • Distributed Computing (분산) : Link로 연결된 여러 개의 노드 혹은 프로세서에서 message passing을 활용해 처리하는 방식입니다.
  • Partitioning : 1개의 GPU를 나눠서 사용하는 것을 의미합니다.

Distributed Data Parallel은 PyTorch에서 지원하는 병렬처리 패키지입니다. 기존에 single GPU에서만 가능했던 부분을 Distributed 환경에서도 실행 가능하도록 지원하는 것이므로, 학습시 많이 보셨을 것이라 생각합니다.

 

<구성>
1. PyTorch를 위한 병렬처리 라이브러리
   a. DP(DataParallel)
   b. DDP(DistributedDataParallel)
   c. SMDDP(SageMaker's Distributed Data Parallel)
   d. OpenMP
2. DDP 구현하기
   a. 구현 전 기본 지식
   b. DDP 구현하기 (w/o mpi_run)
3. 기타 필요한 것들
   a. Distributed 실행 취소
   b. Distributed 편의 도구
   c. Dataset & Model import 시 편의도구(__init__.py)

글효과 분류1 : 코드

글효과 분류2 : 폴더/파일

글효과 분류3 : 용어설명

글효과 분류4 : 글 내 참조

글효과 분류5 : 글 내 참조2

글효과 분류6 : 글 내 참조3


1. PyTorch를 위한 병렬처리 라이브러리

또한 pyTorch에서 사용하는 병렬처리 개념은 Distributed인 DDP(Distributed Data Parallel)과 아닌 DP(Data Parallel)로 나뉩니다. DDP는 multi-processing을 사용하며 DP는 multi-thread를 사용하는데, 이유는 뒤에서 설명드리겠습니다.

이외 SMDDP(SageMaker's Distributed Data Parallel)는 AWS에서 제공하는 Sagemaker라는 플랫폼을 사욯아기 위한 DDP모듈입니다. 자세한 내용은 SMDDP를 참조 바라며, 이 글에서는 간단한 설명만 담았습니다.

 


a. DP(DataParallel)
  • 특징
    • single-process & multi-thread
    • single machine에서만 동작
  • 동작 : 예를 들어 GPU 4개가 있다고 할 때, 학습 중의 forward 와 backward 의 동작은 아래와 같습니다.
<Forward>
STEP1. GPU 0 에서 batch 수를 4등분 해 GPU0~GPU3에 전송합니다. (Scatter)
STEP2. GPU 0 에서 모델의 parameter를 GPU0~GPU3에 복사합니다. (Replicate)
STEP3. 각각 GPU에서 각각의 Logits들을 계산합니다! (Forward)
STEP4. 모든 Logits 들을 GPU0으로 모읍니다. (Gather)
STEP5. GPU0 에서 loss 를 계산합니다!
<Backward>
STEP1. loss 를 GPU0~GPU3 에 전송합니다. (Scatter)
STEP2. 각각 GPU에서 각각의 gradients를 계산합니다! (Backward)
STEP3. 모든 Gradients 들을 GPU0으로 모읍니다. (Reduce)
STEP4. GPU0 에 있는 모델의 parameter를 업데이트 합니다.
STEP5. GPU0의 모델의 parameter를 GPU0~GPU3에 복사합니다. (Replicate)
  • 주의사항  : GPU0에 필요한 메모리의 할당량이 높습니다.
    • ex) <Forward> STEP4 에서 GPU0로 모든 Logits 들을 Gather하기 때문입니다.
    • 각각 GPU에서 Loss 를 계산하면 좋을 것 같습니다. (즉, 각 GPU에서의 결과가 logits가 아닌 loss가 되도록)
    • 단 4개의 GPU들을 Reduction하는 과정이 필요합니다.
  • 문제점
    • 1. Python의 GIL(Global Interpreter Lock) 문제
      • GIL 문제
        • python은 아래의 두가지를 통해 메모리를 관리합니다.
          • Reference : 모든 객체의 reference를 counting하고 이는 sys.getrefcount(변수)를 활용해 확인 할 수 있습니다.
          • generational Garbage Collection(GC) : 순환 참조를 방지하고 객체 수명을 관리(Generation)하기 위해
        • 하지만 병렬처리를 위해 reference count를 관리하기 위해서는 모든 객체에 대해 lock이 필요하게 되므로, 이것에 대한 대체로 하나의 쓰레드가 모든 자원에 대한 GIL을 얻어 다른 쓰레드를 막아버리는 작업을 합니다.
        • 하지만 이는 overhead를 발생합니다.
      • 이를 방지하기 위해 multi-thread가 아닌 multi-process를 활용하는 것이 유리합니다.
        ** multi-thread : 하나의 process를 실행하기 위해, 같은 자원을 활용해 여러개의 실행 단위를 사용하고, 하나의 프로그램을 공유하므로 context-switching이 빠릅니다.
        ** multi-process : 여러개의 process를 실행하기 때문에, 각각의 자원을 활용하고, 어느 하나가 죽더라도 전체가 죽지는 않습니다.
    • 2. <Backward> STEP5 와 같이 매번 모델의 parameter를 모두에게 복사해 주어야합니다.
      •  각각의 Gradient를 <Backward> STEP3처럼 gather 하지 않고, 각각에서 reduce를 수행한뒤, 모든 device에 cross로 복사하는 "All-Reduce"를 하면 좋습니다.
      • 다만, 이렇게 한다면 N^2개(N∏2, All-to-All)의 GPU간 통신이 발생합니다.
  • 예시 : 밑에 모델 선언 관련한 기본 구조를 먼저 이해하시려면 2-a를 참조바랍니다. 
import torch.nn as nn
MODEL = nn.DataParallel(MODEL, device_ids=[args.gpu])

 


b. DDP(DistributedDataParallel)
  • 특징
    • multi-process (하나의 GPU에 하나의 process, 하나의 process에 하나의 DDP instance)
    • single machine과 multi machine(multi-node) 모두에서 동작 
  • DP의 문제점과 비교
    • 1. Multi-process 모듈이라 GIL을 해결했으며, master가 따로 없습니다.
    • 2. All-Reduce보다 개선된 "Ring-All-Reduce"를 활용하여 All-to-All 통신이 필요하지 않고, 인근 GPU로 전달하며 Reduce하기 때문에 조금더 효율적입니다
      ** Ring-All-Reduce : 실제 2009년 논문 "Bandwidth optimal all-reduce algorithms for clusters of workstations"라는 논문에서 제안된 all-reduce기법을 Baidu Research에서 Baidu-All-Reduce라는 이름으로 2017년에 제안한 기법이며, 아래 그림과 같은 동작을 합니다.
      이는 2018년 Uber의 Horovod에 "Horovod: fast and easy distributed deep learning in TensorFlow"라는 논문으로, 2019년 pytorch에 도입되어 2020년 "PyTorch Distributed: Experiences on Accelerating Data Parallel Training"라는 논문으로 소개되었습니다.


[Ring-All-Reduce방식-출처참조]

  • 예시 : 밑에 모델 선언 관련한 기본 구조를 먼저 이해하시려면 2-a를 참조바랍니다. 
import torch.nn as nn

MODEL = nn.parallel.DistributedDataParallel(MODEL, device_ids=[args.gpu]),

c. SMDDP(SageMaker's Distributed Data Parallel)

 

  • SMDDP : AWS SageMaker에서 제공하는 DP 라이브러리
    • single machine과 multi machine(multi-node) 모두에서 동작
    • 자세한 내용은 위에서 참조한 링크에서 자세하게 다루겠습니다.
  • 예시
from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel

MODEL = DistributedDataParallel(MODEL.to(device))

 

 


d. OpenMP

 

CPU 병렬처리를 위해 사용하는 라이브러리로, 따로 작업하지 않아도 기본적으로 사용하게 됩니다.

하지만 Thread의 개수를 설정하기 위해 GNU OpenMP 셋팅할때와 같이 환경변수를 조정하는 등의 방법을 활용할 수 있습니다.

** GNU 프로젝트 : 모두가 상업화하지 않고 공유된 소프트웨어를 사용할 수 있도록 FSF(자유 소프트웨어 재단)에서 GPL(general public license)를 따르는 프로젝트를 만들게 된 것입니다. OpenMP또한 GNU 프로젝트의 일환입니다.

import torch.multiprocessing as mp

export OMP_NUM_THREADS=4

 


2. DDP 구현하기

이번엔 본격적으로 DDP를 활용하는 방법을 알아보려고 합니다.


a. 구현 전 기본 지식

 구현에 앞서 Pytorch 모델을 활용해 학습하는 코드를 작성하는 기본적인 프레임을 가볍게 설명하겠습니다.

아래는 글쓴이가 선호하는 구조를 설명하며, 이를 기반으로 뒤의 내용을 설명하겠습니다.

설명 중에 없는 함수의 경우 "3. 기타 필요한 것들"의 함수들을 참조바랍니다.

  • Project Folder Tree : 글쓴이는 기본적으로 아래와 같은 폴더트리를 구성하고 시작합니다.
    ├── tk_start.sh
    ├── T1_train.py
    ├── T2_eval.py
    ├── T3_test.py
    └── STEP0_CONFIG
          └── config.py
    └── STEP1_DATASET
          ├── __init__.py
          └── DATA_D001.py
    └── STEP2_MODEL
          ├── __init__.py
          └── MODEL_M001.py
    └── TK_UTILS
          ├── dist.py
          └── utils.py
  • ├── T1_train.py : PyTorch 학습 구조 (개인적인 순서)
    • STEP0 : Dataset/Model/Training Metric 과 관련된 환경변수 셋팅
    • STEP1 : Dataset 선언
    • STEP2 : Model 선언
    • STEP3 : Training Metric 선언
    • STEP4 : 학습 & 평가
#STEP0. Configure
CONFIG = STEP0_Config

#STEP1. Dataset
dataset, dataset_config = STEP1_BuildDataset(CONFIG)

if is_distributed():
    sampler = torch.utils.data.DistributedSampler(datasets['train'], shuffle=CONFIG.shuffle)
elif shuffle:
    sampler = torch.utils.data.RandomSampler(datasets['train'])
else:
    sampler = torch.utils.data.SequentialSampler(datasets['train'])
    
dataloaders = {}
dataloaders['train'] = DataLoader(
            datasets['train'],
            sampler=sampler,
            batch_size=CONFIG.batchsize_per_gpu,
            num_workers=CONFIG.dataset_num_workers,
            worker_init_fn=my_worker_init_fn,
        )
        
#STEP2. Model
MODEL = STEP2_BuildModel()
MODEL = MODEL.cuda(local_rank)
MODEL_no_ddp = MODEL

#STEP3. Metric
OPTIMIZER = STEP3_BuildOptimizer()

#STEP4. Training & Evaluation
for epoch in range(CONFIG.start_epoch, CONFIG.max_epoch):
	for batch_idx, batch_data_label in enumerate(dataset_loader):
    	# 4-1. Clear Gradients
    	OPTIMIZER.zero_grad()
        
    	# 4-2. Data to Device
        DATA = batch_data_label.to(device)
        
        # 4-3. Get Loss 
        loss = MODEL(DATA)
        
        # 4-4. Compute Gradients
        loss.backward()
        
        # 4-5. Optmization Step
        OPTIMIZER.step()
  • └── STEP1_DATASET
          └── DATA_D001.py :
    Dataset 구현시 기본 구조
class ScannetDetectionDataset(torch.utils.data.Dataset):
def __init__(self, 
        dataset_config,
        split_set="train",
        augment=False,
        root_dir=None):
...
def __len__(self):
...
def __getitem__(self,idx):
return ret_dict
  • └── STEP2_MODEL
          └── MODEL_M001.py
     : Model 구현시 기본 구조
class MODEL(nn.Module):
def __init__(self):
	...
def forward(self, inputs, ...):
	...
  • init 함수 : Dataloader에 넣어줄 seed를 각각 초기화할 함수 입니다.
def my_worker_init_fn(worker_id):
    np.random.seed(np.random.get_state()[1][0] + worker_id)

 


b. DDP 구현하기 (w/o mpi_run)
  • 용어 설명
    • Backend : 각 프로세스가 다른 프로세스와 데이터를 교환할 수 있도록 메시지 교환 규약(messaging passing semantics)을 활용하는데, 다양한 커뮤니케이션 백엔드(Communication backend)를 사용할 수 있습니다.
      • MPI(Message Passing Interface) : 고성능 컴퓨팅(HPC) 시 필요하며, 병렬 컴퓨팅을 위한 저수준 API입니다.대규모 연산 클러스에서 폭넓은 가용성 과 높은 수준의 최적화를 보입니다.
        • distributed 환경에서 주로 사용하며, mpirun 등의 프로세스를 통해 관리하므로 main함수 하나만 실행해주면 됩니다. 
      • NCCL : Distributed GPU training 시 필요하며, CUDA Tensor들에 대한 집합 연산의 최적화된 구현체를 제공합니다. 
        • single 노드의 Multi-GPU에서 주로 사용하며, 내부적으로 torch.multiprocessing.spawn() 등의 함수를 통해 분기합니다.
        • Nvidia에서 만들었습니다.
      • Gloo : Distributed CPU training 시 필요하며,  CUDA는 NCCL만큼 최적화되어있지 않습니다.
      • RPC(Remote Procedure Calls) : 서버/클라이언트 모델시 필요하며, MPI에 비해 고수준 API입니다. torch에서 RPC 프레임워크 자체는 지원하지만 Distributed RPC Framework로 활용하려면 직접 구성해주어야합니다.
    • Distributed에서 GPU 인덱스
      • # world size : 전체 GPU 개수
      • # global rank : 전체 GPU 넘버(인덱스)
      • # local rank : node당 GPU넘버(인덱스)

[PyTorch에서 지원하는 Gloo, MPI, NCCL의 Capabilities]

  • ├── T1_train.py : 위 PyTorch 학습 구조상 DDP를 위해 바뀐 내용
    • torch.utils.data.DistributedSampler() : Sampler와 shuffle 유무를 넣어주었는데, "num_replicas=worldsize" 혹은 "rank=global_rank"를 넣어주어야하는 경우도 있으니 참조하세요.
def main(local_rank):	
    #STEP0. Configure
    CONFIG = STEP0_Config

    #STEP1. Dataset
    ...

    #STEP2. Model
    MODEL = STEP2_BuildModel()
    MODEL = MODEL.cuda(local_rank)
    MODEL_no_ddp = MODEL
    
	if is_distributed():
        MODEL = torch.nn.SyncBatchNorm.convert_sync_batchnorm(MODEL)
        MODEL = torch.nn.parallel.DistributedDataParallel(
            MODEL, device_ids = [local_rank]
        )
        MODEL_no_ddp = MODEL.module

    #STEP3. Metric
    ...

    #STEP4. Training & Evaluation
    ...
  • ├── T1_train.py : 위의 main() 함수를 distributed 환경으로 실행하는 함수입니다.
    • A. Check : torch를 활용해 다양한 환경 확인
      • torch.utils.collect_env : PyTorch와 연결된 환경을 모두 알기 위한 방법입니다.
        • 위를 그대로 실행시 "python3 -m torch.utils.collect_env"로 실행
        • from torch.utils.collect_env import get_running_cuda_version, run
          get_running_cuda_version(run) : 현재 설치된 CUDA의 버전을 알려주는 명령어입니다.
      • torch.version.cuda : CUDA의 버전을 확인하기 위한 방법입니다.
      • torch.cuda.is_available() : 현재 CUDA가 설치된 것으로 확인되는지 확인하는 방법입니다.
      • torch.cuda.device_count() : 현재 연결된 GPU의 개수를 알려주는 명령어 입니다.
      • torch.cuda.current_device() : 현재 master device의 device number를 알려주는 명령어 입니다.
      • torch.cuda.get_device_name(0) : 해당 device number의 device의 이름과 정보를 얻기 위한 명령어 입니다.
    • B. Setting Spawn : 새로운 프로세스를 만들기 위한 방법론
      • spawn : 최소한의 코드를 담아 Multi-processing를 위한 프로세스를 생성
      • fork : 완전자기 복제를 통한 Multi-processing를 위한 프로세스를 생성
    • C. Start Distributed : 실행합니다. 앞서 언급한 바와 같이 torch.multiprocessing.spawn() 함수를 활용해야 합니다.
from torch.multiprocessing import set_start_method

def __name__ == "__main__":
	# A. Check
    if torch.cuda.device_count()==0 or not torch.cuda.is_available():
        raise SystemError("Error")

	# B. Setting spawn
    try:
        set_start_method("spawn")
    except RuntimeError:
        pass

	# C. Start Distributed
    world_size = 1 # num of gpu
    if world_size == 1:
        main(0)
    else:
        try:
            torch.multiprocessing.spawn(main, nprocs=world_size)
        except KeyboardInterrupt:
            raise SystemError("Error") # 대체 가능
  • └── STEP0_CONFIG
          └── config.py
    • global_rank, local_rank, world_size를 구합니다.
    • backend는 nccl을 활용합니다.
    • torch.cuda.set_device(): 현재 gpu를 설정해줍니다.
    • torch.distributed_init_process_group() : default distributed 프로세스 그룹을 초기화합니다. distributed 패키지를 필수적으로 초기화 해주어야합니다.
global_rank = os.environ["RANK"]
local_rank = local_rank
world_size = FLAGS.ngpu

if FLAGS.ngpus >1:
    torch.cuda.set_device(local_rank)
    torch.distributed.init_process_group(
		rank=global_rank,
        world_size=world_size,
        init_method="tcp://localhost:12345"
        backend="nccl"
	)
	torch.distributed.barrier()

 


3. 기타 필요한 것들

위에 설명이 없는 함수 및 수행시 필요한 추가적인 함수들에 대해 설명하고자 합니다.


a. Distributed 실행 취소

Distributed로 실행한 경우, Multi-process들을 단순히 keyboard interrupt등으로 한번에 종료하기 쉽지않습니다.

아래와 같은 방법으로 종료하면 됩니다.

  • 명령어로 강제종료 하는 방법
kill $(ps aux | grep "multiprocessing.spawn" | grep -v grep | awk '{print $2}')
for i in $(lsof /dev/nvidia0 | grep python | awk '{print $2}' | sort -u);do kill -9 $i; done
  • Python 함수 내에서 except를 두는 방법 (위의 내용 중 "대체 가능"부분을 아래와 같이 수정합니다)
try:
	torch.multiprocessing.spawn(run, agrs…)
except KeyboardInterrupt:
    print("interrupted")
    try:
		torch.distributed.destroy_process_group()
    except KeyboardInterrupt:
		os.system("kill $(ps aux | grep "multiprocessing.spawn" | grep -v grep | awk '{print $2}')")

b. Distributed 편의 도구

Distributed 환경에서 사용하기 위해 사용하는 다양한 함수들을 정리해놓았습니다.

import torch.distributed as dist

def is_distributed():
    if not dist.is_available() or not dist.is_initialized():
        return False
    return True

def get_rank():
    if not is_distributed():
        return 0
    return dist.get_rank()

def is_primary():
    return get_rank() == 0

def setup_print_for_distributed(is_primary):
    """
    This function disables printing when not in primary process
    """
    import builtins as __builtin__
    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop('force', False)
        if is_primary or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print
    
def barrier():
    if not is_distributed():
        return
    torch.distributed.barrier()
  • is_distributed() : 현재 distributed 환경이 구축되었는지 확인하기 위한 함수
    • torch.distributed.dist.is_available() :  distributed package가 사용가능한지 확인하는 명령어입니다.
  • get_rank() : 현재 프로세스의 GPU 넘버를 확인하기 위한 함수
    • torch.distributed.get_rank() : 현재 프로세스의 GPU 넘버를 확인하기 위한 명령어입니다.
  • is_primary() : master GPU인지 확인하기 위한 함수
  • setup_print_for_distributed() : master GPU에서만 print를 하기 위한 함수
  • barrier() : 모든 프로세스들을 동기화하기 위한 함수
    • torch.distributed.barrier() : 모든 프로세스들을 동기화하기 위한 명령어

 


 c. Dataset & Model import 시 편의도구(__init__.py)
  •  ├── __init__.py : import를 위해 편의상 작성하는 __init__.py 구조를 보여줍니다.
#__init__.py
from .scannet import ScannetDetectionDataset, ScannetDatasetConfig
from .sunrgbd import SunrgbdDetectionDataset, SunrgbdDatasetConfig

DATASET_FUNCTIONS = {
    "scannet": [ScannetDetectionDataset, ScannetDatasetConfig],
    "sunrgbd": [SunrgbdDetectionDataset, SunrgbdDatasetConfig],
}

def FUNCTION(CONFIG):
    dataset_builder = DATASET_FUNCTIONS[CONFIG.dataset_name][0]
    dataset_config = DATASET_FUNCTIONS[CONFIG.dataset_name][1]()

    dataset_dict = {
            "train": dataset_builder(
                dataset_config, 
                split_set="train", 
                root_dir=CONFIG.dataset_root_dir, 
                augment=True
            ),
            "test": dataset_builder(
                dataset_config, 
                split_set="val", 
                root_dir=CONFIG.dataset_root_dir, 
                augment=False
            ),
        }
    return dataset_dict, dataset_config
  • from ... : 폴더 내 해당하는 코드들을 import 합니다
  • DATASET_FUNCTIONS : 해당 input에 따라 import할 리스트를 작성합니다.
  • dataset_dict : return할 dictionary를 작성합니다.

https://github.com/facebookresearch/3detr

https://pytorch.org/docs/stable/distributed.html#module-torch.distributed

https://andrew.gibiansky.com/blog/machine-learning/baidu-allreduce/#ref-4

https://algopoolja.tistory.com/95

728x90