AWS의 이벤트 구동 Python

약 15일 전, 한 클라우드 전문가가 AWS에 #CloudGuruChallenge - 이벤트 구동 Python을 발표했다.이것은 듣기에는 매우 간단할 것 같지만, 나는 약간의 도전에 직면하여 많은 것을 배웠다.이 블로그는 이 도전에 대처하도록 전면적으로 안내할 것입니다.

도전 개요


이 도전의 창조자다.이 도전의 주요 목표는 파이썬과 클라우드 서비스로 2019 관상바이러스 질병 데이터를 자동화하는 ETL 처리 파이프라인을 사용하는 것이다.이 문제에 대한 자세한 내용은 here 를 참조하십시오.

도전 단계


이제 나는 내가 따르는 도전의 모든 절차를 완성하고 싶다.
  • 변환
  • 변환된 데이터를 데이터베이스에 로드
  • 데이터베이스가 업데이트될 때 고객에게 통지
  • 오류 처리
  • 터치 기능 매일 1회
  • 인프라 코드
  • CI/CD 파이핑 및 소스 제어
  • Quicksight 대시보드
  • myGitHub repository에서 전체 작업 코드를 찾을 수 있습니다.

    단계 1 - 변환


    이 도전의 수요 중 하나는 전환 작업을 위한 단독python 모듈을 만드는 것이다.그래서 나는 데이터 변환을 위해 단독python 코드를 만들었다.
    우선, 나는 국가명에 따라 이 두 개의 데이터를 필터했다. 왜냐하면 여기에는 미국의 2019 관상바이러스 질병 데이터만 필요하기 때문이다.다음에python DateTime 데이터 형식에 불필요한 열을 삭제했습니다. (date가 존재하고 변환된 경우)마지막으로 보고 날짜에 따라 이 두 데이터를 연결하고 싶습니다. 이 점을 실현하기 위해 데이터 프레임 인덱스를 변경했습니다.이 함수는 데이터가 어디에 저장되는지도 모르고 데이터베이스도 모르기 때문에 완전히 추상적이다.
    def transform(dfNYT,dfJH):
        dfJH = dfJH[dfJH['Country/Region']=='US'].drop(columns='Country/Region')
        dfJH.columns = ['date','recovered']
        dfNYT['date'] = pd.to_datetime(dfNYT['date'],format='%Y-%m-%d')
        dfJH['date'] = pd.to_datetime(dfJH['date'],format='%Y-%m-%d')
        dfNYT.set_index('date', inplace=True)
        dfJH.set_index('date',inplace=True)
        dfJH['recovered'] = dfJH['recovered'].astype('int64')
        dfFinal = dfNYT.join(dfJH, how='inner')
        dfFinal.reset_index(inplace=True)
        return dfFinal
    

    단계 2 - 변환된 데이터를 데이터베이스에 로드합니다.


    이 작업을 수행하기 위해 Lambda 함수와 RDS PostgreSQL 데이터베이스를 사용했습니다.PostgreSQL에는 python에 사용할 데이터베이스 어댑터가 있습니다. 이름은 psycopg2 입니다.PostgreSQL 데이터베이스에 연결하여 다양한 SQL 조회를 수행할 수 있습니다.
    나는 두 CSV 파일의 URL, 데이터베이스 정보(단점, 포트, 사용자 이름, 비밀번호, 구역) 등 필요한 값을 저장하기 위해 Lambda 환경 변수를 사용합니다. 코드를 더욱 구조화하기 위해 이 작업을 하위 작업으로 나누어 함수를 만들었습니다.
  • 데이터베이스 연결 - 데이터베이스에 연결
  • 첫 번째 데이터 삽입 - 한 번에 모든 데이터 삽입
  • 일일 데이터 삽입 - 사용 가능한 새 데이터만 삽입
  • AWS Management Console과의 수동 상호 작용을 최소화하고 싶습니다.그래서 우선 데이터베이스에서 etl 라는 표가 있는지 검사했습니다.만약 사용할 수 없다면, lambda 함수는 먼저 표를 만들고 첫 번째 데이터를 삽입합니다.etl 테이블을 사용할 수 있다면, 일일 데이터 삽입을 검사합니다.
    데이터 삽입을 최적화하기 위해 나는 몇 가지 방법을 시도했다. 예를 들어 executemany(), mogrify()과 많은 다른 방법이다.마지막으로 나는 다음과 같은 방법을 채택했다.
    for i in dfFinal.index:
        row = (dfFinal.loc[i,'date'], int(dfFinal.loc[i,'cases']),int(dfFinal.loc[i,'deaths']),int(dfFinal.loc[i,'recovered']))
        data.append(row)
    records = ','.join(['%s'] * len(data))
    query = "insert into etl (reportdate,cases,deaths,recovered) values{}".format(records)
    

    3단계 - 데이터베이스가 업데이트될 때 고객에게 통지


    이를 위해 AWS의 SNS 서비스를 사용했습니다.저는 lambda에서 함수를 만들었습니다. 데이터베이스에 업데이트가 있을 때 메시지를 SNS에 발표합니다.
    def notify(text):
        try:
            sns = boto3.client('sns')
            sns.publish(TopicArn = snsARN, Message = text)
        except Exception as e:
            print("Not able to send SMS due to {}".format(e))
            exit(1)
    

    단계 4 - 오류 처리


    원활한 작업 흐름에 있어서 오류 처리가 매우 중요하다.나의 코드는 각종 오류를 관리할 수 있다.좋아하다
  • 데이터에 의외의 또는 고장 입력이 있으면 변환 함수에 이상이 발생하고 통지합니다.
  • 데이터베이스에 연결할 때 문제가 발생하면 database_connection() 함수가 원인을 알리고 설명합니다.
  • 테이블을 만들거나 데이터를 삽입할 때 문제가 발생하면 이상한 알림을 보냅니다.
  • 만약 우리가 매일 삽입을 진행한다면, 전체 데이터를 다시 삽입할 필요가 없다.새 데이터만 삽입하는 것이 좋습니다.이를 위해, 나는 표에서 가장 긴 (마지막) 보고 날짜를 검사하고, 후속 데이터를 삽입했다.그것은 또한 고객에게 업데이트된 줄 수를 통지할 것이다.

    단계 5 - 터치 기능 매일 1회


    이러한 CSV 파일은 매일 한 번씩 업데이트됩니다.따라서 데이터의 업데이트를 연속적으로 검사할 필요가 없다.이것이 바로 내가 매일 람다 함수를 호출하는 이벤트 규칙을 설정한 이유이다.너는 같은 강좌here를 찾을 수 있다.

    단계 6 - 인프라 코드


    나에게 있어서 이것은 가장 도전적인 부분 중의 하나이다. 왜냐하면 나는 구름 속에서 공을 칠 시간이 많지 않기 때문이다.나는 YAML에서 요소를 하나씩 만드는 것부터 시작한다.마지막으로, 나는 그렇게 많은 어려운 임무를 발견하지 못했다. 왜냐하면 AWS는 모든 유형의 AWS 서비스에 매우 목적성이 있고 묘사가 좋은 문서를 제공했기 때문이다.심지어 저는 최종적으로 CloudFormation을 통해 RDS 실례를 만들었습니다. 이것이 바로 제가 먼저 lambda 함수에서 표가 존재하는지 확인하는 이유입니다.전반적으로 말하자면, 이 부분은 나로 하여금 IaC에 대해 더 많은 것을 알게 했다.내 CloudFormation 템플릿here을 볼 수 있습니다.

    단계 7-CI/CD 파이프라인 및 소스 코드 관리


    이 부분은 선택할 수 있지만 저는 이렇게 하고 싶습니다. 왜냐하면python 코드나 기초 구조를 변경할 때마다 AWS에서 수동으로 업데이트하고 싶지 않기 때문입니다.소스 코드 제어는 매우 편리하기 때문에 GitHub를 사용합니다.CI/CD 파이핑을 구성하기 위해 GitHub 작업을 선택했습니다.이제 GitHub 저장소로 변경 사항을 전송할 때마다 GitHub 작업은 파이핑을 실행하고 필요한 업데이트를 수행합니다.

    단계 8-Quicksight 대시보드


    마지막 임무는 미국 환자 수, 회복률, 사망률, 일일 환자 증가 등을 가상화하는 것이다.




    주요 과제 및 학습


    전체 프로젝트에서 나는 많은 도전에 직면했고 심지어 그 속에서 교훈을 얻었다.
  • 전환과 데이터베이스 연결을 위해python의pandas와psycopg2 모듈을 사용했습니다.이 모듈은lambda에서 사용할 수 없습니다.lambda에 층을 추가해야 합니다.처음에 나는 문제에 부딪혔지만 마지막에 정성들여 기획한 AWS Lambda 층 목록here을 발견했다. 이것은 나의 임무를 간단하게 만들었다.
  • 코드를 통해 표를 만들고 싶어서 lambda에서 이 점을 실현할 수 없습니다.하지만 몇 가지 일을 시도한 후에 나는 이것을 얻었다.
  • 나는 구름 형성에 대해 많은 경험이 없다.그래서 저는 우선 다양한 AWS 문서를 읽어서 공부를 하고 최종적으로 완성할 수 있습니다.
  • CI/CD 파이프는 나에게 있어서 새로운 것이지만, 나는 그것이 정말 재미있다는 것을 발견했다.GitHub 작업으로 파이핑을 구성했지만 AWS 코드 파이핑을 탐색하고 싶습니다.
  • 결론


    전반적으로 말하면, 나는 이 도전을 좋아하고, 많은 것을 배웠다.이 여정에서 많은 사람들이 나를 도와주었다.나는 네가 이 놀라운 도전을 창조해 준 것에 감사할 것이다.이 도전이나 블로그에 대한 피드백에 정말 감사드립니다.정말 감사합니다.

    GitHub 저장소

    좋은 웹페이지 즐겨찾기