대형 AWS S3 파일 병렬 처리
32271 단어 showdevawspythonproductivity
📝 나는 내가 지난 문장을 살펴보고 이 문장에 상하문을 설정할 것을 강력히 건의한다.
나는 항상 문제를 해결하는 데 필요한 더 작은 부분으로 분해하는 것을 좋아한다.세 가지 간단한 절차를 통해 이 문제를 해결해 봅시다.
1. S3 파일의 총 바이트 찾기
이전 글의 첫걸음과 매우 비슷합니다. 여기서도 파일 크기를 먼저 찾아보려고 합니다.
다음 코드 세그먼트는 S3 파일에 대해 HEAD 요청을 수행하고 파일 크기를 바이트 단위로 결정하는 함수를 보여줍니다.
# core/utils.py
def get_s3_file_size(bucket: str, key: str) -> int:
"""Gets the file size of S3 object by a HEAD request
Args:
bucket (str): S3 bucket
key (str): S3 object path
Returns:
int: File size in bytes. Defaults to 0 if any error.
"""
aws_profile = current_app.config.get('AWS_PROFILE_NAME')
s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
file_size = 0
try:
response = s3_client.head_object(Bucket=bucket, Key=key)
if response:
file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
except ClientError:
logger.exception(f'Client error reading S3 file {bucket} : {key}')
return file_size
2. 미나리 만들기 작업으로 블록 처리
파일 블록을 처리하기 위해 미나리 작업을 정의할 것입니다. (잠시 후에 병행 실행될 것입니다.)여기의 전체적인 처리는 다음과 같다.
start
과 end bytes
을 매개 변수로 # core/tasks.py
@celery.task(name='core.tasks.chunk_file_processor', bind=True)
def chunk_file_processor(self, **kwargs):
""" Creates and process a single file chunk based on S3 Select ScanRange start and end bytes
"""
bucket = kwargs.get('bucket')
key = kwargs.get('key')
filename = kwargs.get('filename')
start_byte_range = kwargs.get('start_byte_range')
end_byte_range = kwargs.get('end_byte_range')
header_row_str = kwargs.get('header_row_str')
local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)
logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
try:
# 1. fetch data from S3 and store it in a file
store_scrm_file_s3_content_in_local_file(
bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)
# 2. Process the chunk file in temp folder
id_set = set()
with open(file_path) as csv_file:
csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
for row in csv_reader:
# perform any other processing here
id_set.add(int(row.get('id')))
logger.info(f'{min(id_set)} --> {max(id_set)}')
# 3. delete local file
if path.exists(file_path):
unlink(file_path)
except Exception:
logger.exception(f'Error in file processor: {filename}')
3. 여러 미나리 퀘스트 병행 수행
이것은 절차 중 가장 재미있는 한 걸음이다.우리는 Celery Group을 통해 여러 개의 미나리 작업을 병행 운행할 것이다.
S3의 한 파일의 총 바이트 수를 알면 블록의
start
과 end bytes
을 계산하고 미나리 그룹을 통해 2단계에서 만든 작업을 호출합니다.start
및 end bytes
범위는 파일 크기의 연속적인 범위입니다.또는, 우리도 모든 처리 임무가 끝난 후에 리셋 (결과) 임무를 호출할 수 있다.# core/tasks.py
@celery.task(name='core.tasks.s3_parallel_file_processing', bind=True)
def s3_parallel_file_processing_task(self, **kwargs):
""" Creates celery tasks to process chunks of file in parallel
"""
bucket = kwargs.get('bucket')
key = kwargs.get('key')
try:
filename = key
# 1. Check file headers for validity -> if failed, stop processing
desired_row_headers = (
'id',
'name',
'age',
'latitude',
'longitude',
'monthly_income',
'experienced'
)
is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select(
bucket=bucket,
key=key,
delimiter=S3_FILE_DELIMITER,
desired_headers=desired_row_headers)
if not is_headers_valid:
logger.error(f'{filename} file headers validation failed')
return False
logger.info(f'{filename} file headers validation successful')
# 2. fetch file size via S3 HEAD
file_size = get_s3_file_size(bucket=bucket, key=key)
if not file_size:
logger.error(f'{filename} file size invalid {file_size}')
return False
logger.info(f'We are processing {filename} file about {file_size} bytes :-o')
# 2. Create celery group tasks for chunk of this file size for parallel processing
start_range = 0
end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size)
tasks = []
while start_range < file_size:
tasks.append(
chunk_file_processor.signature(
kwargs={
'bucket': bucket,
'key': key,
'filename': filename,
'start_byte_range': start_range,
'end_byte_range': end_range,
'header_row_str': header_row_str
}
)
)
start_range = end_range
end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range)
job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename}))
_ = job.apply_async()
except Exception:
logger.exception(f'Error processing file: {filename}')
@celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False)
def chunk_file_processor_callback(self, *args, **kwargs):
""" Callback task called post chunk_file_processor()
"""
logger.info('Callback called')
# core/utils.py
def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int,
delimiter: str, header_row: str):
"""Retrieves S3 file content via S3 Select ScanRange and store it in a local file.
Make sure the header validation is done before calling this.
Args:
bucket (str): S3 bucket
key (str): S3 key
file_path (str): Local file path to store the contents
start_range (int): Start range of ScanRange parameter of S3 Select
end_range (int): End range of ScanRange parameter of S3 Select
delimiter (str): S3 file delimiter
header_row (str): Header row of the local file. This will be inserted as first line in local file.
"""
aws_profile = current_app.config.get('AWS_PROFILE_NAME')
s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
expression = 'SELECT * FROM S3Object'
try:
response = s3_client.select_object_content(
Bucket=bucket,
Key=key,
ExpressionType='SQL',
Expression=expression,
InputSerialization={
'CSV': {
'FileHeaderInfo': 'USE',
'FieldDelimiter': delimiter,
'RecordDelimiter': '\n'
}
},
OutputSerialization={
'CSV': {
'FieldDelimiter': delimiter,
'RecordDelimiter': '\n',
},
},
ScanRange={
'Start': start_range,
'End': end_range
},
)
"""
select_object_content() response is an event stream that can be looped to concatenate the overall result set
"""
f = open(file_path, 'wb') # we receive data in bytes and hence opening file in bytes
f.write(header_row.encode())
f.write('\n'.encode())
for event in response['Payload']:
if records := event.get('Records'):
f.write(records['Payload'])
f.close()
except ClientError:
logger.exception(f'Client error reading S3 file {bucket} : {key}')
except Exception:
logger.exception(f'Error reading S3 file {bucket} : {key}')
이렇게!😎 이제 S3 파일을 한 글자 한 글자 절전식으로 처리하지 않고, 병렬 처리 블록을 통해 병렬 처리합니다.그렇게 어렵지 않죠?😅🔍 비교 처리 시간
만약 우리가 이전 글에서 처리한 같은 파일의 처리 시간을 이런 방법과 비교한다면, 처리 속도는 약 68% 빠르다. (같은 하드웨어와 설정을 사용한다.)😆
스트리밍 S3 파일
S3 파일 병렬 처리
파일 크기
4.8MB
4.8MB
처리 시간
약 37초
약 12초
✔️ 이런 방법의 이점
이드리스 란프라바라 / s3 선택 프레젠테이션
이 프로젝트는 대형 데이터 파일을 페이지별로 스트리밍하는 다양한 AWS S3 선택 기능을 보여준다.
AWS S3 선택 프레젠테이션
이 프로젝트는 AWS S3 Select
의 풍부한 기능을 보여줌으로써 paginated style
에서 대형 데이터 파일을 전송할 수 있다.
현재 S3 Select
은 OFFSET
을 지원하지 않기 때문에 우리는 조회 결과를 페이지로 나눌 수 없습니다.따라서 scanrange
기능을 사용하여 S3 파일의 컨텐트를 스트리밍합니다.
배경.
큰 파일을 가져오거나 읽으면 Out of Memory
오류가 발생합니다.그것 또한 시스템 붕괴 사건을 초래할 수도 있다.여기 도서관이 있어요.Pandas, Dask 등은 큰 파일을 처리하는 데 매우 뛰어나지만, 파일도 로컬에 표시해야 한다. 즉, S3에서 로컬 기기를 가져와야 한다.그러나 전체 S3 파일을 로컬에서 동시에 가져오고 저장하지 않으려면 어떻게 해야 합니까?🤔
우리는 AWS S3 Select
을 사용하여 그것의 ScanRange
파라미터를 통해 큰 파일을 전송할 수 있다.이런 방법은...
View on GitHub
📑 리소스
큰 파일을 가져오거나 읽으면
Out of Memory
오류가 발생합니다.그것 또한 시스템 붕괴 사건을 초래할 수도 있다.여기 도서관이 있어요.Pandas, Dask 등은 큰 파일을 처리하는 데 매우 뛰어나지만, 파일도 로컬에 표시해야 한다. 즉, S3에서 로컬 기기를 가져와야 한다.그러나 전체 S3 파일을 로컬에서 동시에 가져오고 저장하지 않으려면 어떻게 해야 합니까?🤔우리는
AWS S3 Select
을 사용하여 그것의 ScanRange
파라미터를 통해 큰 파일을 전송할 수 있다.이런 방법은...View on GitHub
📑 리소스
Reference
이 문제에 관하여(대형 AWS S3 파일 병렬 처리), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/idrisrampurawala/parallelize-processing-a-large-aws-s3-file-8eh텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)