Python을 사용하여 병렬 처리를 권장합니다.

다음은 Python(CPython) 병렬 처리에서 성능을 적절하게 개선하는 방법을 소개합니다.
일반적으로 병렬 처리에서 떠오르는 것은 스레드 처리이다. 파이톤은 GIL(글로벌 해석기 록)이라고 불리는 잠금 기구가 있기 때문에 이런 방법은 성능 개선을 원하지 않는다.
왜 이렇게 말할까. GIL의 라인을 보존해야만 파이톤 코드를 실행할 수 있기 때문에 여러 라인에서 병렬 처리를 해도 실제 실행된 라인은 하나뿐이다.
본고는 라인 기반의 병행 처리 문제와 대체 방법으로서의 과정 기반의 병행 처리의 장점을 총결하였다.참고로 검증 시 사용한 것은 파이톤 3.9.12/Ubuntu 20.0404LTE입니다.

GIL의 영향을 보세요.


우리는 아래의 간단한 알고리즘을 준비하여 소수의 함수를 계산했다.
병렬 처리에서 실행 결과를 얻기 위해 사용된 것은 return이 아니라 Queue입니다.
from queue import Queue

def count_primes(num: int, queue: Queue) -> None:
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    queue.put(primes)
먼저 다음과 같은 라인을 생성하고 실행합니다.
from threading import Thread

queue1 = Queue()
thread1 = Thread(target=count_primes, args=(100000, queue1))
thread1.start()
thread1.join()
print(queue1.get())
10만의 소수를 계산하면 30초가 걸린다.
$ time python count_primes_thread.py 
9592

real    0m30.982s
user    0m30.965s
sys     0m0.010s
다음은 두 라인에서 동시에 실행됩니다.
queue1 = Queue()
thread1 = Thread(target=count_primes, args=(100000, queue1))
thread1.start()

queue2 = Queue()
thread2 = Thread(target=count_primes, args=(100000, queue2))
thread2.start()

thread1.join()
print(queue1.get())
thread2.join()
print(queue2.get())
결과는 약 두 배로 약 1분이 걸렸다.
$ time python count_primes_thread.py 
9592
9592

real    1m1.449s
user    1m1.665s
sys     0m0.730s
실행 환경은 여러 핵심 CPU이기 때문에 병행 처리가 가능해야 하지만 GIL의 영향으로 동시 실행이 제한된다.
그러면 파이썬 프로세스를 기반으로 하는 병렬 처리를 통해 성능을 개선하는 방법을 생각해 보겠습니다.

프로세스 기반 병렬 처리


파이톤의 GIL은 프로세스 단위fork로 작동하기 때문에 생성된 하위 프로세스는 부모 프로세스 GIL의 영향을 받지 않고 병행 처리할 수 있다.
여기에 사용된 것은 multiprocessing 모듈로 라인과 비슷한 인터페이스 처리 프로세스를 사용할 수 있습니다.
참고로 파이톤은 프로세스를 기반으로 하는 병행 처리된 몇 가지 하위 프로세스 생성 방법을 준비했습니다. 이 보도는 리눅스 환경의 기본값 fork 을 전제로 합니다.threading 모듈로 아까 라인multiprocessing의 프로그램을 다시 써 보았습니다.
그럼에도 불구하고 거의 공통된 인터페이스가 있기 때문에 이런 상황에서 교체import의 종류만 수정하면 된다.
from multiprocessing import Process, Queue

def count_primes(num: int, queue: Queue) -> None:
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    queue.put(primes)

queue1 = Queue()
process1 = Process(target=count_primes, args=(100000, queue1))
process1.start()

queue2 = Queue()
process2 = Process(target=count_primes, args=(100000, queue2))
process2.start()

process1.join()
print(queue1.get())
process2.join()
print(queue2.get())
시행 후 방금 1분 이상 걸린 처리가 34초로 끝나 속도가 약 2배 높아졌다.
$ time python count_primes_process.py 
9592
9592

real    0m34.479s
user    1m8.574s
sys     0m0.010s
프로세스 기반의 병행 처리라면 파이톤 코드도 병행 실행할 수 있다.
라인과 비슷한 인터페이스는 처리하기 쉬우나 실제 프로세스와 라인은 본질적으로 다르기 때문에 잘못 사용하면 원하지 않는 결과를 초래할 수 있다.따라서 과정에 기반한 병행 처리에서 빠져들기 쉬운 요점을 설명해 드리겠습니다.

스레드 공유 메모리 공간, 프로세스 독립


루틴은 부모 프로세스와 메모리 공간을 공유합니다. 예를 들어 전역 변수 값을 바꾸면 루틴 호출원도 수정 결과를 얻을 수 있습니다.
다른 한편, 서브프로세스는 fork 시간에 부모 프로세스에서 복사된 메모리 공간을 가지고 있기 때문에 부모 프로세스의 변수를 참조할 수 있지만 서브프로세스 측면에서 변수를 변경하더라도 서브프로세스가 끝난 후에 버려지고 부모 프로세스에 영향을 주지 않는다.
실제로 해볼게요.
from multiprocessing import Process
from threading import Thread

global_value = 0

def worker() -> None:
    global global_value
    global_value += 1
    print(f"in worker               : {global_value=}")

# スレッドを生成・実行開始して終了まで待つ
print(f"before thread execution : {global_value=}")
thread = Thread(target=worker)
thread.start()
thread.join()
print(f"after thread execution  : {global_value=}")

# プロセスを生成・実行開始して終了まで待つ
print(f"before process execution: {global_value=}")
process = Process(target=worker)
process.start()
process.join()
print(f"after process execution : {global_value=}")
집행 결과는 다음과 같다.
$ time python thread_process.py 
before thread execution : global_value=0
in worker               : global_value=1
after thread execution  : global_value=1
before process execution: global_value=1
in worker               : global_value=2
after process execution : global_value=1
라인의 병행 처리를 바탕으로 하는 측은 라인이 시작된 후global_value1의 결과를 더해서 호출자도 수신할 수 있다.
다음은 프로세스를 바탕으로 하는 병행 처리 방법worker 함수 내의 출력은global_value=2이기 때문에 전역 변수의 값을 참조할 수 있다.그러나 worker 함수 종료(하위 프로세스 종료), 부 프로세스로 돌아가는 제어global_value=1.global_value=2 업데이트된 것은 하위 프로세스이기 때문에 부모 프로세스 측면global_value은 영향을 받지 않습니다.
참고로 이 샘플 프로그램은 한 프로그램 내의 루트와 과정이 모두 생성되었지만 생성 과정의 정해진 시간에 여러 개의 루트가 운행하는 상황은 본질적으로 안전하지 않다는 것을 주의하십시오.프로세스 기반 병렬 처리를 시작하기 전에 루트를 닫으십시오.※공식 문서 fork의 설명 참조

데이터의 인수인계는 프로세스 간 통신이다


다음은 데이터 인수인계에 사용되는 Queue류를 고려해 보겠습니다.
스레드 공유 메모리 공간으로 데이터 교환은 같은 메모리 공간에서 직접 할 수 있다.
다른 한편, 프로세스를 바탕으로 하는 병행 처리에서 생성된 하위 프로세스는 부자 관계가 있지만 서로 독립된 과정이기 때문에 데이터의 교환과 이용 과정 간의 통신 송신 수신 바이트열이다.
라인과 프로세스는 모두 비슷한 인터페이스를 가진Queue류에서 대화를 나누었지만 실제 내부 실현은 크게 다르다.
동작 원리의 차이를 이해하기 위해 먼저 라인에서 수신defaultdict(lambda: 1)을 시도해 본다.defaultdict는 존재하지 않는 키에 접근할 때의 기본값을 callable의 실행 결과로 할 수 있기 때문에 lambda와 자주 협업하여 사용한다.
다음 코드는 지정한 수치 앞에 포함된 소수를 키로 설정한 defaultdict 을 되돌려줍니다.
from collections import defaultdict
from threading import Thread
from queue import Queue

def get_primes(num: int, queue: Queue) -> None:
    primes = defaultdict(lambda: 1)
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes[i]
    queue.put(primes)

queue = Queue()
thread = Thread(target=get_primes, args=(100000, queue))
thread.start()
thread.join()
primes = queue.get()
print(sum(primes.values()))
라인 기반의 병행 처리에 문제가 없음defaultdict.
그럼 다음은 과정에 기반한 병행 처리에서 같은 일을 시도해 봅시다.
from collections import defaultdict
from multiprocessing import Process
from multiprocessing import Queue

def get_primes(num: int, queue: Queue) -> None:
    primes = defaultdict(lambda: 1)
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes[i] = 1
    queue.put(primes)

queue = Queue()
process = Process(target=get_primes, args=(100000, queue))
process.start()
process.join()
primes = queue.get()
print(sum(primes.values()))
하지만 아쉽게도 집행하면 영원히 끝나지 않을 거예요.multiprocessing.Queue 과정 간 통신을 하는 유형이기 때문에 인수한 데이터를 엄숙하게 한 후 바이트열로 전환하고 수신자가 분할화한다.그러나 defaultdict에 맡긴 lambda: 1는 엄숙화할 수 없다.
하위 프로세스는 소수 계산 처리를 실행하고 데이터를 Queue에 건네주지만 실패합니다.그리고 부모 프로세스 측은 primes = queue.get()에서 Queue으로부터의 데이터를 기다리기 때문에 추적을 진행합니다.
시리아 사용pickle이기 때문에 pickle에 대응하는 대상을 직렬화할 수 있다.예를 들어 아래lambda 대신 def를 사용하면 하위 프로세스를 생성하기 전에 미리 정의하면 문제없이 처리할 수 있다.
def return_one():
    return 1

def get_primes(num: int, queue: Queue) -> None:
    primes = defaultdict(return_one)
    ...

하위 프로세스에 데이터를 전송하는 방법


프로세스를 기반으로 하는 병렬 처리에서 하위 프로세스를 만드는 부하 (즉 fork 가 높지 않습니다.따라서 부모로부터 정보를 얻으면 부모 프로세스에서 아이에게 주고 싶은 데이터를 생성한 후 하위 프로세스를 만드는 방법이 뛰어나다.
그나저나 서브프로세스는 자원을 복제했지만 CoW(복사 켜기 램프) 구조 덕분에 읽기만 하면 실제 저장 영역이 소모되지 않아 나쁘지 않다.Processargs에서 2GB의 거대한 문자열을 전달합니다.
from multiprocessing import Process, Queue

def worker(target: str, queue: Queue):
    queue.put(len(target))

# 2GB相当の文字列を生成
huge_str = "A" * 1 * 1024 * 1024 * 1024 * 2

queue = Queue()

# 2GB相当の文字列を渡して子プロセスを開始
process = Process(target=worker, args=(huge_str, queue))
process.start()
process.join()

# 実行結果の受け取り
print(queue.get())
거대한 데이터이지만 부모 프로세스의 메모리 공간을 보존하는 하위 프로세스는 실제로 복제하지 않아도 데이터를 직접 전달할 수 있기 때문에 1초도 안 되어 처리가 끝났다.
2147483648

real    0m0.797s
user    0m0.359s
sys     0m0.439s
다음 단계는 생성 과정 후Queue에 2GB의 데이터를 하위 프로세스로 보내는 데 30초 이상이 걸렸다.
from multiprocessing import Process, Queue

def worker(queue: Queue):
    queue.put(len(queue.get()))

queue = Queue()

# 子プロセスを開始
process = Process(target=worker, args=(queue,))
process.start()

# 2GB相当の文字列を生成して子プロセスに送信
huge_str = "A" * 1 * 1024 * 1024 * 1024 * 2
queue.put(huge_str)
process.join()

# 実行結果の受け取り
print(queue.get())
2147483648

real    0m30.700s
user    0m5.584s
sys     0m26.285s
부모 프로세스 측은 직렬화를 통해 하위 프로세스에 데이터를 보내고 하위 프로세스는 일괄적으로 데이터를 수신한다. 이런 프로세스에서 같은 변수의 메모리 구역은 여러 차례 확보하고 복제한 결과 성능이 악화된다.
CoW는 상위 프로세스에서 하위 프로세스로 데이터를 전송할 때 상위 프로세스 측면에서 데이터를 만든 후 하위 프로세스를 생성하는 데 유용합니다.
그러나 하위 프로세스에서 부모 프로세스로 데이터를 전달할 때 같은 방법을 사용할 수 없기 때문에 multiprocessing.Queue 등 과정 간 통신에 의존해야 한다.

총결산


파이썬 코드를 병렬 처리하려면 프로그램별로 처리하세요. 하지만 주의사항도 있어요.이런 내용.
스레드를 사용한 병행 처리가 이뤄져'왜 불쾌할까...'하는 사람에게 전달됐으면 좋겠다.
그나저나 단서에서 문제가 된 GIL은 시스템 호출 실행에서 풀렸다.따라서 저장 IO와 DB 작업이 병목이라면 스레드의 병행 처리도 어느 정도 성능 개선을 할 수 있다.
라인이라면 처리하기 쉬우니 양자의 특징을 잘 고려한 토대에서 적당한 병행 처리 방법을 선택하세요.

좋은 웹페이지 즐겨찾기