판다스 apply 병행 처리의 몇 가지 방법을 상세히 설명하다

6835 단어 pandasapply병행

1. pandarallel (pip install )


Pandas DataFrame df가 있는 간단한 용례와 func를 응용하는 함수에 대해서는parallel_apply는 고전적인 apply를 대체합니다.

from pandarallel import pandarallel
 
# Initialization
pandarallel.initialize()
 
# Standard pandas apply
df.apply(func)
 
# Parallel apply
df.parallel_apply(func)
병렬화 계산을 원하지 않으면, 고전적인 apply 방법을 사용할 수 있습니다.
또한 initialize 함수에서progress_를 전달할 수 있습니다bar=True는 각 작업 CPU의 진행률 표시줄을 표시합니다.

2. joblib (pip install )


 https://pypi.python.org/pypi/joblib

# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
 
from math import sqrt
from joblib import Parallel, delayed
 
def test():
  start = time.time()
  result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
  end = time.time()
  print(end-start)
  result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
  end2 = time.time()
  print(end2-end)
출력 결과 ---------
0.4434356689453125
0.6346755027770996

3. multiprocessing


import multiprocessing as mp
 
with mp.Pool(mp.cpu_count()) as pool:
  df['newcol'] = pool.map(f, df['col'])
multiprocessing.cpu_count()

시스템의 CPU 수를 반환합니다.
이 수는 현재 프로세스에서 사용할 수 있는 CPU 수와 다릅니다.사용 가능한 CPU 수는 len(os.sched_getaffinity(0) 방법으로 얻을 수 있습니다.
이 가능하다, ~할 수 있다,..NotImplementedError .
참조os.cpu_count()

4. 몇 가지 방법의 성능 비교


(1) 코드

import sys
import time
import pandas as pd
import multiprocessing as mp
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm, tqdm_notebook
 
 
def get_url_len(url):
  url_list = url.split(".")
  time.sleep(0.01) #  0.01 
  return len(url_list)
 
def test1(data):
  """
   
  """
  start = time.time()
  data['len'] = data['url'].apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("res:{}, cost time:{}".format(res, cost_time))
 
def test_mp(data):
  """
   mp 
  """
  start = time.time()
  with mp.Pool(mp.cpu_count()) as pool:
    data['len'] = pool.map(get_url_len, data['url'])
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_mp \t res:{}, cost time:{}".format(res, cost_time))
 
def test_pandarallel(data):
  """
   pandarallel 
  """
  start = time.time()
  pandarallel.initialize()
  data['len'] = data['url'].parallel_apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))
 
 
def test_delayed(data):
  """
   delayed 
  """
  def key_func(subset):
    subset["len"] = subset["url"].apply(get_url_len)
    return subset
 
  start = time.time()
  data_grouped = data.groupby(data.index)
  # data_grouped  ,  tqdm  
  results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
  data = pd.concat(results)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))
 
 
if __name__ == '__main__':
  
  columns = ['title', 'url', 'pub_old', 'pub_new']
  temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
  data = temp
  """
  for i in range(99):
    data = data.append(temp)
  """
  print(len(data))
  """
  test1(data)
  test_mp(data)
  test_pandarallel(data)
  """
  test_delayed(data)
(2) 결과 출력
1k
res:4338, cost time:0.0018074512481689453
test_mp   res:4338, cost time:0.2626469135284424
test_pandarallel   res:4338, cost time:0.3467681407928467
 
1w
res:42936, cost time:0.008773326873779297
test_mp   res:42936, cost time:0.26111721992492676
test_pandarallel   res:42936, cost time:0.33237743377685547
 
10w
res:426742, cost time:0.07944369316101074
test_mp   res:426742, cost time:0.294996976852417
test_pandarallel   res:426742, cost time:0.39208269119262695
 
100w
res:4267420, cost time:0.8074917793273926
test_mp   res:4267420, cost time:0.9741342067718506
test_pandarallel   res:4267420, cost time:0.6779992580413818
 
1000w
res:42674200, cost time:8.027287006378174
test_mp   res:42674200, cost time:7.751036882400513
test_pandarallel   res:42674200, cost time:4.404983282089233
get_url_len 함수에 sleep 문장(아날로그 복잡 논리)을 추가하고 데이터량은 1k이며 운행 결과는 다음과 같다.
1k
res:4338, cost time:10.054503679275513
test_mp   res:4338, cost time:0.35697126388549805
test_pandarallel   res:4338, cost time:0.43415403366088867
test_delayed   res:4338, cost time:2.294757843017578

5. 매듭


(1) 데이터량이 비교적 적으면 병행 처리가 1회 실행 효율보다 느리다.
(2) apply의 함수 논리가 간단하면 병행 처리가 한 번의 실행 효율보다 느리다.

6. 문제 및 해결 방법


(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
https://www.jianshu.com/p/0be1b4b27bde
(2) Linux 물리적 CPU 수, 코어, 논리적 CPU 수 보기
https://lover.blog.csdn.net/article/details/113951192
(3) 진도표 사용
https://www.jb51.net/article/206219.htm
pandas apply 병행 처리에 대한 몇 가지 방법을 상세히 설명하는 이 글은 여기까지 소개되었습니다. 더 많은 pandas apply 병행 처리 내용은 저희 이전의 글을 검색하거나 아래의 관련 글을 계속 훑어보십시오. 앞으로 많은 응원 부탁드립니다!

좋은 웹페이지 즐겨찾기