python chunk implementation
15918 단어 multiprocessingh5pypythonh5py
긴 시간 동안 실시간으로 어떤 데이터를 로깅하려면, 적절하게 끊어서 저장해줘야 한다. 실시간으로 파일을 쓰는 것은 많은 리소스를 필요로 하고, 한번에 모아서 저장하는 것은 데이터 전체를 잃을 가능성이 크다. 그래서 Chunk
라는 클래스를 만들어서, 일정 갯수 이상 데이터가 쌓이면 자동으로 저장되게 구현했다. 저장하는 포맷은 대용량 데이터 처리에 용이한 hdf5
이다. 데이터를 추가하는 코드가 다른 프로세스와 실시간으로 상호작용할 때는, 저장이 오래 걸리면, 이후 프로세스를 지연시킬 수 있다. 그래서 저장은 multiprocessing을 이용해서 구현했다(save_mp()
).
import numpy as np
import h5py, time, datetime
import multiprocessing as mp
class Chunk():
def __init__(self, size:tuple, path:str, dset_name:str, mode='mp'):
self.size = size
self.path = path
self.dset_name = dset_name
self.data = np.full(self.size, np.nan)
self.row_idx = 0
self.mode = mode
self.processes = []
def push(self, row:np.array):
self.data[self.row_idx] = row
self.row_idx += 1
if self.row_idx >= self.size[0]:
if self.mode == 'mp':
self.save_mp()
else:
self.save()
self.data = np.full(self.size, np.nan)
self.row_idx = 0
def save(self):
self._save(self.data, self.path, self.dset_name)
with h5py.File(self.path, 'a') as f:
data = self.data[:self.row_idx]
if self.dset_name in f:
dset = f[self.dset_name]
shape = [dset.shape[i] for i in range(len(dset.shape))]
appending_idx = shape[0]
shape[0] += data.shape[0]
dset.resize(shape)
dset[appending_idx:] = data
else:
shape = list(self.data.shape)
shape[0] = None
shape = tuple(shape)
dset = f.create_dataset(self.dset_name, data=self.data, compression='gzip', maxshape=shape)
def save_mp(self):
# print(f"save_mp \t {datetime.datetime.now()}")
p = mp.Process(target=Chunk._save, args=(self.data, self.path, self.dset_name,))
p.start()
self.processes.append(p)
def wait_mp(self):
for p in self.processes:
p.join()
@staticmethod
def _save(data, path, dset_name):
with h5py.File(path, 'a') as f:
_data_shape = list(data.shape)
_data_shape[0] = -1
data = data[~np.isnan(data)].reshape(_data_shape)
if dset_name in f:
dset = f[dset_name]
shape = [dset.shape[i] for i in range(len(dset.shape))]
appending_idx = shape[0]
shape[0] += data.shape[0]
dset.resize(shape)
dset[appending_idx:] = data
else:
shape = list(data.shape)
shape[0] = None
shape = tuple(shape)
dset = f.create_dataset(dset_name, data=data, compression='gzip', maxshape=shape)
Author And Source
이 문제에 관하여(python chunk implementation), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@kimhyunchul/python-chunk-implementation저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)