판다스 apply 병행 처리의 몇 가지 방법을 상세히 설명하다
1. pandarallel (pip install )
Pandas DataFrame df가 있는 간단한 용례와 func를 응용하는 함수에 대해서는parallel_apply는 고전적인 apply를 대체합니다.
from pandarallel import pandarallel
# Initialization
pandarallel.initialize()
# Standard pandas apply
df.apply(func)
# Parallel apply
df.parallel_apply(func)
병렬화 계산을 원하지 않으면, 고전적인 apply 방법을 사용할 수 있습니다.또한 initialize 함수에서progress_를 전달할 수 있습니다bar=True는 각 작업 CPU의 진행률 표시줄을 표시합니다.
2. joblib (pip install )
https://pypi.python.org/pypi/joblib
# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
from math import sqrt
from joblib import Parallel, delayed
def test():
start = time.time()
result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
end = time.time()
print(end-start)
result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
end2 = time.time()
print(end2-end)
출력 결과 ---------0.4434356689453125
0.6346755027770996
3. multiprocessing
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])
multiprocessing.cpu_count()
시스템의 CPU 수를 반환합니다.이 수는 현재 프로세스에서 사용할 수 있는 CPU 수와 다릅니다.사용 가능한 CPU 수는 len(os.sched_getaffinity(0) 방법으로 얻을 수 있습니다.
이 가능하다, ~할 수 있다,..NotImplementedError .
참조os.cpu_count()
4. 몇 가지 방법의 성능 비교
(1) 코드
import sys
import time
import pandas as pd
import multiprocessing as mp
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm, tqdm_notebook
def get_url_len(url):
url_list = url.split(".")
time.sleep(0.01) # 0.01
return len(url_list)
def test1(data):
"""
"""
start = time.time()
data['len'] = data['url'].apply(get_url_len)
end = time.time()
cost_time = end - start
res = sum(data['len'])
print("res:{}, cost time:{}".format(res, cost_time))
def test_mp(data):
"""
mp
"""
start = time.time()
with mp.Pool(mp.cpu_count()) as pool:
data['len'] = pool.map(get_url_len, data['url'])
end = time.time()
cost_time = end - start
res = sum(data['len'])
print("test_mp \t res:{}, cost time:{}".format(res, cost_time))
def test_pandarallel(data):
"""
pandarallel
"""
start = time.time()
pandarallel.initialize()
data['len'] = data['url'].parallel_apply(get_url_len)
end = time.time()
cost_time = end - start
res = sum(data['len'])
print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))
def test_delayed(data):
"""
delayed
"""
def key_func(subset):
subset["len"] = subset["url"].apply(get_url_len)
return subset
start = time.time()
data_grouped = data.groupby(data.index)
# data_grouped , tqdm
results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
data = pd.concat(results)
end = time.time()
cost_time = end - start
res = sum(data['len'])
print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))
if __name__ == '__main__':
columns = ['title', 'url', 'pub_old', 'pub_new']
temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
data = temp
"""
for i in range(99):
data = data.append(temp)
"""
print(len(data))
"""
test1(data)
test_mp(data)
test_pandarallel(data)
"""
test_delayed(data)
(2) 결과 출력1k
res:4338, cost time:0.0018074512481689453
test_mp res:4338, cost time:0.2626469135284424
test_pandarallel res:4338, cost time:0.3467681407928467
1w
res:42936, cost time:0.008773326873779297
test_mp res:42936, cost time:0.26111721992492676
test_pandarallel res:42936, cost time:0.33237743377685547
10w
res:426742, cost time:0.07944369316101074
test_mp res:426742, cost time:0.294996976852417
test_pandarallel res:426742, cost time:0.39208269119262695
100w
res:4267420, cost time:0.8074917793273926
test_mp res:4267420, cost time:0.9741342067718506
test_pandarallel res:4267420, cost time:0.6779992580413818
1000w
res:42674200, cost time:8.027287006378174
test_mp res:42674200, cost time:7.751036882400513
test_pandarallel res:42674200, cost time:4.404983282089233
get_url_len 함수에 sleep 문장(아날로그 복잡 논리)을 추가하고 데이터량은 1k이며 운행 결과는 다음과 같다.
1k
res:4338, cost time:10.054503679275513
test_mp res:4338, cost time:0.35697126388549805
test_pandarallel res:4338, cost time:0.43415403366088867
test_delayed res:4338, cost time:2.294757843017578
5. 매듭
(1) 데이터량이 비교적 적으면 병행 처리가 1회 실행 효율보다 느리다.
(2) apply의 함수 논리가 간단하면 병행 처리가 한 번의 실행 효율보다 느리다.
6. 문제 및 해결 방법
(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
https://www.jianshu.com/p/0be1b4b27bde
(2) Linux 물리적 CPU 수, 코어, 논리적 CPU 수 보기
https://lover.blog.csdn.net/article/details/113951192
(3) 진도표 사용
https://www.jb51.net/article/206219.htm
pandas apply 병행 처리에 대한 몇 가지 방법을 상세히 설명하는 이 글은 여기까지 소개되었습니다. 더 많은 pandas apply 병행 처리 내용은 저희 이전의 글을 검색하거나 아래의 관련 글을 계속 훑어보십시오. 앞으로 많은 응원 부탁드립니다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
【Pandas】DatetimeIndex란? no.29안녕하세요, 마유미입니다. Pandas에 대한 기사를 시리즈로 작성하고 있습니다. 이번은 제29회의 기사가 됩니다. 에서 Pandas의 시간에 대한 모듈에 대해 씁니다. 이번 기사에서는, 「DatetimeIndex」...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.