대형 AWS S3 파일 병렬 처리

내 글에서 우리는 S3 select를 통해 대형 AWS S3 파일을 처리하는 효율에 대해 토론했다.처리 과정은 순서대로 진행되기 때문에 큰 파일은 시간이 오래 걸릴 수 있습니다.그러면 우리는 어떻게 여러 단원을 뛰어넘어 병행 처리합니까?🤔 좋아, 이 문장에서, 우리는 그것을 실시하고, 그것의 일을 볼 것이다.
📝 나는 내가 지난 문장을 살펴보고 이 문장에 상하문을 설정할 것을 강력히 건의한다.
나는 항상 문제를 해결하는 데 필요한 더 작은 부분으로 분해하는 것을 좋아한다.세 가지 간단한 절차를 통해 이 문제를 해결해 봅시다.

1. S3 파일의 총 바이트 찾기


이전 글의 첫걸음과 매우 비슷합니다. 여기서도 파일 크기를 먼저 찾아보려고 합니다.
다음 코드 세그먼트는 S3 파일에 대해 HEAD 요청을 수행하고 파일 크기를 바이트 단위로 결정하는 함수를 보여줍니다.
# core/utils.py

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size

2. 미나리 만들기 작업으로 블록 처리


파일 블록을 처리하기 위해 미나리 작업을 정의할 것입니다. (잠시 후에 병행 실행될 것입니다.)여기의 전체적인 처리는 다음과 같다.
  • 은 이 블록의 startend bytes을 매개 변수로
  • 으로 수신한다
  • 은 S3을 통해 S3 파일의 이 부분을 가져와 임시 파일(이 예는 CSV)
  • 에 로컬로 저장합니다.
  • 이 임시 파일을 읽고
  • 에 필요한 모든 프로세스 수행
  • 이 임시 파일 삭제
  • 📝 나는 이 임무를 파일 블록 프로세서라고 부른다.그것은 파일의 블록을 처리한다.이 중 여러 개의 작업을 실행하면 전체 파일의 처리를 완성할 것입니다.
    # core/tasks.py
    
    @celery.task(name='core.tasks.chunk_file_processor', bind=True)
    def chunk_file_processor(self, **kwargs):
        """ Creates and process a single file chunk based on S3 Select ScanRange start and end bytes
        """
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        filename = kwargs.get('filename')
        start_byte_range = kwargs.get('start_byte_range')
        end_byte_range = kwargs.get('end_byte_range')
        header_row_str = kwargs.get('header_row_str')
        local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
        file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)
    
        logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
        try:
            # 1. fetch data from S3 and store it in a file
            store_scrm_file_s3_content_in_local_file(
                bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
                end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)
    
            # 2. Process the chunk file in temp folder
            id_set = set()
            with open(file_path) as csv_file:
                csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
                for row in csv_reader:
                    # perform any other processing here
                    id_set.add(int(row.get('id')))
            logger.info(f'{min(id_set)} --> {max(id_set)}')
    
            # 3. delete local file
            if path.exists(file_path):
                unlink(file_path)
        except Exception:
            logger.exception(f'Error in file processor: {filename}')
    

    3. 여러 미나리 퀘스트 병행 수행


    이것은 절차 중 가장 재미있는 한 걸음이다.우리는 Celery Group을 통해 여러 개의 미나리 작업을 병행 운행할 것이다.
    S3의 한 파일의 총 바이트 수를 알면 블록의 startend bytes을 계산하고 미나리 그룹을 통해 2단계에서 만든 작업을 호출합니다.startend bytes 범위는 파일 크기의 연속적인 범위입니다.또는, 우리도 모든 처리 임무가 끝난 후에 리셋 (결과) 임무를 호출할 수 있다.
    # core/tasks.py
    
    @celery.task(name='core.tasks.s3_parallel_file_processing', bind=True)
    def s3_parallel_file_processing_task(self, **kwargs):
        """ Creates celery tasks to process chunks of file in parallel
        """
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        try:
            filename = key
            # 1. Check file headers for validity -> if failed, stop processing
            desired_row_headers = (
                'id',
                'name',
                'age',
                'latitude',
                'longitude',
                'monthly_income',
                'experienced'
            )
            is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select(
                bucket=bucket,
                key=key,
                delimiter=S3_FILE_DELIMITER,
                desired_headers=desired_row_headers)
            if not is_headers_valid:
                logger.error(f'{filename} file headers validation failed')
                return False
            logger.info(f'{filename} file headers validation successful')
    
            # 2. fetch file size via S3 HEAD
            file_size = get_s3_file_size(bucket=bucket, key=key)
            if not file_size:
                logger.error(f'{filename} file size invalid {file_size}')
                return False
            logger.info(f'We are processing {filename} file about {file_size} bytes :-o')
    
            # 2. Create celery group tasks for chunk of this file size for parallel processing
            start_range = 0
            end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size)
            tasks = []
            while start_range < file_size:
                tasks.append(
                    chunk_file_processor.signature(
                        kwargs={
                            'bucket': bucket,
                            'key': key,
                            'filename': filename,
                            'start_byte_range': start_range,
                            'end_byte_range': end_range,
                            'header_row_str': header_row_str
                        }
                    )
                )
                start_range = end_range
                end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range)
            job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename}))
            _ = job.apply_async()
        except Exception:
            logger.exception(f'Error processing file: {filename}')
    
    
    @celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False)
    def chunk_file_processor_callback(self, *args, **kwargs):
        """ Callback task called post chunk_file_processor()
        """
        logger.info('Callback called')
    
    # core/utils.py
    
    def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int,
                                                 delimiter: str, header_row: str):
        """Retrieves S3 file content via S3 Select ScanRange and store it in a local file.
           Make sure the header validation is done before calling this.
    
        Args:
            bucket (str): S3 bucket
            key (str): S3 key
            file_path (str): Local file path to store the contents
            start_range (int): Start range of ScanRange parameter of S3 Select
            end_range (int): End range of ScanRange parameter of S3 Select
            delimiter (str): S3 file delimiter
            header_row (str): Header row of the local file. This will be inserted as first line in local file.
        """
        aws_profile = current_app.config.get('AWS_PROFILE_NAME')
        s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
        expression = 'SELECT * FROM S3Object'
        try:
            response = s3_client.select_object_content(
                Bucket=bucket,
                Key=key,
                ExpressionType='SQL',
                Expression=expression,
                InputSerialization={
                    'CSV': {
                        'FileHeaderInfo': 'USE',
                        'FieldDelimiter': delimiter,
                        'RecordDelimiter': '\n'
                    }
                },
                OutputSerialization={
                    'CSV': {
                        'FieldDelimiter': delimiter,
                        'RecordDelimiter': '\n',
                    },
                },
                ScanRange={
                    'Start': start_range,
                    'End': end_range
                },
            )
    
            """
            select_object_content() response is an event stream that can be looped to concatenate the overall result set
            """
            f = open(file_path, 'wb')  # we receive data in bytes and hence opening file in bytes
            f.write(header_row.encode())
            f.write('\n'.encode())
            for event in response['Payload']:
                if records := event.get('Records'):
                    f.write(records['Payload'])
            f.close()
        except ClientError:
            logger.exception(f'Client error reading S3 file {bucket} : {key}')
        except Exception:
            logger.exception(f'Error reading S3 file {bucket} : {key}')
    
    이렇게!😎 이제 S3 파일을 한 글자 한 글자 절전식으로 처리하지 않고, 병렬 처리 블록을 통해 병렬 처리합니다.그렇게 어렵지 않죠?😅

    🔍 비교 처리 시간


    만약 우리가 이전 글에서 처리한 같은 파일의 처리 시간을 이런 방법과 비교한다면, 처리 속도는 약 68% 빠르다. (같은 하드웨어와 설정을 사용한다.)😆
    스트리밍 S3 파일
    S3 파일 병렬 처리
    파일 크기
    4.8MB
    4.8MB
    처리 시간
    약 37초
    약 12초

    ✔️ 이런 방법의 이점

  • 은 수백만 개의 기록을 포함하는 초대형 파일을 몇 분 안에 처리할 수 있다.내가 생산 환경에서 이런 방법을 사용한 지 이미 한참이 되었는데, 이것은 매우 행복한
  • 이다
  • 분산 작업자
  • 워크풀
  • 의 가용성으로 처리 속도 조절
  • 메모리 문제 없음
  • 📌 내 GitHub 저장소를 보고 이러한 방법의 전체 작업 예를 확인할 수 있습니다.👇

    이드리스 란프라바라 / s3 선택 프레젠테이션


    이 프로젝트는 대형 데이터 파일을 페이지별로 스트리밍하는 다양한 AWS S3 선택 기능을 보여준다.


    AWS S3 선택 프레젠테이션



    이 프로젝트는 AWS S3 Select의 풍부한 기능을 보여줌으로써 paginated style에서 대형 데이터 파일을 전송할 수 있다.
    현재 S3 SelectOFFSET을 지원하지 않기 때문에 우리는 조회 결과를 페이지로 나눌 수 없습니다.따라서 scanrange 기능을 사용하여 S3 파일의 컨텐트를 스트리밍합니다.

    배경.


    큰 파일을 가져오거나 읽으면 Out of Memory 오류가 발생합니다.그것 또한 시스템 붕괴 사건을 초래할 수도 있다.여기 도서관이 있어요.Pandas, Dask 등은 큰 파일을 처리하는 데 매우 뛰어나지만, 파일도 로컬에 표시해야 한다. 즉, S3에서 로컬 기기를 가져와야 한다.그러나 전체 S3 파일을 로컬에서 동시에 가져오고 저장하지 않으려면 어떻게 해야 합니까?🤔
    우리는 AWS S3 Select을 사용하여 그것의 ScanRange 파라미터를 통해 큰 파일을 전송할 수 있다.이런 방법은...
    View on GitHub

    📑 리소스

  • My GitHub repository demonstrating the above approach
  • AWS S3 Select boto3 reference
  • AWS S3 Select userguide
  • 안녕히 계세요!제 다음 댓글까지.😋

    좋은 웹페이지 즐겨찾기