DASK #3 | Bag & Cluster
1. Dask Bag
1-1. Basic
- str과 같은 unstructured | semi-structured data는 Dask bags에 저장
- text operation은 GIL을 자동적으로 활성화
- 따라서 Dask는 tex데이터를 처리하기 위해 Default적으로 Parallel Processing 사용 : process간 data copy를 하는데 시간을 많이 할애하기 때문에 lazily하게 load하는게 좋음
db.from_sequence()
: list to bags.take()
,.count()
,.replace()
# import dask.bag as db
str_list = ['I got new laptop', 'you have more things than I do', 'lord have mercy']
str_bag = db.from_sequence(str_list, npartitions = 3) # npartitions 지정안해도 알아서 split됨
print(str_bag) #dask.bag<from_sequence, npartitions=3> -> lazy 값
print(str_bag.take(1)) # ('I got new laptop',)
print(str_bag.compute()) # ['I got new laptop', 'you have more things than I do', 'lord have mercy']
# .compute()는 모든 결과 나옴
print(str_bag.count()) # <dask.bag.core.Item object at 0x7fc004f039a0>
print(str_bag.count().compute()) # 3
print(str_bag.str.upper().take(1))
print(str_bag.str.replace('got', 'took a').take(1))
print(str_bag.str.count('I').take(3)) # take 사용하지 않으면 dask값
- 왜인지 모르지만 상황에 따라서 3 elements requested, only 1 elements available. Try passing larger
npartitions
totake
. 이라는 오류 메세지가 뜨는데 원인을 모르겠음. 3개의 elements가 존재하고, take(3)의 element를 넣었기 때문에 문제가 될 것이 없다고 생각됨- 다른 데이터 이용한 것에서는 잘 되는데 왜 위의 데이터에서는 안되는지 확인 필요
db.read_text(files)
: file 한개씩 bag에 seperately하게 넣음
files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/[0-9].*')
print(files) # list
txt_db = db.read_text(files)
print(txt_db)
- txt 파일이 9개이기에 npartition = 9
1-2. Operation
Dask Bags에서는 Json file을 많이 사용함
1) map : .map(func).compute()
function 적용
import dask.bag as db
import glob
import json
files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/politicians/*.json')
txt_db = db.read_text(files)
print(txt_db.take(1))
print(type(txt_db.take(1))) # tuple이지만 그 안은 str
dict_bag = txt_db.map(json.loads)
print(dict_bag.take(1))
print(type(dict_bag.take(1))) # tuble이지만 그 안은 dict
2) filter : .filter()
필터링 적용
def gender_filter(txt_db):
return txt_db['gender'] == "male"
male_db = dict_bag.filter(gender_filter)
print(male_db.count().compute()) #843
3) pluke : .pluck()
# pluck
pluck_ = dict_bag.pluck('gender')
pluck_.take(3)
pluck_name_len = dict_bag.pluck('name').map(len)
pluck_min = pluck_name_len.min()
pluck_max = pluck_name_len.max()
pluck_mean = pluck_name_len.mean()
print(dask.compute(pluck_min, pluck_max, pluck_mean))
1-3. Dask Bag to DataFrame
- unstructured data를 정리하여 일부의 structured data를 추출하길 원할 때 Dask bags을 사용함
- Json 파일을 사용할 경우, 원하는 dict key만 선정한 후 dask dataframe으로 변형
1-4. Data Variety
- Dask bag으로 audio, video, image 파일 모두 다 효용 가능함
- audio 파일을 위해서는 from scipy.io import wavfile
- 음성 분석 추가 go
2.Create Local Cluster
-
Process와 Thread를 동시에 사용하기 위해서는 Cluster와 Client가 필요함
-
Client를 한번 설정하면 (그 이후의) dask는 모든 computations에 Client를 사용함(그것이 bags이던, df이던 무엇이던간에)
-
LocalCluster()
: 사용 중인 컴퓨터에 생성되는 cluster. 이외로 생성된 cluster는 다른 컴퓨터와 작업을 분리하여 진행함.
# 두가지 방법 다 동일한 결과값 생성
from dask.distributed import Client, LocalCluster
# Method1. Making Client
client = Client(processes = True,
n_workers = 4,
threads_per_worker = 1)
# Method2. Making Cluster then make client.
cluster = LocalCluster(processes = True,
n_workers = 4,
threads_per_worker = 1
)
client = Client(cluster)
Author And Source
이 문제에 관하여(DASK #3 | Bag & Cluster), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@js8456/DASK-3-Dask-Bag저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)