대용량 파일용 CLI 업로드

8689 단어 filecliuploadlarge
우리는 데이터 사이언스 팀에서 업무의 일환으로 매일 데이터를 처리합니다. 데이터를 수집하고 잠재적으로 중요한 기능과 기준 수치를 분석하는 것으로 시작합니다. 그런 다음 데이터 전처리 및 정리를 수행합니다. 마지막으로 훈련을 위해 데이터를 기계 학습 알고리즘에 공급합니다.

교육이 완료되면 모델을 테스트합니다. 그런 다음 성능이 좋으면 API를 통해 서비스합니다.

previous article 에서 미리 서명된 URL을 통해 멀티파트 업로드를 사용하여 대용량 파일을 업로드하는 방법에 대해 설명했습니다. 이제 한 단계 더 나아가 미리 서명된 URL을 사용하여 S3에 대용량 파일을 업로드하기 위한 CLI 도구를 생성하는 방법에 대해 설명합니다.

이 기사는 아래에 설명된 대로 3개 부분으로 구성됩니다.
  • 멀티파트 업로드를 위해 미리 서명된 URL 생성
  • 개체의 모든 부분을 업로드합니다
  • .
  • 업로드 완료

  • 멀티파트 업로드 미리 서명된 URL 요청



    먼저 AWS S3 버킷에 미리 서명된 URL을 요청해야 합니다. 객체의 각 부분에 해당하는 미리 서명된 URL 목록과 함께 생성되는 부분이 있는 객체와 연결된 upload_id가 반환됩니다. 미리 서명된 URL을 요청하기 위한 경로를 만들어 봅시다.

    from pathlib import Path
    …
    …
    
    @app.route('/presigned',methods=['POST'])
    def return_presigned():
        data = request.form.to_dict(flat=False)
        file_name = data['file_name'][0]
        file_size = int(data['file_size'][0])
        target_file = Path(file_name)
        max_size = 5 * 1024 * 1024
        upload_by = int(file_size / max_size) + 1
        bucket_name = "YOUR_BUCKET_NAME"
        key = file_name
        upload_id = s3util.start(bucket_name, key)
        urls = []
        for part in range(1, upload_by + 1):
               signed_url = s3util.create_presigned_url(part)
                 urls.append(signed_url)
        return jsonify({
                         'bucket_name':bucket_name,
                         'key':key,
                         'upload_id':upload_id,
                    'file_size:file_size,
                       'file_name':file_name,
                    'max_size':max_size,
                         'upload_by':upload_by,
                    'urls':urls
                })
    


    코드를 살펴보겠습니다. 이 경로(Flask 경로)에서 요청에서 전송된 정보인 file_name 및 file_size를 얻습니다.
    file_name은 객체의 일부에 대한 URL을 생성하는 데 사용되며 file_size는 생성할 부분(생성할 미리 서명된 URL)의 수를 찾는 데 사용됩니다.
    경로에서 max_size는 각 부품의 최대 크기를 결정합니다. 필요에 따라 변경할 수 있습니다.
    upload_by는 개체가 업로드할 부분 수를 알려줍니다.
    bucket_name은 데이터를 업로드할 버킷입니다.
    upload_id는 곧 설명할 S3 유틸리티 함수 create_multipart_upload를 사용하여 생성됩니다.
    그런 다음 s3의 create_presigned_url 유틸리티 기능을 사용하여 for 루프에서 미리 서명된 URL이 생성됩니다. 다시, 우리는 조금 후에 다시 돌아올 것입니다.
    다음으로 필요한 데이터를 JSON 형식으로 반환합니다.

    이제 create_multipart_upload에 대해 이야기해 보겠습니다. 더 읽기 쉽고 관리하기 쉽도록 코드를 캡슐화하는 데 도움이 되는 유틸리티 기능입니다. 다음은 유틸리티 클래스의 코드입니다.

    import boto3
    from botocore.exceptions import ClientError
    from boto3 import Session
    
    
    class S3MultipartUploadUtil:
        """
        AWS S3 Multipart Upload Uril
        """
        def __init__(self, session: Session):
            self.session = session
            self.s3 = session.client('s3')
            self.upload_id = None
            self.bucket_name = None
            self.key = None
    
        def start(self, bucket_name: str, key: str):
            """
            Start Multipart Upload
            :param bucket_name:
            :param key:
            :return:
            """
            self.bucket_name = bucket_name
            self.key = key
            res = self.s3.create_multipart_upload(Bucket=bucket_name, Key=key)
            self.upload_id = res['UploadId']
            logger.debug(f"Start multipart upload '{self.upload_id}'")
            return self.upload_id
    
        def create_presigned_url(self, part_no: int, expire: int=3600) -> str:
            """
            Create pre-signed URL for upload part.
            :param part_no:
            :param expire:
            :return:
            """
            signed_url = self.s3.generate_presigned_url(
                ClientMethod='upload_part',
                Params={'Bucket': self.bucket_name,
                        'Key': self.key,
                        'UploadId': self.upload_id,
                        'PartNumber': part_no},
                ExpiresIn=expire)
            logger.debug(f"Create presigned url for upload part '{signed_url}'")
            return signed_url
    
        def complete(self, parts,id,key,bucket_name):
            """
            Complete Multipart Uploading.
            `parts` is list of dictionary below.
            ```
    
    
            [ {'ETag': etag, 'PartNumber': 1}, {'ETag': etag, 'PartNumber': 2}, ... ]
    
    
            ```
            you can get `ETag` from upload part response header.
            :param parts: Sent part info.
            :return:
            """
            res = self.s3.complete_multipart_upload(
                Bucket=bucket_name,
                Key=key,
                MultipartUpload={
                    'Parts': parts
                },
                UploadId=id
            )
            logger.debug(f"Complete multipart upload '{self.upload_id}'")
            logger.debug(res)
            self.upload_id = None
            self.bucket_name = None
            self.key = None
    


    이 클래스에서는 S3 클라이언트의 기능을 사용하기 쉽고 API 파일에서 덜 어수선하게 만들기 위해 래핑합니다.

    API에서 응답을 받으면 다음과 같이 표시됩니다.


    CLI를 사용하여 데이터를 업로드하려면 이 응답을 JSON 파일로 다운로드합니다.

    객체의 모든 부분 업로드



    이제 이 JSON 파일을 사용하는 CLI 코드로 돌아가서 이 파일을 presigned.json으로 저장한다고 가정합니다.

    import requests
    import progressbar
    from pathlib import Path
    
    def main():
        data = eval(open('presigned.json').read())
        upload_by = data['upload_by']
        max_size = data['max_size']
        urls = data['urls']
        target_file = Path(data['file_name'])
        file_size = data['file_size']
        key = data['key']
        upload_id = data['upload_id']
        bucket_name = data['bucket_name']
        bar = progressbar.ProgressBar(maxval=file_size, \
            widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
        json_object = dict()
        parts = []
        file_size_counter = 0
        with target_file.open('rb') as fin:
            bar.start()
            for num, url in enumerate(urls):
                part = num + 1
                file_data = fin.read(max_size)
                file_size_counter += len(file_data)
                res = requests.put(url, data=file_data)
    
                if res.status_code != 200:
                    print (res.status_code)
                    print ("Error while uploading your data.")
                    return None
                bar.update(file_size_counter)
                etag = res.headers['ETag']
                parts.append((etag, part))
            bar.finish()
            json_object['parts'] = [
                {"ETag": eval(x), 'PartNumber': int(y)} for x, y in parts]
            json_object['upload_id'] = upload_id
            json_object['key'] = key
            json_object['bucket_name'] = bucket_name
        requests.post('https://YOUR_HOSTED_API/combine, json={'parts': json_object})
        print ("Dataset is uploaded successfully")
    
    if __name__ == "__main__":
        main()    
    


    위의 코드는 파일을 로드하고 upload_id, URL 등을 포함하여 필요한 모든 정보를 가져옵니다. 파일을 업로드하는 동안 진행률을 표시하기 위해 Progressbar를 사용합니다. 전체 코드는 다음 코드 줄을 제외하고 거의 자체 설명이 필요합니다.

    requests.post('https://YOUR_HOSTED_API/combine, json={'parts': json_object})
    


    이 코드 조각을 이해하려면 업로드를 완료하는 마지막 단계를 살펴봐야 합니다.

    업로드 완료



    파일의 모든 부분을 업로드했지만 이러한 부분이 아직 결합되지 않았습니다. 이들을 결합하려면 s3에 업로드가 완료되었으며 이제 부품을 결합할 수 있음을 알려야 합니다. 위의 요청은 아래 표의 경로를 호출하고 s3 유틸리티 클래스를 사용하여 멀티파트 업로드를 완료합니다. 이것은 파일과 upload_id를 사용하여 업로드되는 동일한 파일의 일부에 대해 s3에 알려주는 upload_id에 대한 적절한 정보를 제공합니다.

    @app.route("/combine",methods=["POST"])
    def combine():
        body = request.form
        body = body['parts']    
        session = Session()
        s3util = Presigned(session)
        parts = body['parts']
        id, key, bucket_name = body['upload_id'], body['key'], body['bucket_name']
        PARTS = [{"Etag": eval(x), 'PartNumber': int(y)} for x, y in parts]
        s3util.complete(PARTS, id, key, bucket_name)
        return Response(status_code=200)
    


    이 코드는 CLI 도구를 생성하기 위한 최소한의 필수 코드입니다. S3와 상호 작용하기 위해 AWS에서 적절한 역할이 있는 서버에 배포하여 멀티파트 업로드를 완료하기 위해 미리 서명된 URL을 생성하고 반환할 수 있습니다. 이렇게 하면 아무도 S3 버킷에 직접 액세스하지 못하도록 할 수 있습니다. 대신 안전한 데이터 업로드 방법인 미리 서명된 URL을 사용하여 데이터를 업로드합니다.

    좋은 웹페이지 즐겨찾기