본문 바로가기

더 나은 엔지니어가 되기 위해/파이썬을 파이썬스럽게

python 멀티 프로세싱은 parmap 으로 하자.

[2020.07.24 에 남기는 글]

이 글은 레거시적인 성격이 있습니다.
파이썬에서 멀티프로세스, 쓰레드 이용하기 가장 좋은 법은 concurrent.futrues 에서의
ThreadPoolExecutor 와 ProcessPoolExecutor 를 사용하거나 asyncio 를 사용하는 것입니다.
따라서 이 글은 이제는 별로 추천드리고 싶지 않습니다.

파이썬3에서 일반적으로 멀티프로세스 사용하는 방법

최근에 파이썬으로 프로젝트를 하다가 단일 프로세스로 루프문 돌리니까 너어어무 느려서, 속도를 좀 올려보고자, 멀티프로세싱을 사용해보게 되었다.

파이썬3 에서 일반적으로 사용하는 멀티 프로세싱 방법은 다음과 같이 크게 2가지 방법이 있다.

  • multiprocessing.Process(...)
  • multiprocessing.Pool(...)

두 개가 뭐.. 비슷은 하지만 아무튼 사용용도 보면, 경우에 따라 다르게 사용하는 걸 볼 수있다.
하지만, 이 글에서는 위 두 개에 대해서는 안 다룰거다.
어려운 내용도 아니고, 구글링하면 어떻게 사용하는지 금방 나오므로, 아래 링크로 스킵한다.

niceman 님 블로그에 잘 설명되어있으므로, 아래 링크 참조.

이렇게하면 짜증나는 점

위 두 방법을 안다고 일단 가정하고 쓰겠다.

먼저, 위와 같이 쓰면 매우 짜증난다. 예를 들어, Pool 을 써서 프로세스와 함수 매핑을 하는 경우를 생각하자.
그리고, 프로세스간 shared dictionary 를 사용한다고 하자.
예를 들어 다음과 같은 코드가 된다.

import multiprocessing
from itertools import repeat

def a(x, d):
    d[x] = True

num_cores = multiprocessing.cpu_count() # cpu core 개수
pool = multiprocessing.Pool(num_cores)

manager = multiprocessing.Manager()
d = manager.dict()                        # 프로세스간 공유할 shared dictionary
input_list = range(0, 10)                # 프로세스들에 나눠들어갈 input list
pool.starmap(a, zip(input_list, repeat(d))
pool.close()
pool.join

위 코드를 잠시 설명하자면, core 수 만큼 프로세스를 생성한 후, 각 프로세스는 a(x, d) 라는 함수를 실행한다.
이 때, 각 프로세스는 d 라는 dictonary를 공유하는데, 이를 위해 manager.dict() 를 사용한 뒤, 이를 프로세스 인자로 넘겨주어야 한다. 한편, 프로세스에 들어가는 인자가 1개가 아니라 2개 이상이므로, .starmap() 을 사용한다.
근데, input_list 는 실제로 분할되어 프로세스에 들어가지만 d 역시 꾸준하게 모든 프로세스의 인자로 들어가야하므로, ziprepeat 을 사용하여 넣어주어야 한다.

자. 얼마나 번거로운가? .starmap 까지는 괜찮았다. 그런데 여러 개의 인자를 프로세스에 보내야할 때, ziprepeat 을 사용해야만 실제로 돌릴 수 있다는게 이해가 안간다. 왜이렇게 복잡하게 만들었을까??

그리고 또 무엇보다, tqdm(진행상황을 progress bar로 시각화해주는 툴)을 쓰기가 무진장 번거롭다.
프로세스들의 진행상황을 알 수가 없으니, 이 얼마나 답답한가...

이를 해결해줄 패키지, parmap

이런 불편함. 나만 느낀게 아니였나보다. 이를 깔끔하게 사용할 수 있도록 도와준 패키지가 있으니, parmap 이라는 패키지다.

parmap github
https://github.com/zeehio/parmap

pip install parmap 로 간단히 깔아주고, 혹시 tqdm 이 안깔려있으면 pip install tqdm 으로 깔아주자.
parmap 사용법은 github 페이지에 잘 나와있다.

1. 무작정 따라하기

parmap 을 이용하여 멀티프로세싱을 구현하는 과정을 하나씩 살펴보자.
멀티 프로세싱을 쓴다는 것은, 같은 일을 병렬적으로 처리하겠다는 뜻이다.
그렇다면 어떤 일을 몇 개의 cpu 를 이용하여 병렬처리할지 정의해주어야 한다.

먼저 몇 개의 cpu 를 사용할 수 있는지 확인해보자.
다음 코드로, 컴퓨터의 cpu core 수를 확인해볼 수 있다.

import multiprocessing
num_cores = multiprocessing.cpu_count() # 12

현재 내 컴퓨터에는 12개의 cpu 코어가 있다. 따라서, num_cores 에는 12의 값이 들어간다.
그 다음으로, 병렬적으로 어떤 일을 할 지, 함수로 정의한다.
나는 여러 개의 입력값들을 받아, 이 입력 값을 제곱한 리스트를 내보내는 함수를 정의했다.

def square(input_list):
    return [x*x for x in input_list]

이제 실제로 parmap 을 사용해볼 차례인데,
먼저 각 입력으로 줄 데이터를 정의하고, 이를 병렬처리 개수에 맞춰 분할해야 한다.

import numpy as np

# 입력 데이터
data = list(range(1, 25)) # [1, 2, ..., 24]

# 입력 데이터를 cpu 수만큼 균등하게 나눠준다. 1차원 배열이 2차원 numpy array 배열이 된다.
splited_data =  np.array_split(data, num_cores)
# splited_data 는 [np.array([1, 2]), np.array([3, 4]), ..., np.array([23, 24])] 이 된다.

np.array_split 은 데이터를 균등하게 분할해주지만, 결과 값은 np.array 로 나온다.
따라서 list 가 필요한 경우 다음과 같이 다시 .tolist() 로 바꿔주어야 한다.

splited_data = [x.tolist() for x in splited_data]
# splited_data 는 [[1, 2], [3, 4], ..., [23, 24]] 가 된다. 

이제 parmap 으로 멀티프로세싱 처리를 하자.

import parmap

result = parmap.map(square, splited_data, pm_pbar=True, pm_processes=num_cores)
# result 는 [[1, 4], [9, 16], ..., [529, 576]] 이 된다.

이렇게 멀티프로세싱을 잘 구현했다!
전체 코드는 다음과 같다.

import multiprocessing
import parmap
import numpy as np

num_cores = multiprocessing.cpu_count() # 12

def square(input_list):
    return [x*x for x in input_list]

data = list(range(1, 25))
splited_data =  np.array_split(data, num_cores)
splited_data = [x.tolist() for x in splited_data]

result = parmap.map(square, splited_data, pm_pbar=True, pm_processes=num_cores)

2. 추가 예제

이번엔 맨 처음에 짜증을 불러일으켰던 코드를 parmap 으로 구현해보자.

import parmap
from multiprocessing import Manager

num_cores = 16 # 사용할 cpu 코어 수. multiprocessing.cpu_count() 로 확인 가능
manager = Manager()
d = manager.dict()

def a(x, d):
    d[x] = 1

input_list = range(0, 10)
parmap.map(a, input_list, d, pm_pbar=True, pm_processes=num_cores)

깔끔하다.
input_list 자리에 프로세스에 분할하여 들어갈 input variables 를 넣어주면 되고, 그 뒤에는 고정적으로 들어갈 인자들을 넣어주면 된다. 개수 역시 제한없다.
keyword_parameter 들을 제공하는데, 예를 들어 위 같이 pm_pbar=True 를 주면, tqdm 으로 보는 progress bar 를 볼 수 있다. 완전 편하다.

그 외 기능은 github readme 파일에 잘 나와있으니, 간단하게 읽어보는 것도 좋을듯 하다.

그 외 Tips

1. 데이터 균등하게 나누기, numpy.array_split()

처리해야할 데이터를 나누어 멀티 프로세스에 뿌리는 경우가 많은데, 이럴 때 하나의 데이터를 여러개로 쪼개야 한다.
예를 들어, 3개의 프로세스를 사용하는 경우, [1,2,3,4,5,6,7,8,9][[1,2,3], [4,5,6], [7,8,9]] 로 만들어야 한다.
이 때 각 데이터 뭉치(chunk)의 데이터의 수를 최대한 균등하게 만들어야, 각 프로세스가 parallel 하게 작동할텐데,
이 때, numpy 패키지의 array_split(data, # of chunk) 를 사용하면 된다.
예를 들어, 다음과 같이 쓴다.

import numpy as np

input_data = np.array_split([1,2,3,4,5,6,7,8,9], 3)
# input_data = [np.array([1,2,3]), np.array([4,5,6]), np.array([7,8,9])]

parmap.map(func, input_data)

2. 프로세스간 Shared Data, multiprocessing.Manager()

프로세스간 별도의 메모리 공간을 차지하므로, 당연히 지역 변수는 해당 프로세스 내에서만 존재한다.
프로세스간 Shared 할 수 있는 공간, 구체적으로는 자료구조를 사용하기 위해서는, multiprocessing.Manager() 를 사용하면 된다.
위의 예에서 사용하듯 쓰면 된다.

반응형
  • 찬호 2020.02.28 10:44

    안녕하세요 정말 큰 도움이 되었습니다
    그런데 혹시 매핑 해줘야 하는 함수가 있고
    함수에 넣어줘야 하는 파라미터 수의 차이가 날 경우는 어떻게 해야 할까요?
    예를 들어 다음과 같은 조합입니다
    파라미터1에는 파일 path가 들어갑니다 약 100개의 파일
    파마리터2에는 각 파일에 처리해주는 변수 타입 약 10개
    파라미터3에는 각 처리한 파일에 따라 저장하는 위치 변수 2개
    이런식으로 매핑하는 함수에 파라미터가 다르지만 결국 100개 * 10개 * 2개의 조합으로 처리가 되어야 하거나
    조건에 따라서 마지막 파라미터3에서 2개 중 하나를 선택하여 저장이 되어야 해서
    이때는 조합이 100개 * 10개 * (2개 중 선택한 한개의 파라미터) 조합 등으로 멀티프로세싱을 처리하려면 어떻게 해야할까요?
    감사합니다
    찬호 드림.

    • BlogIcon 흠시 2020.02.28 12:51 신고

      안녕하세요.

      사실 이렇게만 보아서는 정확히 어떤걸 구현하려고 하시는지 모르겠어서, 정확한 답변은 못드리겠으나, 제가 파악한 것 정도로만 말씀드리면.

      1. 먼저 하나의 함수가 있고,
      2. 이 함수의 파라미터가 총 3개 있습니다.

      여하튼, 하나의 함수를 여러 프로세스에 매핑해야하니, 결국은 해당 함수 내부에서 조건문 처리로 나눠줘야 겠네요.

      말씀하신 조합이라는게 어떤 건지요?
      제가 보기에는 100개의 파일 path 를 담는 하나의 리스트를 파라미터 1의 인풋으로 주고,
      파라미터 2의 경우도 마찬가지로 각 100개의 파일에 대한 변수 타입을 담는 하나의 리스트로 주어야 겠고,
      파라미터 3도 마찬가지입니다.

      그냥 함수에서는 리스트 단위로 파라미터를 받고, 내부적으로는 반복문을 돌면서 조건문으로 확인하는 형태가 되겠네요.

    • 찬호 2020.02.28 16:47

      아하 저는 이미지 등을 압축하는데 활용해보려고 했습니다
      예를 들어서 100개의 이미지를 10개의 압축 수준(압출 화질)로 나눠서 기준에 맞게 나오면 1번 폴더 기준에 맞지 않게 나오면 2번 폴더 이런 식으로 분류하고 싶었습니다
      알려주신대로 함수 내부에서 폴더는 선택은 구현하면 될 것 같은데
      100개의 이미지 각각을 10개의 압축수준으로 압축하는 1000가지 조합을 매핑으로 "쉽게" 구현이 가능한지 궁금했습니다
      혹시 떠오르는 방법이 있어서 알려주신다면 감사하겠습니다
      찬호 드림.

    • BlogIcon 흠시 2020.02.28 16:54 신고

      음.. 이렇게만 들으면 왜 조합이 1000개인지 모르겠으나, 지금 떠오르는 방법은 10개의 압축 처리에 맞는 각각의 조건문 (10개의 if 가 되겠네요.) 만 떠오르네요.
      자세한 거는 정말 코드나 요구사항을 정확히 봐야 알것 같습니다 :)

    • 찬호 2020.02.28 21:38

      감사합니다
      왜 조합이 천개냐면 이미지 한 장 마다 10가지 압축을 수행하도록 하고 싶기 때문입니다
      이미지가 100장이고 각 장 마다 10가지 압추기 되기 때문에 1000가지 조합이라고 말씀드렸습니다

  • ㅠㅠ 2020.04.25 05:25

    in get
    return _ForkingPickler.loads(res)
    AttributeError: Can't get attribute 'dailyprice_primitive' on <module '__main__' (built-in)>

    계속 attribute error 가 나네여...ㅠㅠ

  • ㅠㅠ 2020.04.25 05:27

    크롤링 용도고

    manager = mp.Manager()
    D = manager.dict()

    def dailyprice_primitive(ticker_list, D):
    함수내용

    ticker_list = np.array_split(ticker, 4)_split(list(range(i, 2148)), 4)
    parmap.map(dailyprice_primitive, ticker_list, D, pm_pbar=True, pm_processes=4)

    이렇게 구성돼있습니다. 단순 반복문으로 함수 호출할 때는 잘 됐는데 너무 오래 걸려서 멀티프로세싱 찾아보다가... 도와주시면 감사하겠습니다ㅠㅠ

    • BlogIcon 흠시 2020.04.25 13:48 신고

      ticker_list = np.array_split(ticker, 4)_split(list(range(i, 2148)), 4)

      이 부분이 이상한데, 어떻게 이해하면 될까요..?
      코드 전문을 Gist 나 쥬피터(nbviewer) 로 공유해주시면 좀 더 빠르게 이해할 수 있을 것 같아요 ㅎㅎ

    • ㅠㅠ 2020.04.26 02:05

      답변 감사합니다. 밤샌 후에 멘붕 상태로 복붙하다가 잘못 들어갔네요ㅠㅠ

      ticker_list = np.array_split(ticker, 4)

      ticker_list 는 멀티프로세싱에 넣기 위해 새로 만든 변수고 ticker 는 기존에 갖고 있던 1열 2148 행의 데이터프레임입니다. 코어가 4개라서 4로 나눴고요.

      제가 한참 찾아보니 윈도우10에서는 멀티프로세싱이 안 되고 py 파일로 저장해서 불러와야 한다고 하던데, py 파일로 저장한 후에도 파이참으로 실행하면 아무 것도 안 떠서요... cmd 창에서 py 파일 연 다음에 setup 파일명.py 하면 된다는데 이것도 안 되고...... R로 데이터분석만 하다가 이번에 논문 때문에 데이터수집을 직접 하게 생겨서 처음 파이썬 깔아본 생문과뉴비라 어떻게 해야할지 모르겠네요ㅠㅠ

      혹시 멀티프로세싱 관련해서 '무작정 따라하기' 수준 난이도로 아주아주 간단한 코드하고 실행법 좀 써주실 수 없을까요?ㅠㅠ 함수 자체는 잘 돌아가는데 크롤링 완료 예정 시간이 72시간으로 떠서...;;;

    • ㅠㅠ 2020.04.26 02:12

      아 Gist 나 nbviewer 는 뭔지도 몰라서...... 죄송합니다 ㅠ

      예를 들어 지금 제 크롤링 함수가
      def f(a):
      while True:
      A
      return A

      이런으로 a를 넣어서 A를 받고, b를 넣어서 B를 받고... 식으로 리턴값 A-Z 를 전부 함수 f 밖의 데이터프레임에 쌓는 중입니다.

      일단 코드 자체는 잘 돌아가고 있는 걸 확인했고요. 다만 윗 댓글로 말씀드린 것처럼 시간이 너무 오래 걸려서 a-z 를 a-d, e-i, j-p, q-z 이렇게 네 그룹으로 나눈 후 각각 코어에 할당해 4코어로 해당 함수를 돌리려고 합니다.

      근데 제가 윈도우10에서는 py 파일로 저장하고 돌려야 한다는 것도 몰랐을 정도로 문외한이라(파이썬 입문 1주차) if __self__ 이런 것도 몰랐고 구글링으로 10페이지까지 검색해봐도 멀티프로세싱까지 사용하는 분들은 기존에 프로그래밍을 좀 하던 분들이라 그런지 이정도 수준의 기초적인 내용은 언급이 없더라고요.

      바쁘시겠지만 혹 시간이 되시면 꼭 좀 부탁드리겠습니다. 데이터 모으느라 논문에 필요한 분석을 시작도 못하고 있어서ㅠㅠ

    • BlogIcon 흠시 2020.04.26 23:03 신고

      간단한거라 "무작정 따라하기" 를 본문에 추가 했습니다.
      쥬피터에서 돌리셔도 되고, .py 로 스크립트 파일 만드셔서 실행시키셔도 괜찮습니다. 다만 실행이 오래걸리는 경우(몇 시간 이상...) .py 로 만드셔서 실행시키길 추천드립니다.

  • seyoongit 2020.06.18 17:52

    꿀팁추

  • 오전 달리기 2020.08.19 16:15

    예제로 보여 주신 코드 Mac 에서는 잘 수행되는데
    Window에서는 프로그래스 바가 멈춰 있습니다.
    리소스 모니터를 보면 CPU점유률 100%를 나타 내는데요 이런 현상 경험해 보신적 있으신가요?

  • BlogIcon a1216843 2021.05.24 03:20 신고

    멀티프로세싱 처음 적용해보는건데
    올려주신 글 보고 쉽게 따라할 수 있었네요
    좋은 글 감사합니다!