AWS의 이벤트 구동 Python
도전 개요
이 도전의 창조자다.이 도전의 주요 목표는 파이썬과 클라우드 서비스로 2019 관상바이러스 질병 데이터를 자동화하는 ETL 처리 파이프라인을 사용하는 것이다.이 문제에 대한 자세한 내용은 here 를 참조하십시오.
도전 단계
이제 나는 내가 따르는 도전의 모든 절차를 완성하고 싶다.
이제 나는 내가 따르는 도전의 모든 절차를 완성하고 싶다.
단계 1 - 변환
이 도전의 수요 중 하나는 전환 작업을 위한 단독python 모듈을 만드는 것이다.그래서 나는 데이터 변환을 위해 단독python 코드를 만들었다.
우선, 나는 국가명에 따라 이 두 개의 데이터를 필터했다. 왜냐하면 여기에는 미국의 2019 관상바이러스 질병 데이터만 필요하기 때문이다.다음에python DateTime 데이터 형식에 불필요한 열을 삭제했습니다. (date가 존재하고 변환된 경우)마지막으로 보고 날짜에 따라 이 두 데이터를 연결하고 싶습니다. 이 점을 실현하기 위해 데이터 프레임 인덱스를 변경했습니다.이 함수는 데이터가 어디에 저장되는지도 모르고 데이터베이스도 모르기 때문에 완전히 추상적이다.
def transform(dfNYT,dfJH):
dfJH = dfJH[dfJH['Country/Region']=='US'].drop(columns='Country/Region')
dfJH.columns = ['date','recovered']
dfNYT['date'] = pd.to_datetime(dfNYT['date'],format='%Y-%m-%d')
dfJH['date'] = pd.to_datetime(dfJH['date'],format='%Y-%m-%d')
dfNYT.set_index('date', inplace=True)
dfJH.set_index('date',inplace=True)
dfJH['recovered'] = dfJH['recovered'].astype('int64')
dfFinal = dfNYT.join(dfJH, how='inner')
dfFinal.reset_index(inplace=True)
return dfFinal
단계 2 - 변환된 데이터를 데이터베이스에 로드합니다.
이 작업을 수행하기 위해 Lambda 함수와 RDS PostgreSQL 데이터베이스를 사용했습니다.PostgreSQL에는 python에 사용할 데이터베이스 어댑터가 있습니다. 이름은 psycopg2 입니다.PostgreSQL 데이터베이스에 연결하여 다양한 SQL 조회를 수행할 수 있습니다.
나는 두 CSV 파일의 URL, 데이터베이스 정보(단점, 포트, 사용자 이름, 비밀번호, 구역) 등 필요한 값을 저장하기 위해 Lambda 환경 변수를 사용합니다. 코드를 더욱 구조화하기 위해 이 작업을 하위 작업으로 나누어 함수를 만들었습니다.
etl
라는 표가 있는지 검사했습니다.만약 사용할 수 없다면, lambda 함수는 먼저 표를 만들고 첫 번째 데이터를 삽입합니다.etl
테이블을 사용할 수 있다면, 일일 데이터 삽입을 검사합니다.데이터 삽입을 최적화하기 위해 나는 몇 가지 방법을 시도했다. 예를 들어
executemany()
, mogrify()
과 많은 다른 방법이다.마지막으로 나는 다음과 같은 방법을 채택했다.for i in dfFinal.index:
row = (dfFinal.loc[i,'date'], int(dfFinal.loc[i,'cases']),int(dfFinal.loc[i,'deaths']),int(dfFinal.loc[i,'recovered']))
data.append(row)
records = ','.join(['%s'] * len(data))
query = "insert into etl (reportdate,cases,deaths,recovered) values{}".format(records)
3단계 - 데이터베이스가 업데이트될 때 고객에게 통지
이를 위해 AWS의 SNS 서비스를 사용했습니다.저는 lambda에서 함수를 만들었습니다. 데이터베이스에 업데이트가 있을 때 메시지를 SNS에 발표합니다.
def notify(text):
try:
sns = boto3.client('sns')
sns.publish(TopicArn = snsARN, Message = text)
except Exception as e:
print("Not able to send SMS due to {}".format(e))
exit(1)
단계 4 - 오류 처리
원활한 작업 흐름에 있어서 오류 처리가 매우 중요하다.나의 코드는 각종 오류를 관리할 수 있다.좋아하다
database_connection()
함수가 원인을 알리고 설명합니다.단계 5 - 터치 기능 매일 1회
이러한 CSV 파일은 매일 한 번씩 업데이트됩니다.따라서 데이터의 업데이트를 연속적으로 검사할 필요가 없다.이것이 바로 내가 매일 람다 함수를 호출하는 이벤트 규칙을 설정한 이유이다.너는 같은 강좌here를 찾을 수 있다.
단계 6 - 인프라 코드
나에게 있어서 이것은 가장 도전적인 부분 중의 하나이다. 왜냐하면 나는 구름 속에서 공을 칠 시간이 많지 않기 때문이다.나는 YAML에서 요소를 하나씩 만드는 것부터 시작한다.마지막으로, 나는 그렇게 많은 어려운 임무를 발견하지 못했다. 왜냐하면 AWS는 모든 유형의 AWS 서비스에 매우 목적성이 있고 묘사가 좋은 문서를 제공했기 때문이다.심지어 저는 최종적으로 CloudFormation을 통해 RDS 실례를 만들었습니다. 이것이 바로 제가 먼저 lambda 함수에서 표가 존재하는지 확인하는 이유입니다.전반적으로 말하자면, 이 부분은 나로 하여금 IaC에 대해 더 많은 것을 알게 했다.내 CloudFormation 템플릿here을 볼 수 있습니다.
단계 7-CI/CD 파이프라인 및 소스 코드 관리
이 부분은 선택할 수 있지만 저는 이렇게 하고 싶습니다. 왜냐하면python 코드나 기초 구조를 변경할 때마다 AWS에서 수동으로 업데이트하고 싶지 않기 때문입니다.소스 코드 제어는 매우 편리하기 때문에 GitHub를 사용합니다.CI/CD 파이핑을 구성하기 위해 GitHub 작업을 선택했습니다.이제 GitHub 저장소로 변경 사항을 전송할 때마다 GitHub 작업은 파이핑을 실행하고 필요한 업데이트를 수행합니다.
단계 8-Quicksight 대시보드
마지막 임무는 미국 환자 수, 회복률, 사망률, 일일 환자 증가 등을 가상화하는 것이다.
주요 과제 및 학습
전체 프로젝트에서 나는 많은 도전에 직면했고 심지어 그 속에서 교훈을 얻었다.
결론
전반적으로 말하면, 나는 이 도전을 좋아하고, 많은 것을 배웠다.이 여정에서 많은 사람들이 나를 도와주었다.나는 네가 이 놀라운 도전을 창조해 준 것에 감사할 것이다.이 도전이나 블로그에 대한 피드백에 정말 감사드립니다.정말 감사합니다.
GitHub 저장소
Reference
이 문제에 관하여(AWS의 이벤트 구동 Python), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/bansimendapara/cloudguruchallenge-september-60l
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
GitHub 저장소
Reference
이 문제에 관하여(AWS의 이벤트 구동 Python), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/bansimendapara/cloudguruchallenge-september-60l텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)