본문 바로가기

더 나은 엔지니어가 되기 위해/파이썬을 파이썬스럽게

python 멀티 프로세싱은 parmap 으로 하자.

[2020.07.24 에 남기는 글]

이 글은 레거시적인 성격이 있습니다.
파이썬에서 멀티프로세스, 쓰레드 이용하기 가장 좋은 법은 concurrent.futrues 에서의
ThreadPoolExecutor 와 ProcessPoolExecutor 를 사용하거나 asyncio 를 사용하는 것입니다.
따라서 이 글은 이제는 별로 추천드리고 싶지 않습니다.

파이썬3에서 일반적으로 멀티프로세스 사용하는 방법

최근에 파이썬으로 프로젝트를 하다가 단일 프로세스로 루프문 돌리니까 너어어무 느려서, 속도를 좀 올려보고자, 멀티프로세싱을 사용해보게 되었다.

파이썬3 에서 일반적으로 사용하는 멀티 프로세싱 방법은 다음과 같이 크게 2가지 방법이 있다.

  • multiprocessing.Process(...)
  • multiprocessing.Pool(...)

두 개가 뭐.. 비슷은 하지만 아무튼 사용용도 보면, 경우에 따라 다르게 사용하는 걸 볼 수있다.
하지만, 이 글에서는 위 두 개에 대해서는 안 다룰거다.
어려운 내용도 아니고, 구글링하면 어떻게 사용하는지 금방 나오므로, 아래 링크로 스킵한다.

niceman 님 블로그에 잘 설명되어있으므로, 아래 링크 참조.

이렇게하면 짜증나는 점

위 두 방법을 안다고 일단 가정하고 쓰겠다.

먼저, 위와 같이 쓰면 매우 짜증난다. 예를 들어, Pool 을 써서 프로세스와 함수 매핑을 하는 경우를 생각하자.
그리고, 프로세스간 shared dictionary 를 사용한다고 하자.
예를 들어 다음과 같은 코드가 된다.

import multiprocessing
from itertools import repeat

def a(x, d):
    d[x] = True

num_cores = multiprocessing.cpu_count() # cpu core 개수
pool = multiprocessing.Pool(num_cores)

manager = multiprocessing.Manager()
d = manager.dict()                        # 프로세스간 공유할 shared dictionary
input_list = range(0, 10)                # 프로세스들에 나눠들어갈 input list
pool.starmap(a, zip(input_list, repeat(d))
pool.close()
pool.join

위 코드를 잠시 설명하자면, core 수 만큼 프로세스를 생성한 후, 각 프로세스는 a(x, d) 라는 함수를 실행한다.
이 때, 각 프로세스는 d 라는 dictonary를 공유하는데, 이를 위해 manager.dict() 를 사용한 뒤, 이를 프로세스 인자로 넘겨주어야 한다. 한편, 프로세스에 들어가는 인자가 1개가 아니라 2개 이상이므로, .starmap() 을 사용한다.
근데, input_list 는 실제로 분할되어 프로세스에 들어가지만 d 역시 꾸준하게 모든 프로세스의 인자로 들어가야하므로, ziprepeat 을 사용하여 넣어주어야 한다.

자. 얼마나 번거로운가? .starmap 까지는 괜찮았다. 그런데 여러 개의 인자를 프로세스에 보내야할 때, ziprepeat 을 사용해야만 실제로 돌릴 수 있다는게 이해가 안간다. 왜이렇게 복잡하게 만들었을까??

그리고 또 무엇보다, tqdm(진행상황을 progress bar로 시각화해주는 툴)을 쓰기가 무진장 번거롭다.
프로세스들의 진행상황을 알 수가 없으니, 이 얼마나 답답한가...

이를 해결해줄 패키지, parmap

이런 불편함. 나만 느낀게 아니였나보다. 이를 깔끔하게 사용할 수 있도록 도와준 패키지가 있으니, parmap 이라는 패키지다.

parmap github
https://github.com/zeehio/parmap

pip install parmap 로 간단히 깔아주고, 혹시 tqdm 이 안깔려있으면 pip install tqdm 으로 깔아주자.
parmap 사용법은 github 페이지에 잘 나와있다.

1. 무작정 따라하기

parmap 을 이용하여 멀티프로세싱을 구현하는 과정을 하나씩 살펴보자.
멀티 프로세싱을 쓴다는 것은, 같은 일을 병렬적으로 처리하겠다는 뜻이다.
그렇다면 어떤 일을 몇 개의 cpu 를 이용하여 병렬처리할지 정의해주어야 한다.

먼저 몇 개의 cpu 를 사용할 수 있는지 확인해보자.
다음 코드로, 컴퓨터의 cpu core 수를 확인해볼 수 있다.

import multiprocessing
num_cores = multiprocessing.cpu_count() # 12

현재 내 컴퓨터에는 12개의 cpu 코어가 있다. 따라서, num_cores 에는 12의 값이 들어간다.
그 다음으로, 병렬적으로 어떤 일을 할 지, 함수로 정의한다.
나는 여러 개의 입력값들을 받아, 이 입력 값을 제곱한 리스트를 내보내는 함수를 정의했다.

def square(input_list):
    return [x*x for x in input_list]

이제 실제로 parmap 을 사용해볼 차례인데,
먼저 각 입력으로 줄 데이터를 정의하고, 이를 병렬처리 개수에 맞춰 분할해야 한다.

import numpy as np

# 입력 데이터
data = list(range(1, 25)) # [1, 2, ..., 24]

# 입력 데이터를 cpu 수만큼 균등하게 나눠준다. 1차원 배열이 2차원 numpy array 배열이 된다.
splited_data =  np.array_split(data, num_cores)
# splited_data 는 [np.array([1, 2]), np.array([3, 4]), ..., np.array([23, 24])] 이 된다.

np.array_split 은 데이터를 균등하게 분할해주지만, 결과 값은 np.array 로 나온다.
따라서 list 가 필요한 경우 다음과 같이 다시 .tolist() 로 바꿔주어야 한다.

splited_data = [x.tolist() for x in splited_data]
# splited_data 는 [[1, 2], [3, 4], ..., [23, 24]] 가 된다. 

이제 parmap 으로 멀티프로세싱 처리를 하자.

import parmap

result = parmap.map(square, splited_data, pm_pbar=True, pm_processes=num_cores)
# result 는 [[1, 4], [9, 16], ..., [529, 576]] 이 된다.

이렇게 멀티프로세싱을 잘 구현했다!
전체 코드는 다음과 같다.

import multiprocessing
import parmap
import numpy as np

num_cores = multiprocessing.cpu_count() # 12

def square(input_list):
    return [x*x for x in input_list]

data = list(range(1, 25))
splited_data =  np.array_split(data, num_cores)
splited_data = [x.tolist() for x in splited_data]

result = parmap.map(square, splited_data, pm_pbar=True, pm_processes=num_cores)

2. 추가 예제

이번엔 맨 처음에 짜증을 불러일으켰던 코드를 parmap 으로 구현해보자.

import parmap
from multiprocessing import Manager

num_cores = 16 # 사용할 cpu 코어 수. multiprocessing.cpu_count() 로 확인 가능
manager = Manager()
d = manager.dict()

def a(x, d):
    d[x] = 1

input_list = range(0, 10)
parmap.map(a, input_list, d, pm_pbar=True, pm_processes=num_cores)

깔끔하다.
input_list 자리에 프로세스에 분할하여 들어갈 input variables 를 넣어주면 되고, 그 뒤에는 고정적으로 들어갈 인자들을 넣어주면 된다. 개수 역시 제한없다.
keyword_parameter 들을 제공하는데, 예를 들어 위 같이 pm_pbar=True 를 주면, tqdm 으로 보는 progress bar 를 볼 수 있다. 완전 편하다.

그 외 기능은 github readme 파일에 잘 나와있으니, 간단하게 읽어보는 것도 좋을듯 하다.

그 외 Tips

1. 데이터 균등하게 나누기, numpy.array_split()

처리해야할 데이터를 나누어 멀티 프로세스에 뿌리는 경우가 많은데, 이럴 때 하나의 데이터를 여러개로 쪼개야 한다.
예를 들어, 3개의 프로세스를 사용하는 경우, [1,2,3,4,5,6,7,8,9][[1,2,3], [4,5,6], [7,8,9]] 로 만들어야 한다.
이 때 각 데이터 뭉치(chunk)의 데이터의 수를 최대한 균등하게 만들어야, 각 프로세스가 parallel 하게 작동할텐데,
이 때, numpy 패키지의 array_split(data, # of chunk) 를 사용하면 된다.
예를 들어, 다음과 같이 쓴다.

import numpy as np

input_data = np.array_split([1,2,3,4,5,6,7,8,9], 3)
# input_data = [np.array([1,2,3]), np.array([4,5,6]), np.array([7,8,9])]

parmap.map(func, input_data)

2. 프로세스간 Shared Data, multiprocessing.Manager()

프로세스간 별도의 메모리 공간을 차지하므로, 당연히 지역 변수는 해당 프로세스 내에서만 존재한다.
프로세스간 Shared 할 수 있는 공간, 구체적으로는 자료구조를 사용하기 위해서는, multiprocessing.Manager() 를 사용하면 된다.
위의 예에서 사용하듯 쓰면 된다.