DASK #3 | Bag & Cluster

1. Dask Bag

1-1. Basic

  • str과 같은 unstructured | semi-structured data는 Dask bags에 저장
  • text operation은 GIL을 자동적으로 활성화
  • 따라서 Dask는 tex데이터를 처리하기 위해 Default적으로 Parallel Processing 사용 : process간 data copy를 하는데 시간을 많이 할애하기 때문에 lazily하게 load하는게 좋음
  • db.from_sequence() : list to bags
  • .take() , .count(), .replace()
# import dask.bag as db

str_list = ['I got new laptop', 'you have more things than I do', 'lord have mercy']
str_bag = db.from_sequence(str_list, npartitions = 3) # npartitions 지정안해도 알아서 split됨 
print(str_bag) #dask.bag<from_sequence, npartitions=3> -> lazy 값

print(str_bag.take(1)) # ('I got new laptop',)
print(str_bag.compute()) # ['I got new laptop', 'you have more things than I do', 'lord have mercy'] 
# .compute()는 모든 결과 나옴 

print(str_bag.count()) # <dask.bag.core.Item object at 0x7fc004f039a0>
print(str_bag.count().compute()) # 3

print(str_bag.str.upper().take(1))
print(str_bag.str.replace('got', 'took a').take(1))
print(str_bag.str.count('I').take(3)) # take 사용하지 않으면 dask값
  • 왜인지 모르지만 상황에 따라서 3 elements requested, only 1 elements available. Try passing larger npartitions to take. 이라는 오류 메세지가 뜨는데 원인을 모르겠음. 3개의 elements가 존재하고, take(3)의 element를 넣었기 때문에 문제가 될 것이 없다고 생각됨
  • 다른 데이터 이용한 것에서는 잘 되는데 왜 위의 데이터에서는 안되는지 확인 필요

  • db.read_text(files) : file 한개씩 bag에 seperately하게 넣음
files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/[0-9].*')
print(files) # list 

txt_db = db.read_text(files)
print(txt_db) 

  • txt 파일이 9개이기에 npartition = 9

1-2. Operation

Dask Bags에서는 Json file을 많이 사용함

1) map : .map(func).compute() function 적용

import dask.bag as db 
import glob
import json 

files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/politicians/*.json')

txt_db = db.read_text(files)

print(txt_db.take(1))
print(type(txt_db.take(1))) # tuple이지만 그 안은 str

dict_bag = txt_db.map(json.loads)
print(dict_bag.take(1))
print(type(dict_bag.take(1))) # tuble이지만 그 안은 dict

2) filter : .filter() 필터링 적용

def gender_filter(txt_db):
    return txt_db['gender'] == "male"

male_db = dict_bag.filter(gender_filter)

print(male_db.count().compute()) #843

3) pluke : .pluck()

# pluck

pluck_ = dict_bag.pluck('gender')
pluck_.take(3)

pluck_name_len = dict_bag.pluck('name').map(len)
pluck_min = pluck_name_len.min()
pluck_max = pluck_name_len.max()
pluck_mean = pluck_name_len.mean()

print(dask.compute(pluck_min, pluck_max, pluck_mean))

1-3. Dask Bag to DataFrame

  • unstructured data를 정리하여 일부의 structured data를 추출하길 원할 때 Dask bags을 사용함
  • Json 파일을 사용할 경우, 원하는 dict key만 선정한 후 dask dataframe으로 변형

1-4. Data Variety

  • Dask bag으로 audio, video, image 파일 모두 다 효용 가능함
  • audio 파일을 위해서는 from scipy.io import wavfile
  • 음성 분석 추가 go

2.Create Local Cluster

  • Process와 Thread를 동시에 사용하기 위해서는 Cluster와 Client가 필요함

  • Client를 한번 설정하면 (그 이후의) dask는 모든 computations에 Client를 사용함(그것이 bags이던, df이던 무엇이던간에)

  • LocalCluster() : 사용 중인 컴퓨터에 생성되는 cluster. 이외로 생성된 cluster는 다른 컴퓨터와 작업을 분리하여 진행함.

# 두가지 방법 다 동일한 결과값 생성

from dask.distributed import Client, LocalCluster

# Method1. Making Client
client = Client(processes = True, 
                n_workers = 4, 
                threads_per_worker = 1)

# Method2. Making Cluster then make client. 

cluster = LocalCluster(processes = True,
                        n_workers = 4, 
                        threads_per_worker = 1
)

client = Client(cluster)

좋은 웹페이지 즐겨찾기