2024. 1. 21. 16:53ㆍDevelopers 공간 [Basic]/Software Basic
프로젝트를 구성할 때 다양한 프레임워크를 활용해서 구현하시는 경우, 하나의 프로세스를 통해 실행하는 경우가 많습니다.
하지만 이번 글에서는 Python으로 여러개의 프로세스(Process)와 쓰레드(Thread)를 통해 프로젝트를 효율적으로 구성하고 싶은 경우에 대해 정리해 보고자 합니다.
<구성>
1. Concept
a. Process & Thread
b. Multi-Process
c. Multi-Thread
2. Inter-Process Communication
a. Message Queue
b. Shared Memory
c. Memory Map
d. Socket
3. Synchronization
a. Mutex
b. Semaphore
c. Conditional Variable
4. Example
a. Server
b. Client
글효과 분류1 : 코드
글효과 분류2 : 폴더/파일
글효과 분류3 : 용어설명
글효과 분류4 : 글 내 참조
글효과 분류5 : 글 내 참조2
글효과 분류6 : 글 내 참조3
1. Concept
먼저 Process와 Thread에 대한 개념을 살펴보고, Multi-Process와 Multi-Thread의 개념 및 구현하는 법을 살펴보겠습니다.
a. Process & Thread
Disk에 있는 코드 및 실행할 수 있는 파일을 Program이라고 합니다. 이를 실행하면 Process라는 단위로 실행되고 각각 Process ID(PID)를 가지고 실행됩니다.
CPU에서 1개의 Process 뿐 아니라, 다른 요청에 의한 Process도 있을 수 있는데, 자원이 부족한 경우 Concurrency를 위해 Context Switch이라는 작업을 통해 이 작업들을 돌아가면서 처리합니다.
** Concurrency : 한번에 많은 Task를 다루는 것
하나의 Process는 여러개의 Thread를 가지고 수행될 수 있는데 이런 Thread들을 통해 Prallelism을 구현하고, CPU가 N개의 Core를 가지고 있을 때 N개의 Thread를 동시에 실행할 수 있습니다.
** Hyper-Threading : Intel에서 개발한 기술로, 하나의 코어에서 두개 이상의 Thread를 실행할 수 있는 방법입니다.
** Parallelism : 한번에 많은 일들을 처리하는 것
Process와 Thread의 차이는 아래와 같습니다.
- Process는 하나의 Task 단위이므로, 어떤 Process에 문제가 발생한다고 해도 다른 Process에 영향을 주지 않습니다.
하지만, Process는 각각 독립된 메모리 영역을 할당받았기 때문에 동일한 메모리 공간을 공유하려면 IPC(Inter-Process Communication)이라는 기법을 활용해야 합니다. - Thread는 하나의 Task가 아닌 작업 단위이므로, 어떤 Thread에 문제가 생기면 Process 전체가 문제가 발생합니다.
하지만, Thread는 같은 메모리 공간을 하나의 Process 내에서 실행하므로 메모리 공간을 공유할 수 있어 설계에 유용합니다.
(물론 자원을 공유할 때 동기화(Synchronization)와 관련된 조치가 필요합니다.)
Python에서의 Thread는 조금 다를 수 있습니다.
Python에는 GIL(Global Interpreter Lock) 이라는 문제가 있기 때문입니다. 간단히 설명하면 python process내의 다양한 thread들은 자원에 대한 통제를 위해 mutex lock과 비슷한 GIL을 활용하기 때문에 multi-thread로 구현하더라도 single thread와 비슷하게 sequential하게 동작한다는 것입니다. 자세한 내용은 관련 글(https://tkayyoo.tistory.com/27)을 참조하세요.
** mutex lock : 뒤에 synchronization관련된 내용에서 설명하겠습니다.
따라서 python의 Thread를 활용하는 이유가, CPU-bound를 해결하기 위해서라면 Thread가 동시에 동작하기 어렵기 때문에 개선되기 어려울 수 있습니다. 하지만 I/O-bound를 해결하기 위해서라면 I/O를 병렬적으로 던지고 받는 동작을 Concurrent하게 할 수 있기 때문에 이득을 볼 수 있겠죠.
** CPU-bound 프로그램 : CPU 연산량이 많은 프로그램
** I/O-bound 프로그램 : In/Out interface를 많이 활용해야 하는 프로그램
b. Multi-Process
Process를 여러개 만들어 어떤 프로그램을 동작하는 것을 Multi-Process라고 합니다. 이부분은 개념적으로 크게 어렵지 않기 때문에 구현을 살펴보겠습니다. Python 구현에 더해 C++로 구현한 방법들도 보이겠습니다.
1. Python 구현
<Method 1 : 기본>
기본적으로 Python으로 자식 process를 생성하는 방법은 아래와 같습니다. multiprocessing 패키지를 활용해 프로세스를 만들어주고, 함수를 연결해주면 됩니다.
import multiprocessing
import threading
import time
import os
# Case1. Single
def myfunction(arg1):
time.sleep(1)
print("[1] function process {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
p_list = [multiprocessing.Process(target=myfunction, args=(i,)) for i in range(5)]
for process in p_list:
process.start()
print("[1] function out")
[1] function out
[1] function process 0 in! (pid:932, tid:140166589622080)
[1] function process 1 in! (pid:933, tid:140166589622080)
[1] function process 2 in! (pid:934, tid:140166589622080)
[1] function process 3 in! (pid:935, tid:140166589622080)
[1] function process 4 in! (pid:936, tid:140166589622080)
** 참고로 python에는 subprocess라는 패키지가 있습니다. 위처럼 python으로 child process가 동작할 내용을 명시하는 것이 아니라, subprocess는 다른 외부의 프로그램을 python 내에서 실행하기 위해 사용합니다.
<Method 2 : 상속을 활용하는 방법>
Process에 함수를 준 것 과 다르게, 클래스를 만들어 전달해주는 방법입니다. 중요한 것은 run()함수를 구현해주는 것입니다.
# Case2. SubClass
class myclass(multiprocessing.Process):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
time.sleep(1)
print("[2]class process {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
p_list = [myclass(i) for i in range(5)]
for process in p_list:
process.start()
print("[2] class out")
[2] class out
[2]class process 0 in! (pid:944, tid:140087289825088)
[2]class process 1 in! (pid:945, tid:140087289825088)
[2]class process 2 in! (pid:946, tid:140087289825088)
[2]class process 3 in! (pid:947, tid:140087289825088)
[2]class process 4 in! (pid:948, tid:140087289825088)
<Method 3 : Pool을 활용하는 방법>
마지막으로, process pool을 만들어주는 방법입니다. process pool이란 먼저 프로세스들을 준비하고, 해당 프로세스에 task들을 던져주는 형태로 구현할 때 사용하는 개념입니다.
구현은 아래와 같이 multiprocessing.pool을 활용해 process를 만들어준 후에 map 함수를 활용해 만들어준 함수를 전달해줍니다.
** p.map : iterator를 받아 처리하며, 단일 인자 input, return은 list
** p.starmap : iterator를 받아 처리하며, 두개 이상 인자 input, return은 list
** p.imap : iterator를 받아 처리하며, 단일 인자 input, return은 iterator
# Case3-3. Pool
import multiprocessing.pool
def myfunction4(arg1):
time.sleep(1)
print("[3-3] function4 process {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
pool = multiprocessing.pool.Pool(5)
array = [10,11,12,13,14]
result = pool.map(myfunction4, array)
print("[3-3] pool out3 : {}".format(result))
pool.close()
pool.join()
[3-3] function4 process 11 in! (pid:975, tid:139825954170688)
[3-3] function4 process 12 in! (pid:976, tid:139825954170688)
[3-3] function4 process 10 in! (pid:974, tid:139825954170688)
[3-3] function4 process 13 in! (pid:977, tid:139825954170688)
[3-3] function4 process 14 in! (pid:978, tid:139825954170688)
[3-3] pool out3 : [11, 12, 13, 14, 15]
-----------------------------------------------------------------------------
<ProcessPoolExecutor를 활용하기>
위 방법 이외에 concurrent라는 패키지를 활용할 수도 있습니다. 아래와 같이 submit을 통해 함수를 던져줄 수 있습니다.
# Case3-1. Pool
from concurrent.futures import ProcessPoolExecutor
def myfunction2(arg1):
time.sleep(1)
print("[3-1] function2 process {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
executor = ProcessPoolExecutor(5)
for i in range(10):
future = executor.submit(myfunction2, i)
print("[3-1] pool out")
[3-1] pool out
[3-1] function2 process 0 in! (pid:958, tid:139805162002240)
[3-1] function2 process 1 in! (pid:959, tid:139805162002240)
[3-1] function2 process 2 in! (pid:960, tid:139805162002240)
[3-1] function2 process 3 in! (pid:961, tid:139805162002240)
[3-1] function2 process 4 in! (pid:962, tid:139805162002240)
[3-1] function2 process 5 in! (pid:959, tid:139805162002240)
[3-1] function2 process 6 in! (pid:958, tid:139805162002240)
[3-1] function2 process 7 in! (pid:962, tid:139805162002240)
[3-1] function2 process 8 in! (pid:961, tid:139805162002240)
[3-1] function2 process 9 in! (pid:960, tid:139805162002240)
아래와 같이 map 함수를 활용할 수도 있습니다.
# Case3-2. Pool
from concurrent.futures import ProcessPoolExecutor
def myfunction3(arg1):
time.sleep(1)
print("[3-2] function3 process {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
executor = ProcessPoolExecutor(5)
array = [10,11,12,13,14]
for result in executor.map(myfunction3, array):
print("[3-2] pool out2 : {}".format(result))
[3-2] function3 process 10 in! (pid:966, tid:140310008780608)
[3-2] function3 process 11 in! (pid:967, tid:140310008780608)
[3-2] function3 process 12 in! (pid:968, tid:140310008780608)
[3-2] function3 process 13 in! (pid:969, tid:140310008780608)
[3-2] function3 process 14 in! (pid:970, tid:140310008780608)
[3-2] pool out2 : 11
[3-2] pool out2 : 12
[3-2] pool out2 : 13
[3-2] pool out2 : 14
[3-2] pool out2 : 15
-----------------------------------------------------------------------------
2. C++ 구현
이번엔 C++로 구현 된 내용을 살펴보겠습니다.
사실 Linux에서 process를 만들어주는 Process 생성과정은 아래 두개의 함수를 섞은 함수입니다.
- fork() : 새로운 자식 process를 생성합니다. 부모 process는 자식 process의 PID를 받으며, 자식 process는 0을 받습니다.
- exec() : 현재 process를 원하는 프로그램 실행에 사용합니다.
아래 코드는 fork()를 활용해 분기해준 후에 특정함수를 실행하도록 보였습니다.
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> //sleep, getpid, fork
#include <stdlib.h> //EXIT_FAILURE
int shared_variable=0;
int main(){
pid_t my_pid = getpid();
printf("This is my process pid %d\n", my_pid);
pid_t pid;
char name1[] = "process_1";
char name2[] = "process_2";
pid = fork();
if(pid<0)
exit(EXIT_FAILURE);
// Child Process1
else if(pid==0)
function(name1);
// Parent Process
else if(pid>0){
printf("Process pid %d created\n", pid);
pid = fork();
if(pid<0)
exit(EXIT_FAILURE);
// Child Process2
else if(pid==0)
function(name2);
// Parent Process
else if (pid>0){
printf("Process pid %d created\n", pid);
printf("Result : %d\n", shared_variable);
}
}
return 0;
}
위에서 사용한 함수는 아래와 같습니다.
void* function(void* data){
sleep(1);
char* process_name = (char *)data;
pid_t pid; //process id
pthread_t tid; // thread id
pid = getpid(); //4
tid = pthread_self();
shared_variable++;
printf("check : %d\n", shared_variable);
printf("process name : %s, tid : %x, pid : %u\n", process_name, (unsigned int)tid, (unsigned int)pid);
}
이제 아래와 같은 명령어를 통해 실행해보겠습니다.
g++ test.cpp -o out -lpthread
./out
rm out
This is my process pid 1002
Process pid 1003 created
Process pid 1004 created
Result : 0
check : 1
process name : process_1, tid : fa61e740, pid : 1003
check : 1
process name : process_2, tid : fa61e740, pid : 1004
c. Multi-Thread
이번엔 여러개의 Thread를 만드는 Multi-Thread에 대해 살펴보겠습니다. 역시나 Python으로 구현하는 것에 더해 C++도 보이겠습니다.
1. Python 구현
<Method 1 : 기본>
기본적으로 thread를 만들어주는 방법은 아래와 같습니다. threading 패키지를 활용해 프로세스를 만들어주고, 함수를 연결해주면 됩니다.
import threading
import time
import os
# Case1. Single
def myfunction(arg1):
time.sleep(1)
print("[1] function thread {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
t_list = [threading.Thread(target=myfunction, args=(i,)) for i in range(5)]
for thread in t_list:
thread.start()
print("[1] function out")
[1] function out
[1] function thread 0 in! (pid:1011, tid:139930617952000)
[1] function thread 1 in! (pid:1011, tid:139930609559296)
[1] function thread 3 in! (pid:1011, tid:139930592773888)
[1] function thread 4 in! (pid:1011, tid:139930584381184)
[1] function thread 2 in! (pid:1011, tid:139930601166592)
<Method 2 : 상속을 활용하는 방법>
Thread에 함수를 준 것 과 다르게, 클래스를 만들어 전달해주는 방법입니다. 중요한 것은 run()함수를 구현해주는 것입니다.
# Case2. SubClass
class myclass(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
time.sleep(1)
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
t_list = [myclass(i) for i in range(5)]
for thread in t_list:
thread.start()
print("[2] class out")
[2] class out
[2] class thread 1 in! (pid:1017, tid:140217161565952)
[2] class thread 0 in! (pid:1017, tid:140217169958656)
[2] class thread 3 in! (pid:1017, tid:140217144780544)
[2] class thread 4 in! (pid:1017, tid:140217136387840)
[2] class thread 2 in! (pid:1017, tid:140217153173248)
<Method 3 : Pool을 활용하는 방법>
마지막으로, thread pool을 만들어주는 방법입니다. thread pool이란 먼저 쓰레드들을 준비하고, 해당 쓰레드에 task들을 던져주는 형태로 구현할 때 사용하는 개념입니다.
구현은 아래와 같이 multiprocessing.pool.ThreadPool을 활용해 thread pool을 만들어준 후에 map 함수를 활용해 만들어준 함수를 전달해줍니다.
** p.map : iterator를 받아 처리하며, 단일 인자 input, return은 list
** p.starmap : iterator를 받아 처리하며, 두개 이상 인자 input, return은 list
** p.imap : iterator를 받아 처리하며, 단일 인자 input, return은 iterator
# Case3-3. Pool
import multiprocessing.pool
def myfunction4(arg1):
time.sleep(1)
print("[3-3] function4 thread {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
pool = multiprocessing.pool.ThreadPool(5)
array = [10,11,12,13,14]
result = pool.map(myfunction4, array)
print("[3-3] pool out3 : {}".format(result))
pool.close()
pool.join()
[3-3] function4 thread 10 in! (pid:1035, tid:140005012096768)
[3-3] function4 thread 12 in! (pid:1035, tid:140005028882176)
[3-3] function4 thread 14 in! (pid:1035, tid:140005020489472)
[3-3] function4 thread 11 in! (pid:1035, tid:140005045667584)
[3-3] function4 thread 13 in! (pid:1035, tid:140005037274880)
[3-3] pool out3 : [11, 12, 13, 14, 15]
------------------------------------------------------------
<ThreadPoolExecutor를 활용하기>
위 방법 이외에 concurrent라는 패키지를 활용할 수도 있습니다. 아래와 같이 submit을 통해 함수를 던져줄 수 있습니다.
# Case3-1. Pool
from concurrent.futures import ThreadPoolExecutor
def myfunction2(arg1):
time.sleep(1)
print("[3-1] function2 thread {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
executor = ThreadPoolExecutor(5)
for i in range(10):
future = executor.submit(myfunction2, i)
print("[3-1] pool out")
[3-1] pool out
[3-1] function2 thread 0 in! (pid:1023, tid:140033373177600)
[3-1] function2 thread 2 in! (pid:1023, tid:140033356392192)
[3-1] function2 thread 1 in! (pid:1023, tid:140033364784896)
[3-1] function2 thread 3 in! (pid:1023, tid:140033347999488)
[3-1] function2 thread 4 in! (pid:1023, tid:140033339606784)
[3-1] function2 thread 5 in! (pid:1023, tid:140033373177600)
[3-1] function2 thread 8 in! (pid:1023, tid:140033347999488)
[3-1] function2 thread 6 in! (pid:1023, tid:140033356392192)
[3-1] function2 thread 7 in! (pid:1023, tid:140033364784896)
[3-1] function2 thread 9 in! (pid:1023, tid:140033339606784)
아래와 같이 map 함수를 활용할 수도 있습니다.
# Case3-2. Pool
from concurrent.futures import ThreadPoolExecutor
def myfunction3(arg1):
time.sleep(1)
print("[3-2] function3 thread {} in! (pid:{}, tid:{})".format(arg1, os.getpid(), threading.get_ident()))
return arg1+1
executor = ThreadPoolExecutor(5)
array = [10,11,12,13,14]
for result in executor.map(myfunction3, array):
print("[3-2] pool out2 : {}".format(result))
[3-2] function3 thread 10 in! (pid:1029, tid:139739635279616)
[3-2] pool out2 : 11
[3-2] function3 thread 11 in! (pid:1029, tid:139739626886912)
[3-2] function3 thread 14 in! (pid:1029, tid:139739601708800)
[3-2] function3 thread 13 in! (pid:1029, tid:139739610101504)
[3-2] function3 thread 12 in! (pid:1029, tid:139739618494208)
[3-2] pool out2 : 12
[3-2] pool out2 : 13
[3-2] pool out2 : 14
[3-2] pool out2 : 15
------------------------------------------------------------
2. C++ 구현
이번엔 C++로 구현 된 내용을 살펴보겠습니다. Linux에서는 pthread_create라는 함수를 제공합니다.
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> //sleep, getpid
#include <stdlib.h> //EXIT_FAILURE
int shared_variable=0;
int main(){
pthread_t pthread[2];
char name1[] = "thread_1";
char name2[] = "thread_2";
if(pthread_create(&pthread[0], NULL, &function, (void *)name1)==0)
printf("Success thread1\n");
else
exit(EXIT_FAILURE);
if(pthread_create(&pthread[1], NULL, &function, (void *)name2)==0)
printf("Success thread2\n");
else
exit(EXIT_FAILURE);
int status;
pthread_join(pthread[0], (void **)&status);
pthread_join(pthread[1], (void **)&status);
printf("Result : %d\n", shared_variable);
return 0;
}
위에서 사용한 함수는 아래와 같습니다.
void* function(void* data){
sleep(1);
char* thread_name = (char *)data;
pid_t pid; //process id
pthread_t tid; // thread id
pid = getpid(); //4
tid = pthread_self();
shared_variable++;
printf("check : %d\n", shared_variable);
printf("thread name : %s, tid : %x, pid : %u\n", thread_name, (unsigned int)tid, (unsigned int)pid);
}
이제 아래와 같은 명령어를 통해 실행해주겠습니다. pthread 라이브러리를 추가해주어야합니다.
g++ test.cpp -o out -lpthread
./out
rm out
Success thread1
Success thread2
check : 1
thread name : thread_1, tid : 451e700, pid : 1051
check : 2
thread name : thread_2, tid : 3d1d700, pid : 1051
Result : 2
2. Inter-Process Communication
IPC(Inter-Process Communication)란, 이름 그대로 Process간에 통신을 위한 방법을 의미합니다. Process는 독립된 메모리를 가지고 실행되는 단위이므로, 서로의 통신을 위해서 적당한 방법이 필요합니다. IPC의 모델에는 크게 Shared Memory와 Message Passing이 있습니다.
- Shared Memory : 하나의 메모리를 두개의 프로세스가 공유하는 방법입니다. Message Passing보다 빠르지만, Critical Section을 다루기 위한 구현이 어렵습니다.
- Message Passing : Kernel이 개입해서 Message Queue를 활용해 프로세스 간 데이터를 공유하는 방법입니다. 커널이 개입하기 떄문에 Shared Memory보다 느리지만, 구현이 쉽고, 작은양의 데이터를 교환할 때 유용합니다.
ex) 파이프, 소켓(TCP/IP)
** Thread와 같이 Process간에도, 같은 메모리를 공유해 통신해야하기 때문에 아래에서 설명할 Semaphore와 Mutex같은 Synchronization을 필요로 합니다.
IPC에는 System V IPC와 POSIX IPC라는 2가지 표준이 있으며, 두 표준 모두 Shared Memory와 Message Passing 방법을 가지고있습니다. System V IPC를 개선한 버전이 POSIX IPC지만, POSIC IPC가 더 성능이 좋다고만은 할 수 없습니다.
IPC의 종류에는 아래와 같은 다양한 방법들이 있습니다.
PIPE | Named PIPE | Message Queue |
Shared Memory |
Memory Map |
Socket | |
통신 관계 | 부모 자식간 | 프로세스간 | 프로세스간 | 프로세스간 | 프로세스간 | 시스템간 |
통신 방향 | 단방향 | 단방향 | 단방향 | 양방향 | 양방향 | 양방향 |
통신 단위 | Stream | Stream | Object | Object | Page | Stream |
공유 매개체 | 파일 | 파일 | 메모리 | 메모리 | 파일, 메모리 | 소켓 |
범위 | 시스템 내 | 시스템 내 | 시스탬 내 | 시스템 내 | 시스템 내 | 시스템 내외 |
이 중에 Message Queue, Shared Memory, Memory Map, Socket 4가지를 Python으로 구현해보겠습니다.
다양한 방법이 있겠지만 multiprocessing 패키지를 활용해 구현하는 것을 우선적으로 보이겠습니다.
a. Message Queue
먼저 Message Queue에 대해 살펴보겠습니다.
보통 Message Passing을 활용하기 위해서는 위에서 설명한 Pipe나 Message Queue를 활용합니다. 그중에서도 Message Queue는 위에서 살펴본 바와 같이 프로세스간의 단방향 통신을 위한 "자료구조"입니다.
Python의 multiprocessing 패키지에는 Queue라는 공유 Queue를 제공합니다. 이는 python에서 기본적으로 제공하는 queue.Queue의 clone에 가까우며, 추가적으로 multiprocessing 환경에서도 공유가 가능하도록 만든 queue입니다.
** (중요) 사실 queue를 구현하기 위해 다음 챕터에서 설명할 동기화가 필요합니다. 하지만 상속한 python의 queue.Queue에는 기본적으로 get(block=True)를 하는 경우 lock/semaphore를 활용해 thead-safe하며, busy wait 하지 않고 suspended되도록 구현이 되어있다고 합니다. 즉, 직접 해당 리소스를 공유할 방법에 대해서 직접구현하지 않아도 되도록 구현되어있다는 좋은 특징이있습니다.
** 단, queue.Queue는 하나의 프로세스 내에서만 작동하며, 이런 것을 가능하게 하는것이 multiprocessing.Queue입니다.
이를 활용해 여러개의 process간 통신을 하는 방법은 아래와 같습니다.
import multiprocessing
import sys
from multiprocessing import Queue
num_processes=5
result_queue=Queue()
class my_class(multiprocessing.Process):
def __init__(self, number):
super(my_class, self).__init__()
self.number = number
def run(self, number):
while True:
mydata = result_queue.get() # block=True, timeout=0
print("thread {} : data {}".format(my.number, mydata))
if __name__=='__main__':
process_list = [my_class(i) for i in range(num_processes)]
try :
for process in process_list:
process.start()
except(KeyboardInterrupt, SystemExit):
sys.exit()
for thread in thread_list:
thread.join()
while True:
value = input("Please enter a string to queue:\n")
if not result_queue.full():
result_queue.put(value)
사실 눈치 채셨겠지만, 위 방법만으로는 자식 프로세스와의 통신은 가능하겠지만, 다른 프로세스와의 통신은 불가합니다. 따라서 아래 d.Socket과 같이 multiprocessing.Manager().Queue()를 활용하는 등 아래서 설명할 socket과 함께 사용하면 다른 프로세스와의 통신도 가능합니다.
---------------------------------------------------------------
<sysv_ipc의 MessageQueue를 활용>
위와 같이 Socket 형태와 합쳐 만들 필요 없이 MessageQueue를 아예 제공하는 sysv_ipc 패키지도 존재합니다. 이는 위와 달리 서버-클라이언트 구조를 바로 활용할 수 있도록 하지만, 메모리 상의 제약사항이 존재합니다.
구현한 내용은 아래와 같습니다.
server.py
from sysv_ipc import MessageQueue, IPC_CREAT
import numpy as np
wavRecvMsQueue = MessageQueue(42,IPC_CREAT, max_message_size=10000)
while True:
print("Wait start")
data, type=wavRecvMsQueue.receive()
data = np.frombuffer(data, dtype=np.float64)
print("data : {}".format(data))
client.py
# pip3 install sysv_ipc
from sysv_ipc import MessageQueue, IPC_CREAT
import numpy as np
wavSendMsQueue = MessageQueue(42,IPC_CREAT, max_message_size=10000)
a = np.random.rand(400,2)
wavSendMsQueue.send(a)
---------------------------------------------------------------
b. Shared Memory
Shared Memory는 위에서 말했던 바와 같이 특정 메모리 공간을 공유해, 프로세스간 통신이 가능하도록 하는 방법입니다. multiprocessing패키지에는 shared_memory를 제공하며, 구현한 내용은 아래와 같습니다.
** 주의할 것은 multiprocessing.shared_memory는 python 버전 3.8부터 존재하므로 python 버전을 주의해야합니다.
server.py
from multiprocessing import shared_memory
import numpy as np
a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]
print(shm.name + '\n')
while True :
input("Press any key : ")
b[-1]+=1
del b
shm.close()
shm.unlink()
client.py
import numpy as np
from multiprocessing import shared_memory
existing_shm = shared_memory.SharedMemory(name='psm_59549a6a')
c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
print("Shared Array is : {}".format(c))
while True:
input("Press any button : ")
print("Shared Array is : {}".format(c))
del c
existing_shm.close()
c. Memory Map
Memory Map(MMap)은 Shared Memory와 마찬가지로, 특정 메모리 영역을 프로세스간 공유함으로써 IPC가 가능하게 하도록 하는 방법입니다. 하지만 Memory Map은 Shared Memory와 다르게, 특정 파일을 메모리에 Mapping해 공유한다는 것이 다른 특징을 가지고 있습니다.
아래는 Python으로 구현한 mmap의 예시입니다.
server.py
import mmap
import numpy as np
import os
import cv2
width=327
height=154
size=(width*height*3)
fd = os.open('/tmp/mmaptest', os.O_CREAT|os.O_TRUNC|os.O_RDWR)
os.truncate(fd, (width*height*3))
while True:
input("Ready to go : ")
img = cv2.imread('images.png', cv2.IMREAD_COLOR)
print(img.shape) #(154,327,3), (H,W,C)
buf = img.tobytes()
mm = mmap.mmap(fd, size, flags=mmap.MAP_SHARED, prot=mmap.PROT_WRITE)
mm.seek(0)
mm.write(buf)
mm.flush()
mm.close()
client.py
import os
import mmap
import numpy as np
import cv2
width=327
height=154
size = (width*height*3)
fd = os.open('/tmp/mmaptest', os.O_RDONLY)
mm = mmap.mmap(fd, size, mmap.MAP_SHARED, mmap.PROT_READ)
while(True):
mm.seek(0)
buf=mm.read(size)
img = np.frombuffer(buf, dtype=np.uint8).reshape((height,width,3))
cv2.imshow("img", img)
# 0:infinite waiting
# 1~ ms
key=cv2.waitKey(1) & 0xFF # 8bit ASCII
key = chr(key) # esc == 27, enter==13
if key.lower() =="q":
break
mm.close()
d. Socket
Socket은 프로세스 뿐아니라 시스템간에도 통신이 가능하도록 하는 방법으로, 넓은 범위에서 활용되는 방법입니다. 시스템 간에도 사용되기 때문에, 도메인과 프로토콜이라는 개념이 존재하며 서버와 클라이언트 개념이 확실히 존재합니다. multiprocessing 패키지에는 Socket과 관련된 부분도 제공하므로, 구현한 내용은 아래와 같습니다.
server.py
from multiprocessing.connection import Listener
import numpy as np
listener=Listener(('localhost', 6000), authkey=b'password')
running = True
while running :
conn = listener.accept()
print("connection accepted from {}".format(listener.last_accepted))
while True:
msg = conn.recv()
print(msg)
if isinstance(msg,str) and msg =='close connection':
conn.close()
break
if isinstance(msg,str) and msg =='close server':
conn.close()
running=False
break
listener.close()
client.py
from multiprocessing.connection import Client
import time
import numpy as np
conn = Client(('localhost', 6000), authkey=b'password')
conn.send('foo1')
time.sleep(1)
conn.send('foo2')
time.sleep(1)
#conn.send('close connection')
#time.sleep(1)
conn.send('close server')
conn.close()
---------------------------------------------------------------------------------
<Manager를 활용해 구현하는 방법>
위 방법외에도 multiprocessing,Manager라는 객체를 통해 프로세스를 생성하는 방법도 존재합니다.
server.py
from multiprocessing.managers import SyncManager
import multiprocessing
import numpy as np
patch_dict = np.random.rand(5,1000)
def get_patch_dict():
return patch_dict
class MyManager(SyncManager):
pass
port_num = 4900
MyManager.register("patch_dict", get_patch_dict)
manager=MyManager(("127.0.0.1", port_num), authkey=b'password')
multiprocessing.current_process().authkey = b"password"
manager.start()
input("Press any key to **** server".center(50, "-"))
manager.shutdown()
client.py
from multiprocessing.managers import SyncManager
import multiprocessing
class MyManager(SyncManager):
pass
port_num=4900
MyManager.register("patch_dict")
manager = MyManager(("127.0.0.1",port_num), authkey=b'password')
multiprocessing.current_process().authkey=b'password'
manager.connect()
patch_dict = manager.patch_dict()
print("Keys : {}".format(patch_dict))
Manager를 활용하면 Queue나 Value같은 multiprocessing패키지에서 제공하는 공유 자원에 대해서도 직접적인 서버프로세스를 만들어 제공할 수 있는 장점도 있기 때문에 많이 활용합니다.
---------------------------------------------------------------------------------
3. Synchronization
두개 이상의 Thread가 공유 리소스에 접근하면 서로 리소스를 활용하려고 하기 때문에 Non-deterministic한 결과가 발생하게 됩니다. 이를 Race Condition이라고 합니다.
이를 방지하고 Mutual Exclusion(상호배제) 정책을 구현하기 위해, 우리는 먼저 공유 리소스에 접근하는 코드의 영역 혹은 접근 가능한 Logical한 영역을 Critical Section이라고 부릅니다.
이런 Critical Section을 안전하게 하나의 Thread에게만 제공해 원자성(Atomicity)를 보장하는 과정을 동기화(Synchronization)라고 하는데 이 부분에 대해 살펴보려고 합니다.
시작하기에 앞서 Race Condition의 상황에 대해 살펴보려고 합니다. Python은 앞서 설명한 바와 같이 GIL이 있으므로, 재현이 되지 않을 우려가 있어 C++을 활용한 Multi-thread환경을 가정하고 실제 코드의 결과를 살펴보겠습니다. 코드는 생략하겠습니다.
----------------------------------------------------------------------------
<C++에서 구현한 Multi-Thread>
실제 사용한 코드입니다.
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> //sleep, getpid
#include <stdlib.h> //EXIT_FAILURE
#include <cmath>
int shared_variable=0;
void* function(void* id){
int thread_name = *(int *)id;
pid_t pid; //process id
pthread_t tid; // thread id
pid = getpid();
tid = pthread_self();
printf("thread name : %d, tid : %x, pid : %u\n", thread_name, (unsigned int)tid, (unsigned int)pid);
shared_variable++;
printf("What is the global_var now : %d\n", shared_variable);
return NULL;
}
int main(){
int num_thread=32000;
pthread_t pthread[num_thread];
int thread_id[num_thread];
for(register int i=0; i< num_thread;i++){
thread_id[i]=i;
}
for(register int i=0; i<num_thread; i++) {
if(pthread_create(&pthread[i], NULL, &function, &thread_id[i])==0)
;
else
exit(EXIT_FAILURE);
}
for(register int i=0; i<num_thread; i++) {
pthread_join(pthread[i], NULL);
}
printf("Result : %d\n", shared_variable);
return 0;
}
----------------------------------------------------------------------------
...
thread name : 31998, tid : 5b0a700, pid : 17430
What is the global_var now : 31996
thread name : 31999, tid : 5309700, pid : 17430
What is the global_var now : 31997
Result : 31997
32000개의 쓰레드가 공유 리소스에 접근한 결과를 보면 동기화가 진행되지 않았기 때문에 결과가 예상과 다르게 31997로 나온 것을 볼 수 있습니다.
이때 동기화를 가장 단순하게 해결할 수 있는 방법은 Spin Lock입니다. Spin Lock이란 어떤 Lock 객체를 두고 , 해당 Lock을 먼저 차지한 쓰레드가 여유로울 때까지 Busy Wait하는 방법입니다.
** Busy Wait : Loop Cycle을 돌며 해당 Lock을 잡을 때까지 기다리는 방법입니다.
하지만 이런 Busy Wait의 가장 큰 단점은 아무것도 할 수 없음에도 불구하고, 모든 쓰레드들이 CPU 리소스를 소모하면서 기다려야한다는 단점이 있습니다. 그럼 Spin Lock으로 구현했을 때의 결과를 살펴보겠습니다.
----------------------------------------------------------------------------
<Spin Lock을 활용해 Busy Wait하는 C++>
Spin Lock을 활용한 구현입니다.
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> //sleep, getpid
#include <stdlib.h> //EXIT_FAILURE
#include <cmath>
#include <ctime>
int shared_variable=0;
pthread_spinlock_t s;
void* function(void* id){
int thread_name = *(int *)id;
pid_t pid; //process id
pthread_t tid; // thread id
pid = getpid();
tid = pthread_self();
printf("thread name : %d, tid : %x, pid : %u\n", thread_name, (unsigned int)tid, (unsigned int)pid);
pthread_spin_lock(&s);
shared_variable++;
pthread_spin_unlock(&s);
printf("What is the global_var now : %d\n", shared_variable);
return NULL;
}
int main(){
int num_thread=32000;
pthread_t pthread[num_thread];
int thread_id[num_thread];
for(register int i=0; i< num_thread;i++){
thread_id[i]=i;
}
if(pthread_spin_init(&s, 0)!=0){
printf("Error initialization\n");
return 0;
}
clock_t start, finish;
double duration;
start = clock();
for(register int i=0; i<num_thread; i++) {
if(pthread_create(&pthread[i], NULL, &function, &thread_id[i])==0)
;
else
exit(EXIT_FAILURE);
}
for(register int i=0; i<num_thread; i++) {
pthread_join(pthread[i], NULL);
}
finish = clock();
pthread_spin_destroy(&s);
printf("Result : %d\n", shared_variable);
duration = (double)(finish - start) / CLOCKS_PER_SEC;
printf("Elapsed time : %lf\n",duration);
return 0;
}
----------------------------------------------------------------------------
....
What is the global_var now : 31999
thread name : 31999, tid : 95309700, pid : 13762
What is the global_var now : 32000
Result : 32000
Elapsed time : 1.269900
결과를 보면 동기화가 가능해졌기 때문에, 전역 변수에 모두 원자성을 보장하며 접근이 가능했고, 그로 인해 전역변수에 대한 결과가 32000으로 잘 나온 것을 볼 수 있습니다.
하지만, 앞서 설명한 바와 같이 Spin Lock은 CPU자원을 소모하기 때문에, 아래서 설명할 다른 방법 중 Mutex를 활용해 동기화를 진행하면 아래와 같습니다.
----------------------------------------------------------------------------
<Mutex Lock을 활용해 Busy Wait하는 C++>
Mutex에 대해서는 아래에서 추가로 설명하겠습니다.
#include <stdio.h>
#include <pthread.h>
#include <unistd.h> //sleep, getpid
#include <stdlib.h> //EXIT_FAILURE
#include <cmath>
#include <ctime>
int shared_variable=0;
pthread_mutex_t m;
void* function(void* id){
int thread_name = *(int *)id;
pid_t pid; //process id
pthread_t tid; // thread id
pid = getpid();
tid = pthread_self();
printf("thread name : %d, tid : %x, pid : %u\n", thread_name, (unsigned int)tid, (unsigned int)pid);
pthread_mutex_lock(&m) ;
shared_variable++;
pthread_mutex_unlock(&m);
printf("What is the global_var now : %d\n", shared_variable);
return NULL;
}
int main(){
int num_thread=32000;
pthread_t pthread[num_thread];
int thread_id[num_thread];
for(register int i=0; i< num_thread;i++){
thread_id[i]=i;
}
if(pthread_mutex_init(&m, NULL)!=0){
printf("Error initialization\n");
return 0;
}
clock_t start, finish;
double duration;
start = clock();
for(register int i=0; i<num_thread; i++) {
if(pthread_create(&pthread[i], NULL, &function, &thread_id[i])==0)
;
else
exit(EXIT_FAILURE);
}
for(register int i=0; i<num_thread; i++) {
pthread_join(pthread[i], NULL);
}
finish = clock();
pthread_mutex_destroy(&m);
printf("Result : %d\n", shared_variable);
duration = (double)(finish - start) / CLOCKS_PER_SEC;
printf("Elapsed time : %lf\n",duration);
return 0;
}
----------------------------------------------------------------------------
....
What is the global_var now : 31999
thread name : 31999, tid : 7bb06700, pid : 15596
What is the global_var now : 32000
Result : 32000
Elapsed time : 1.267371
역시나 원자성을 보장하기 때문에 전역 변수의 값이 안정적으로 더해진 것을 볼 수 있습니다.
아래서는 동기화 개념에 대해서 하나씩 살펴보며, 아래의 Python으로 구현된 Multi-Thread 상황에 동기화를 구현한 모습을 살펴보겠습니다.
import threading
import time
import os
global_var=0
# Case2. SubClass
class myclass(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
global global_var
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
global_var+=1
print("[2] What is the global_var now : {}".format(global_var))
num_thread = 30000
t_list = [myclass(i) for i in range(num_thread)]
start_time=time.time()
for thread in t_list:
thread.start()
for thread in t_list:
thread.join()
end_time = time.time()
print("[2] class out : {}".format(global_var))
elapsed_time = end_time -start_time
print("[2] elapsed time : {}sec".format(elapsed_time))
추가적으로 Python의 GIL이 있음에도 불구하고 왜 동기화가 필요한지에 대해 궁금하신 분들은 아래 더보기를 참조하시길 바랍니다.
----------------------------------------------------------------------------
<Python에 동기화가 필요한 이유?>
앞서 설명한 바와 같이 Python에는 GIL이 존재하면 하나의 프로세스 내에서는 Thread가 동시에 실행될 일이 없을 것 같은데, 왜 Thread간의 동기화가 필요할까요?
먼저 위 기본 코드에 대한 결과를 살펴 보겠습니다.
...
[2] What is the global_var now : 100
[2] class out : 100
[2] elapsed time : 1.6295876502990723sec
역시나 동기화가 진행되지 않았음에도 불구하고, 결과가 동기화가 진행된 것처럼 보입니다. 그럼 GIL 때문에 동기화가 필요 없는 것이 아닐까요?
아닙니다. Python의 GIL 특성 때문에 Python은 Byte Code를 한번에 실행할 수 없을 뿐이지, 내가 원하는 Critical Section에 대해 안전하게 보장할 수 있는 방법이 아닙니다.
예를 들어 Thread 두개가 A라는 객체를 읽는 것을 순서대로 한뒤에 각각 A라는 객체를 변경 시켰다면, A라는 객체를 Thread 한 개가 바꾸고 나서 읽은 것이 아니기 때문에 각각 자기가 원하는 값으로 바꿔버릴 수 있는 위험이 있습니다. 따라서 Python에서도 Lock을 활용하는 것이 필요합니다.
그럼에도 불구하고, 역시나 당연하게 Multi-Thread는 GIL라는 특성이 있기 때문에, Multi-Process를 활용한다면 훨씬 더 빠르게 병렬적으로 실행할 수 있습니다. 아래 코드를 실행해보겠습니다.
import threading
import multiprocessing
import time
import os
global_var=0
# Case2. SubClass
class myclass(multiprocessing.Process):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
global global_var
print("[2] class process {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
#for i in range(10**5):
# temp = i+1
global_var+=1
print("[2] What is the global_var now : {}".format(global_var))
num_process = 30000
p_list = [myclass(i) for i in range(num_process)]
start_time= time.time()
for process in p_list:
process.start()
for process in p_list:
process.join()
end_time = time.time()
print("[2] class out : {}".format(global_var))
elapsed_time = end_time - start_time
print("[2] elapsed time : {}sec".format(elapsed_time))
...
[2] What is the global_var now : 1
[2] What is the global_var now : 1
[2] class out : 0
[2] elapsed time : 0.09156060218811035sec
위와 다르게 속도가 굉장히 빨라진 것을 볼 수 있습니다.
근데 위 결과에 대해서는 한가지 살펴볼 것이 있습니다. 공유 객체가 이상한 값으로 할당된 것을 볼 수 있습니다. 이는 병렬적으로 실행되었기 때문에 Race Condition이 발생했기 때문일까요?
아닙니다. Global Variable은 fork된 프로세스에서 활용할 수 있는 값이 아닙니다. 따라서 이를 위해 python에서는 Multiprocessing에 할용할 객체는 multiprocessing 패키지를 이용해서 선언해주어야 합니다.
----------------------------------------------------------------------------
a. Mutex
원자성을 보장하기 위해 단 하나의 쓰레드 혹은 프로세스에 의해 차지될 수 있는 객체를 Lock이라고 합니다.
이때 앞서 설명한 바와 같이 Busy Waiting을 하는 Spin Lock과는 다르게 Mutex Lock의 경우, 해당 Lock을 얻기 전까지 Sleep 상태로 들어갔다가 Lock이 다시 여유가 생기면 모든 Thread들이 Wakeup되고, 다시 Lock을 얻으려고 시도 할 수 있는 상태가 됩니다. 이를 Locking Mechanism이라고 합니다.
** 정확히는 뒤에서 나올 Signaling이 아닌, OS가 모든 Thread들을 깨웁니다.
위 Python 코드에서 mutex를 활용한 구현은 아래와 같습니다.
import threading
import time
import os
global_var=0
lock=threading.Lock()
# Case2. SubClass
class myclass(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
global global_var
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
lock.acquire()
global_var+=1
lock.release()
print("[2] What is the global_var now : {}".format(global_var))
num_thread = 30000
t_list = [myclass(i) for i in range(num_thread)]
start_time= time.time()
for thread in t_list:
thread.start()
for thread in t_list:
thread.join()
end_time = time.time()
print("[2] class out : {}".format(global_var))
elapsed_time = end_time - start_time
print("[2] elapsed time : {}sec".format(elapsed_time))
b. Semaphore
Semaphore는 Mutex와 다르게 여러개의 Thread가 접근 가능하도록 만든 방법입니다.
또한, 이는 Mutex와는 다르게 다른 Thread에 의해 신호를 받는 signal mechanism을 활용합니다. 즉, semaphore의 count가 0보다 큰 경우 Thread들이 접근 가능하므로 signaled상태이고, 0인 경우는 접근이 불가능한 상황입니다. 아래는 Semaphore에 대한 그림과 비교 표입니다.
Mutex (Binary Semaphore) | Semaphore | |
용도 | 공유 자원 하나에 접근을 제한 | 공유 자원 여러개에 접근을 제한 |
해제 방법 | Mutex 소유 중인 주체가 해제 | 소유하지 않는 주체가 해제 |
존재 범위 | Critical Section 내 | 파일 형태로 지속 존재 |
Semaphore에는 두가지가 있습니다.
- Counting Semaphore : 2개 이상의 Thread가 접근이 가능합니다.
- Binary Semaphore : 1개의 Thread만 접근이 가능합니다.
위 Python 코드에서 semaphore를 활용한 구현은 아래와 같습니다.
import threading
import time
import os
num_sema=3
sema=threading.Semaphore(num_sema)
global_var=0
# Case2. SubClass
class myclass(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
global global_var
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
sema.acquire()
global_var+=1
sema.release()
print("[2] What is the global_var now : {}".format(global_var))
num_thread = 10
t_list = [myclass(i) for i in range(num_thread)]
for thread in t_list:
thread.start()
for thread in t_list:
thread.join()
print("[2] class out : {}".format(global_var))
c. Condition Variable
CV(Condition Variable)은 Semaphore와 다르게 "특정 조건"이 성립할 때 wake up하기 위해 사용하는 방법입니다. CV는 Mutex와 associated되어 활용되며, Semaphore와는 다르게 어떤 쓰레드가 Signal을 이미 보냈는데, 다른 쓰레드가 wait상태가 아닐 수도 있어 이때는 signal을 받지 못하는 위험이 있습니다.
생산자 소비자 (Producer-Consumer) 모델을 구현할 때 주로 사용됩니다. 아래는 Python 코드에서 CV를 활용한 구현은 아래와 같습니다.
import threading
import time
import os
global_var=[]
condition=threading.Condition()
# Case2. SubClass
class producer(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
condition.acquire()
global global_var
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
global_var.append(self.arg1)
print("[2] What is the global_var now : {}".format(global_var))
condition.notify()
condition.release()
class consumer(threading.Thread):
def __init__(self, arg1):
super().__init__()
self.arg1 = arg1
def run(self):
while True:
condition.acquire()
print("!!!!!!!!!!!!!!!!!!IN!!!!")
condition.wait()
global global_var
print("[2] class thread {} in! (pid:{}, tid:{})".format(self.arg1, os.getpid(), threading.get_ident()))
item = global_var.pop(0)
print("[2] What is the global_var now : {}".format(item))
condition.release()
num_thread = 10
t_prod = [producer(i) for i in range(num_thread)]
t_cons = [consumer(i+num_thread) for i in range(num_thread)]
start_time=time.time()
for thread in t_cons:
thread.start()
for thread in t_prod:
thread.start()
for thread in t_prod:
thread.join()
for thread in t_cons:
thread.join()
end_time = time.time()
print("[2] class out : {}".format(global_var))
elapsed_time = end_time -start_time
print("[2] elapsed time : {}sec".format(elapsed_time))
4. Example
위와 같은 내용을 바탕으로 구현한 어떤 예시를 하나 보이고자 합니다.
아래 그림과 같이 Interface Module과 연결된 My Module을 구현하고자 하는 상황이라고 가정합니다. 요구사항은 아래와 같습니다.
- Interface Module로부터 알 수 없는 개수의 Process들이 My Module에 접근할 것입니다.
- My Module 내에서 Input Queue와 Result Queue가 Interface Module과의 연결을 관리합니다.
- My Module내에서는 thread를 여러개 실행해두고 Input Queue의 상태에 따라서 wake up되어 task가 실행됩니다.
- My Module에서 완료된 task들은 Result Queue에 결과를 전달합니다.
- Result Queue는 요청했던 프로세스에게 맞춰 결과를 전달합니다.
a. Server
서버 측의 구현 코드에 대해 설명하겠습니다.
import multiprocessing
from multiprocessing.managers import SyncManager
from multiprocessing import Manager
from multiprocessing import Queue
import threading
import sys
import json
먼저 Input Queue에 대해 설명하겠습니다. 앞서 설명한 Message Queue를 구현하기 위해 Queue를 선언했으며, Manager와 함께 IPC가 가능하도록 구현할 예정입니다.
#=================Multiprocessing Managers - Receiver Process========================
input_queue = Queue()
다음으로 Result Queue에 대해 설명하겠습니다. 여러개의 Thread에서 하나의 result queue에 결과를 담을텐데, 이를 외부에 각각의 프로세스에 보내기 위해 result_queue_dict를 만들었습니다.
외부의 프로세스는 외부 프로세스에 해당하는 ID를 통해 Queue를 미리 할당해야 할 것 입니다.
#=================Multiprocessing Managers - Sender Process========================
result_queue_dict = {}
result_queue = Queue()
def GetResultQueue(channel_id):
if not channel_id in result_queue_dict.keys():
return {"result":False, "message":"[Error] Please call SetResultQueue function first"}
else:
return result_queue_dict[channel_id]
class SetResultQueue(object):
def __init__(self,):
self.result_queue_dict = result_queue_dict
self.result_queue = result_queue
def mediator():
while True :
result, channel_id = self.result_queue.get()
self.result_queue_dict[channel_id].put((result))
mediator_obj = threading.Thread(target=mediator)
mediator_obj.start()
def set_channel(self, channel_id):
attr = getattr(self, 'result_queue_dict')
if channel_id in attr.keys():
return {"result":False, "message":"[Error] Please give me another channel id, not {}".format(channel_id)}
else:
attr[channel_id] = Queue()
return {"result":True, "message":"[Notice] Successfully channel {} added".format(channel_id)}
#=============================================================================END
------------------------------------------------------------------------------------
<Result Queue를 구현하는데의 위 구조를 선택한 이유>
- Result Queue를 여러개 유지하기 위해서 Dictionary를 활용하고 싶은데 아래와 같은 제한이 있습니다.
- a. multiprocessing manager는 queue 하나는 return할 수 있지만, queue를 포함한 dictionary는 return 할 수 없습니다.
→ Queue dictionary가 아닌 객체 하나를 return해야 합니다.
** queue를 return 하면 proxy형태로 return 되기 때문인데, 이미 구현된 Proxy를 통해 get()함수를 호출할 수 있습니다. - b. Manager().dict()는 process 간 공유되지만, 일반적인 global dictionary는 queue object는 공유가 되지 않습니다.
하지만 Manager().dict()에는 queue를 할당할 수 없지만, global dictionary는 queue object는 할당할 수 있습니다.
→ Queue를 dictionary에 할당하는 것은 필요하므로 global dictionary를 활용해야할 것 같습니다.
** Manager().dict에 queue를 할당하는 경우 : RuntimeError: Queue objects should only be shared between processes through inheritance - c. multiprocessing.Queue는 프로세스간 공유가 되지만 queue.Queue는 공유가 되지 않습니다.
→ 새로운 mediator를 추가해 multiprocessing.Queue 하나를 공유하도록 해야 합니다.
- a. multiprocessing manager는 queue 하나는 return할 수 있지만, queue를 포함한 dictionary는 return 할 수 없습니다.
- (1-a를 위해) queue를 return 하기 위해서는? 함수/Class 중 함수를 활용
- 함수는 multiprocessing.Queue object를 return하는 것이 가능하지만 class는 multiprocessing.Queue object를 return하는 것은 안됩니다.
→ 함수를 활용해야 합니다.
** class에서 queue object를 return 하는 경우 : RuntimeError: Queue objects should only be shared between processes through inheritance
- 함수는 multiprocessing.Queue object를 return하는 것이 가능하지만 class는 multiprocessing.Queue object를 return하는 것은 안됩니다.
- (1-b를 위해) global dictionary에 할당하는 방법을 만들기 위해 class를 활용했는데, 이때 client에게 새로운 함수를 만들어주고 expose 하기위해서는 개인이 쓰고 싶은 proxy를 직접 만들어주어야합니다. 아래 Client부분에 구현체를 보이겠습니다.
------------------------------------------------------------------------------------
자 이제 위 두 프로세스를 띄우고, input_queue에서 결과를 받아 result_queue에 담는 MY_Thread들을 실행해보겠습니다.
아래서 input queue와 result queue를 담당할 manager를 두번 실행해 주었는데, manager 객체 별로 process하나를 의미합니다.
class QueueManager(SyncManager):
pass
class MY_Wrapper :
def __init__(self, setting):
# =====================================Configuration========================
self.num_threads = 5
port_sender = 1001
port_receiver = 1002
authkey = b'password'
gpu_target=3
# ==========================Queue manager processes========================
QueueManager.register('GetAudioQueue', callable=lambda:audio_queue)
QueueManager.register('SetResultQueue', callable=SetResultQueue)
QueueManager.register('GetResultQueue', callable=GetResultQueue)
self.manager_sender = QueueManager(("127.0.0.1", port_sender), authkey=authkey)
self.manager_receiver = QueueManager(("127.0.0.1", port_receiver), authkey=authkey)
multiprocessing.current_process().authkey=authkey
# ===============================Main Threads=============================
self.on_flag = threading.Event()
self.on_flag.clear()
self.thread_list = [MY_Thread(i, setting, audio_queue, result_queue, self.on_flag) for i in range(0, self.num_threads)]
# =========================================================================
def start(self):
# =====================Main Threads=====================
for thread in self.thread_list:
thread.start()
print("[Notice] Processing Module Ready")
# ====================Queue manager processes===========
self.manager_sender.start()
self.manager_receiver.start()
print("[Notice] IPC Queue Manager Ready")
# ======================================================
try :
for thread in self.thread_list:
thread.join()
except :
self.manager_sender.shutdown()
self.manager_receiver.shutdown()
self.on_flag.set()
for i in range(0, self.num_threads):
audio_queue.put(None)
for thread in self.thread_list:
thread.join()
sys.exit()
My_Thread에 대한 내용은 보이지 않겠습니다. 단순히 알아야할 내용에 대해서만 이야기하자면 각각의 Thread에서는 아래와 같이 동작합니다.
- input_queue.get(block=True)를 통해 대기합니다. 따라서, queue의 특성상 input_queue에 아무것도 존재하지 않을 때는 sleep형태를 유지할 것입니다.
- 결과는 result_queue에 넣습니다.
-----------------------------------------------------------------------
<MY_Thread의 구현 내용>
실제 구현 된 MY_Thread의 내용입니다. input queue에 정보가 있을때 wakeup 되도록 만든 것은 multiprocessing.Queue를 활용해 block되도록 함으로써 구현했습니다.
class MY_Thread(threading.Thread):
def __init__(self, thread_id, setting, input_queue, result_queue, on_flag):
super(MY_Thread, self).__init__()
self.id = thread_id
self.input_queue = input_queue
self.result_queue = result_queue
self.on_flag = on_flag
from .my_class import MY_Class
self.LOGIC = MY_Class(setting)
def run(self,):
while True :
data = self.input_queue.get()
if not data and self.on_flag.is_set():
self.stop_module()
print("[Notice] Thread number {} is closed".format(self.id))
break;
data_a, data_b, channel_id = data
result = self.LOGIC(data_a)
self.result_queue.put((result, channel_id))
def stop_module(self,):
self.LOGIC.__del__()
-----------------------------------------------------------------------
b. Client
이번엔 Client쪽의 코드를 살펴보겠습니다.
앞서 언급한 바와 같이 Client에서는 보낼 audio_queue와 result_queue를 얻어내야합니다.
audio_queue는 Manager를 통해 직접 띄워져 있으므로, 바로 audio_queue 프록시를 받을 수 있습니다. 하지만 result_queue의 경우 나의 ID를 등록하고 나에 해당하는 queue를 받아야하므로, 얻기 전에 SetResultQueue()를 통해 Wrapper 프록시를 받고 해당 프로세스의 Queue를 만들어 준뒤, GetResultQueue() 함수를 통해 Queue를 얻어냅니다.
두개의 함수로 구분된 이유에 대해서는 위 더보기를 참조하세요. 아래는 Client로 구현된 코드입니다.
from multiprocessing.managers import SyncManager
from multiprocessing.managers import NamespaceProxy
from multiprocessing import Queue
import multiprocessing
from queue import Empty
#=================== Connect Queue Managers
class QueueManager(SyncManager):
pass;
class MY_Client:
def __init__(self, channel_id):
self.channel_id = channel_id
QueueManager.register('GetAudioQueue')
QueueManager.register('GetResultQueue')
QueueManager.register('SetResultQueue', proxytype=ResultQueueProxy)
self.port_sender = 1001
self.port_receiver= 1002
self.authkey = b"password"
self.manager_sender = QueueManager(("127.0.0.1", self.port_sender), authkey=self.authkey)
self.manager_receiver = QueueManager(("127.0.0.1", self.port_receiver), authkey=self.authkey)
multiprocessing.current_process().authkey=b"password"
self.manager_sender.connect()
self.manager_receiver.connect()
# ***********************************************
self.audio_queue = self.manager_sender.GetAudioQueue()
# ***********************************************
receiver_obj = self.manager_receiver.SetResultQueue()
data = receiver_obj.set_channel(self.channel_id)
result = data.get('result')
message = data.get("message")
if result:
print(message)
else :
raise Exception(message)
data= self.manager_receiver.GetResultQueue(self.channel_id)
try :
a = data.get(block=False)
except Empty:
self.result_queue=data
except :
message = data.get("message")
raise Exception(message)
# ***********************************************
def SendAudio(self, data_a, data_b):
self.audio_queue.put((data_a, data_b, self.channel_id))
def ReceiveResult(self, block=True):
if block:
result = self.result_queue.get(True)
return {'result':True, 'data':result}
else:
try:
result = self.result_queue.get(False)
return {'result':True, 'data':result}
except Empty:
return {'result':False}
-----------------------------------------------------------------------
<프록시(Proxy)에 원하는 인터페이스 추가하기>
앞서 더보기에서 설명한 바와 같이 프록시에 Interface를 추가하고 싶은 경우 아래와 같이 프록시를 재정의해주어야 합니다. 따라서 Client에서는 Manager를 만들 때 새로 정의한 아래와 같은 Proxy를 넘겨주어야 합니다.
class ResultQueueProxy(NamespaceProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'set_channel')
def set_channel(self, channel_id):
callmethod = object.__getattribute__(self, '_callmethod')
return callmethod('set_channel', (channel_id,))
-----------------------------------------------------------------------
https://yaelimeee.tistory.com/56
https://stackoverflow.com/questions/63553692/how-to-use-memory-mapped-file-in-python-linux
https://docs.python.org/3/library/multiprocessing.html
https://stackoverflow.com/questions/22098216/are-python-condition-variables-busy-waiting
'Developers 공간 [Basic] > Software Basic' 카테고리의 다른 글
[PyTorch] Optimizer & LR Scheduler 정리 (0) | 2024.08.17 |
---|---|
[Pytorch] Attention Layer 분석 및 구축하기 (0) | 2023.06.20 |
[OOP] Design Pattern 정리 (0) | 2023.05.15 |
[PyTorch] DDP(Distributed Data Parallel) 셋팅하기 (0) | 2022.12.27 |
[AWS] SMDDP(Sagemaker's DDP) 기본 환경 셋팅 (0) | 2022.12.27 |