Python: 다중 처리를 사용하여 작업을 더 빨리 완성하는 방법

5775 단어 python
데이터 과학 업무에서 일부 함수나 한 조의 함수가 순환 중에 운행하여 데이터를 처리하거나 분석하는 것은 매우 흔히 볼 수 있는 것이다.비싼 조작이 반복되는 것을 보았을 때, 속도를 높이기 위해 두 가지 방법을 즉각 생각해야 한다.첫 번째는 벡터화입니다. 저는 여기서 소개하지 않겠습니다. 두 번째는 다중 루틴(또는 프로세스)으로 병행 작업을 허용하고 하드웨어를 충분히 이용할 수 있습니다.
Python의 multiprocessing 모듈을 어떻게 사용해서 이 문제를 해결하는지 간단한 예를 보여 드리겠습니다.multiprocessing에서 여러 개의 Python 프로세스가 생성되어 여러 개의 스레드가 아닌 함수를 실행하는 데 사용되었고 전역 해석기 자물쇠(GIL)를 돌았다. 후자는 스레드화된 Python 프로그램의 속도를 현저히 낮출 수 있다.목표는 세분화 가능한 작업을 몇 가지 부분으로 나누어 컴퓨터의 모든 자원을 이용하여 서로 다른 과정에서 이 작업을 집행한 다음에 이 계산 결과를 주 과정으로 되돌려 더 많은 합병 데이터를 얻는 것이다.
이 예에 대해 나는 가상의 시간 시퀀스 데이터를 만들 것이다.일일 출근율, 온도, 주가, 상점 매출액 등 모든 종류의 시간 서열이 될 수 있다.탭에 순서 문자열을 만드는 데 itertools 모듈을 사용하고 있습니다.나는 판다로 10여 년 전의 상업 데이트 범위를 만들었다.이곳의 생각은 충분한 데이터를 생성하고 처리하는 데 시간이 좀 걸린다는 것이다. 우리는 측정할 수 있다.
아래의 모든 예는 Python 3.6.10에서 ipython 셸을 사용하여 작성하고 실행하는 것입니다.
import os
import multiprocessing
import functools 
import itertools 
import string 
import pandas as pd 
import numpy as np 

dates = pd.bdate_range("20100101", pd.Timestamp.today()) 
labels = ["".join(l) for l in itertools.combinations(string.ascii_letters, 2)] 
os.makedirs("data", exist_ok=True)
for label in labels: 
    df = pd.Series(np.random.random(len(dates)), index=dates).to_frame("data") 
    df.to_csv(os.path.join("data", f"{label}.csv"))
네, 지금 큰 데이터 파일이 있습니다.함수를 만들어서 그 중의 파일을 읽고 데이터로 간단한 계산을 한 다음 결과를 되돌려줍니다.(만약 당신이 집에서 따라다닌다면 완성된 후에 이 파일들을 삭제하는 것을 잊지 마세요).
def process_file(label): 
    path = os.path.join("data", f"{label}.csv") 
    df = pd.read_csv(path) 
    return df.describe()
우리는 주피터 노트북이나 ipython 세션의 %timeit 마법을 사용하여 단일 파일을 처리하는 비용을 확인할 것입니다.-o 옵션은 우리가 사용할 수 있는 결과를 되돌려줍니다.현재 우리는 순환에서 이 파일들을 처리하는 것이 선형이라는 것을 알고 있기 때문에 예상된 운행 시간 (초 단위) 을 얻는 것은 매우 간단하다.
In [6]: r = %timeit -o process_file('az')
5.85 ms ± 255 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 
In [7]: r.average \* len(labels) 
Out[7]: 7.755164415974036
컴퓨터의 속도가 매우 빠르기 때문에, 우리는 계속 실제 운행을 해서 실제 시간이 얼마나 되는지 볼 것이다.%timeit 코드를 여러 번 실행하기 때문에 아직 시간이 필요합니다.너는 아마도 이 프로그램을 스스로 실행해야 할 것이다. 그러면 너는 일이 끝나기를 기다리는 좌절감을 체험할 수 있을 것이다.
In [8]: %timeit for l in labels: process_file(l)
8.27 s ± 354 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
이것은 그런대로 괜찮다. 내 컴퓨터에서 나는 약 8초 안에 1000여 개의 데이터 파일을 처리할 수 있다.하지만 더 큰 파일이 있고 더 복잡한 계산을 해야 한다면 더 많은 처리 능력을 사용하고 싶습니다.내 경우 쿼드 코어 Intel i7과 8스레드가 있어 기계가 다를 수 있습니다.multiprocessing 모듈을 사용하여 이 문제를 해결하기 전에 간단한 예를 보고 기본적인 지도 원칙을 이해하자.우선, 모든 실행 프로그램 multiprocessing 은 이 프로세스가 주 프로세스인지 하위 프로세스인지 확인하는 보호 프로그램이 필요합니다.이 보호는 모든 하위 프로세스가 주 코드를 가져올 수 있고 부작용이 생기지 않도록 합니다. 예를 들어 끊임없는 순환에서 더 많은 프로세스를 시작하려고 시도하는 것입니다.두 번째는 프로세스 간에 상태를 공유하는 것을 피하고 실행 중인 함수에서 작업을 분리해 보는 것입니다.마지막으로 방법의 매개 변수는pickle가 필요합니다. 왜냐하면 이것은 모듈이 프로세스 사이에서 데이터를 이동하는 방식이기 때문입니다.이 방법에서 디스크에서 파일을 불러오고 소량의 데이터를 되돌려주기 때문에 이 문제는 좋은 후보 문제입니다.
In [9]: # our function that we will execute in another process ...: def say_hi(): 
...:     print("Child process:", multiprocessing.current_process()) 
...:     print('Hi') 
...: 
...: 
...: if __name__ == '__main__': 
...:     p = multiprocessing.Process(target=say_hi) 
...:     print("Main:", multiprocessing.current_process()) 
...:     p.start() 
Main: <_MainProcess(MainProcess, started)> 
Child process: <Process(Process-1, started)>
Hi
그러나 우리는 하위 프로세스만 실행하고 싶지 않고 가능한 한 컴퓨터에서 효과적으로 사용하고 싶다.좋은 방법은 사용Pool이다.APool에는 함수를 실행하는 데 사용할 수 있는 여러 가지 방법이 있다.간단하고 자주 사용하는 방법 map 은 내장된 방법의 병렬 버전일 뿐이다.이것은iterable 두 번째 매개 변수 중의 모든 항목을 사용하여 첫 번째 매개 변수 방법을 호출합니다.Pool 얼마나 많은 프로세서를 사용해야 하는지 알려주거나 기본적으로 모든 프로세서를 사용할 수 있습니다.
In [10]: %%timeit 
...: if __name__ == '__main__': 
...: with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool: 
...: results = pool.map(process_file, labels) 
...: 2.28 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
내 기계에서는 속도가 약 8초에서 2초 남짓으로 약 3.5배 높아졌다.실행 시간이 길어지면서 프로세스 간에 데이터를 전달하는 시간이 줄어들면서 이러한 개선은 사용 가능한 프로세스의 수량에 더욱 가까워질 것이 분명하다.
마지막으로 볼 만한 것은 좀 더 복잡한 방법 호출 예시들이다.만약 함수에 간단한 단일 매개 변수 목록이 없고 여러 개의 매개 변수가 있다면?만약 추가 파라미터가 매우 흔하다면, 사용 functools.partial 은 좋은 해결 방안이다.일부에게 추가 매개 변수만 주면 됩니다.
def process_file2(label, threshold): 
    path = os.path.join("data", f"{label}.csv") 
    df = pd.read_csv(path) 
    return df['data'].mean() > threshold 
if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool: 
        results = pool.map(functools.partial(process_file2, threshold=.2), labels)
어떤 경우, 매개 변수는 약간 복잡할 수도 있고, 모든 호출이 고정된 것은 아니다.이 예에서는 starmapdict 매개 변수 목록을 함께 사용할 수 있습니다.
# here's just a simple example of data with different arguments for some of the labels 
def make_thresh(label): 
    if 'a' in label or 'z' in label: 
        return .3 
    else: 
        return .4 

args = [(l, make_thresh(l)) for l in labels] 

if __name__ == '__main__': 
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 2) as pool: 
        results = pool.starmap(process_file2, args)
나는 multiprocessing 모듈의 이 간단한 소개가 Python 코드를 가속화하고 당신의 환경을 충분히 이용하여 일을 빨리 완성하는 간단한 방법을 보여 주기를 바랍니다.

좋은 웹페이지 즐겨찾기