AWS Lambda 함수를 사용하여 S3에서 Redshift로 CSV 파일을 보내는 방법

5327 단어

소개



요즘은 모든 것을 자동화해야 하며 클라우드 작업도 예외가 아닙니다. 이 게시물은 당신을 위한 것입니다.

단계별로



데이터를 수집한 후 다음 단계는 Amazon Redshift와 같은 분석 플랫폼으로 데이터를 이동하기 전에 데이터를 추출, 변환 및 로드하기 위해 ETL을 설계하는 것입니다. AWS 프리 티어에 사용되는 Redshift 클러스터.

이를 위해 연구 사례를 다음과 같이 접근해 보았습니다.
  • S3 버킷을 생성합니다.
  • Redshift 클러스터를 생성합니다.
  • DBeaver 또는 원하는 항목에서 Redshift에 연결합니다.
  • 데이터베이스에 테이블을 만듭니다.
  • 종속성이 필요한 Python에서 가상 환경을 만듭니다.
  • Lambda 함수를 생성합니다.
  • 누군가 S3에 데이터를 업로드합니다.
  • 데이터를 쿼리합니다.



  • ¡¡Let’s get started !!



    나중에 1단계와 2단계를 마쳤으면 SQL 클라이언트DBeaver 또는 원하는 것을 사용하여 데이터베이스에 연결하겠습니다. 이를 위해 Redshift 클러스터 구성에서 다음 데이터를 기억해야 합니다.

    HOST = "xyz.redshift.amazonaws.com"
    PORT = "5439"
    DATABASE = "mydatabase"
    USERNAME = "myadmin"
    PASSWORD = "XYZ"
    TABLE = "mytable"
    





    이제 데이터베이스에 연결할 때 새 테이블을 생성하겠습니다.

    CREATE TABLE mytable (
    id      INT4 distkey sortkey,
    col 1     VARCHAR (30) NOT NULL,
    col 2         VARCHAR(100) NOT NULL,
    col 3 VARCHAR(100) NOT NULL,
    col 4        INTEGER NOT NULL,
    col 5  INTEGER NOT NULL,
    col 6           INTEGER NOT NULL);
    


    이 자습서의 경우 Lambda 함수에는 Sqalchemy , Psycopg2 과 같은 일부 Python 라이브러리가 필요하므로 업로드할 .zip 파일을 압축하기 전에 이러한 종속성과 Lambda 스크립트를 사용하여 Python에서 가상 환경을 생성해야 합니다. AWS로.

    이 시점에서 Lambda 함수를 AWS에 구성하는 데 필요한 것은 Python 스크립트뿐이며 누군가 S3 버킷에 새 객체를 업로드할 때마다 Lambda를 트리거합니다. 다음 리소스를 구성해야 합니다.
  • lambda_function.zip(Python 스크립트 및 종속성 또는 aws 사용자 지정 레이어를 추가할 수 있음)을 업로드하고 아래의 코드 예제를 사용하여 데이터를 redshiftlambda_function.py로 보냅니다.
  • AWSLambdaVPCAccesExcecutionRole에 대한 액세스 권한을 부여하는 Lambda 함수에 IAM 역할을 연결합니다.
  • 이 경우 Lambda 함수 또는 가지고 있는 다른 함수에 VPC 기본값을 추가해야 합니다.
  • environment variables "CON"및 "Table"추가

  • CON = "postgresql://USERNAME:[email protected]:5439/DATABASE"
    Table = "mytable"
    


  • 누군가 S3 버킷에 객체를 업로드할 때마다 Lambda 함수를 호출하는 S3 Event Notification을 생성합니다.
  • 시간 초과를 ≥ 3분으로 구성할 수 있습니다.

  • 여기 코드로 가자 👇

    import sqlalchemy 
    import psycopg2
    from sqlalchemy import create_engine 
    from sqlalchemy.orm import scoped_session, sessionmaker
    from datetime import datetime,timedelta
    import os
    
    def handler(event, context):
       for record in event['Records']:
    
          S3_BUCKET = record['s3']['bucket']['name']
          S3_OBJECT = record['s3']['object']['key']
    
    
        # Arguments
        DBC= os.environ["CON"]
        RS_TABLE = os.environ["Table"]
        RS_PORT = "5439"
        DELIMITER = "','"
        REGION = "'us-east-1' "
        # Connection
        engine = create_engine(DBC)
        db = scoped_session(sessionmaker(bind=engine))
        # Send files from S3 into redshift
        copy_query = "COPY "+RS_TABLE+" from 's3://"+   S3_BUCKET+'/'+S3_OBJECT+"' iam_role 'arn:aws:iam::11111111111:role/youroleredshift' delimiter "+DELIMITER+" IGNOREHEADER 1 REGION " + REGION
        # Execute querie
        db.execute(copy_query)
        db.commit()
        db.close()
    


    CSV 파일을 S3 버킷에 업로드할 준비가 되기 전에 먼저 테이블을 생성했으므로 람다 함수를 구현하고 올바르게 구성한 후에 데이터를 S3에 업로드하고 DBeaver로 이동할 수 있습니다. 테이블의 데이터를 쿼리합니다.



    요약



    AWS Lambda는 프로세스를 자동화하는 쉬운 방법이지만 어떤 순간에 사용할 수 없는지 이해해야 합니다. 예를 들어 AWS Lambda에는 6MB 페이로드 제한이 있으므로 이러한 방식으로 매우 큰 테이블을 마이그레이션하는 것은 실용적이지 않습니다.
    한편, 이 서비스를 사용하는 가장 큰 장점은 전체 솔루션 서버리스라는 것입니다!! , 따라서 EC2 인스턴스를 관리할 필요가 없습니다.


    여기까지 읽어주셔서 감사합니다. 이 기사가 유용하다고 생각되면 이 기사를 좋아하고 공유하십시오. 누군가는 그것이 유용하다고 생각할 수도 있고 저를 커피에 초대하지 않겠습니까?




    팔로우 👉
    팔로우 👉
    연락처: [email protected]

    좋은 웹페이지 즐겨찾기