Amazon이 관리하는 Apache 워크플로우에 사용자 정의 연산자 추가
AWS의 Apache Airflow(MWAA) 소개
Amazon Managed Workflows for Apache Airflow(MWAA)는 AWS에서 데이터와 머신러닝 파이프라인을 조율, 관리, 제작할 수 있는 완전한 관리 서비스입니다.
현재 많은 고객들이 EKS, ECS 또는 EC2에서 아파치 에어플로우를 사용하여 파이프를 실행하고 있다. 이 파이프에서 그들은 아파치 에어플로우가 가지고 있는 모든 구성 요소, 예를 들어 스케줄러, 실행기, 웹 서버, 웹UI, 워커스, DB와 에이전트를 관리하는 데 많은 시간을 들여야 한다.또한 모든 보안, 인증, 권한 수여와 로그 기록 옵션을 설정해야 합니다.
Amazon Managed Workflows for Apache Airflow(MWAA)를 사용할 때 AWS는 실례, 저장, 소프트웨어 설치, IAM SSO 통합, 로그 기록(Cloudwatch), 직원 확장과 관련된 모든 구성 요소를 관리하여 사용자 정의 설정을 유연하게 추가하고 작업자, 연결, 센서와 플러그인을 설치할 수 있으며 어떠한 불편도 주지 않는다.
아파치 기류
MWAA에 Slack 플러그인 추가
데이터와 분석, 머신러닝 프로젝트의 증가에 따라 유연성이 더욱 중요해졌습니다. 이로 인해 MWAA에서 사용자 정의 플러그인을 사용하여 우리의 요구에 부합되는 다양한 유형의 작업을 만들 수 있습니다. 이제 플러그인(조작원, 연결, 센서)을 만드는 절차와 그 창설과 모니터링을 어떻게 검증하는지 볼 수 있습니다.
이 과정을 더 잘 이해하기 위해서, 슬랙에 알림 플러그인을 추가하는 방법을 보여 드리겠습니다.
주의: 이 블로그의 모든 코드는
에 있습니다.
내 GitHub 재구매 계약
1. 플러그인 디렉터리 구조
디렉터리 구조는 다음과 같은 형식을 사용해야 합니다. 이 예에서 센서를 사용하지 않았음을 감안하면.
__init__.py
|-- slack_plugin.py
hooks/
|-- __init__.py
|-- slack_webhook_hook.py
operators/
|-- __init__.py
|-- slack_webhook_operator.py
2. 내용
slack 플러그인.이 경우 다음을 사용하여py 파일을 만들어야 합니다.
from airflow.plugins_manager import AirflowPlugin
from hooks.slack_webhook_hook import SlackWebhookHook
from operators.slack_webhook_operator import SlackWebhookOperator
class slack_webhook_operator(SlackWebhookOperator):
pass
class slack_webhook_hook(SlackWebhookHook):
pass
class slack_plugin(AirflowPlugin):
name = 'my_slack_plugin'
hooks = [slack_webhook_hook]
operators = [slack_webhook_operator]
이 코드는 플러그인 이름, 갈고리, 조작부호 이름을 포함하는 클래스를 만들어야 한다는 것을 보여 줍니다. (이 경우 센서를 사용하지 않습니다.)또한 호출 사업자의 경로가 Apache Airflow 1.10.12에서 기본적으로 제공하는 사업자와 다르다는 것을 명심하십시오
PythonOperator 경로(기본 설치와 함께 제공):
from airflow.operators.python_operator import PythonOperator
SlackWebhookOperator 경로(사용자 지정 연산자):
from operators.slack_webhook_operator import SlackWebhookOperator
3. 플러그인 설치
플러그인을 설치하려면 프로젝트plugins 폴더에 있어야 합니다.
MacBook-Pro-73:mwaa czam$ cd plugins
MacBook-Pro-73:mwaa czam$ chmod -R 755 .
MacBook-Pro-73:mwaa czam$ zip -r plugins.zip .
플러그인을 압축하면zip 파일은 MWAA 구성에 지정된 S3 스토리지 통에 업로드해야 합니다.
플러그인을 열면zip 파일이 지정된 경로로 업로드되면 MWAA 환경으로 이동하여 새로 업로드된 파일을 선택해야 합니다.우리는 다음 단계를 클릭한 후에 저장합니다.이렇게 하면 환경이 업데이트 상태로 진입하고 사용 가능한 상태로 돌아가는 데 몇 분의 시간이 걸릴 것이다.
4. 설치 요구 사항
요구txt 파일은 설치해야 하는 패키지와 함께 로드되어야 합니다. 이 경우 파일은 다음과 같습니다.
apache-airflow[slack]
플러그인도 같은 절차를 수행해야 한다는 것을 주의하십시오.MWAA 요구 사항을 쉽게 파악할 수 있는 지퍼txt
5. 사용자 정의 플러그인 테스트
플러그인이 설치되어 있는지 확인하려면 DAG를 만들고 SlackWebhook Operator 작업을 수행해야 합니다.다음 코드는 SlackWebhookOperator가 있는 작업에 적용됩니다.
task1 = SlackWebhookOperator(
task_id='task_1',
http_conn_id='slack_connection',
message='I am the task 1',
channel='#airflowchannel',
dag=dag
)
이 설정을 완성하려면connections에 연결을 추가해야 합니다. 이 연결은 slack에서 만든 웹훅에 포함되어야 합니다.
연결 옵션으로 이동합니다.
다음과 같이 HTTP 연결을 추가해야 합니다.
Slack에서 웹훅을 만들려면 다음 절차를 따르십시오.
설정이 끝난 후,dag를 활성화하고, 빈 채널에서 알림이 도착했는지 확인합니다.
이것들
6. 당신의 플러그인을 감시합니다
MWAA 환경을 모니터링하기 위해 CloudWatch에서 로그 그룹을 볼 수 있습니다. 이 로그 그룹에서 스케줄러, 웹 서버, 작업, 각각의 DAG 상태를 볼 수 있습니다.
또한 Apache Airflow 콘솔에서 트리 보기에서 각 작업의 수행 횟수와 각각의 상태를 볼 수 있습니다.
경험과 모범 사례
데이터와 분석, 머신러닝 프로젝트의 증가에 따라 유연성이 더욱 중요해졌습니다. 이로 인해 MWAA에서 사용자 정의 플러그인을 사용하여 우리의 요구에 부합되는 다양한 유형의 작업을 만들 수 있습니다. 이제 플러그인(조작원, 연결, 센서)을 만드는 절차와 그 창설과 모니터링을 어떻게 검증하는지 볼 수 있습니다.
이 과정을 더 잘 이해하기 위해서, 슬랙에 알림 플러그인을 추가하는 방법을 보여 드리겠습니다.
주의: 이 블로그의 모든 코드는 에 있습니다.
내 GitHub 재구매 계약 1. 플러그인 디렉터리 구조
디렉터리 구조는 다음과 같은 형식을 사용해야 합니다. 이 예에서 센서를 사용하지 않았음을 감안하면.
__init__.py
|-- slack_plugin.py
hooks/
|-- __init__.py
|-- slack_webhook_hook.py
operators/
|-- __init__.py
|-- slack_webhook_operator.py
2. 내용
slack 플러그인.이 경우 다음을 사용하여py 파일을 만들어야 합니다.
from airflow.plugins_manager import AirflowPlugin
from hooks.slack_webhook_hook import SlackWebhookHook
from operators.slack_webhook_operator import SlackWebhookOperator
class slack_webhook_operator(SlackWebhookOperator):
pass
class slack_webhook_hook(SlackWebhookHook):
pass
class slack_plugin(AirflowPlugin):
name = 'my_slack_plugin'
hooks = [slack_webhook_hook]
operators = [slack_webhook_operator]
이 코드는 플러그인 이름, 갈고리, 조작부호 이름을 포함하는 클래스를 만들어야 한다는 것을 보여 줍니다. (이 경우 센서를 사용하지 않습니다.)또한 호출 사업자의 경로가 Apache Airflow 1.10.12에서 기본적으로 제공하는 사업자와 다르다는 것을 명심하십시오PythonOperator 경로(기본 설치와 함께 제공):
from airflow.operators.python_operator import PythonOperator
SlackWebhookOperator 경로(사용자 지정 연산자):from operators.slack_webhook_operator import SlackWebhookOperator
3. 플러그인 설치
플러그인을 설치하려면 프로젝트plugins 폴더에 있어야 합니다.
MacBook-Pro-73:mwaa czam$ cd plugins
MacBook-Pro-73:mwaa czam$ chmod -R 755 .
MacBook-Pro-73:mwaa czam$ zip -r plugins.zip .
플러그인을 압축하면zip 파일은 MWAA 구성에 지정된 S3 스토리지 통에 업로드해야 합니다.플러그인을 열면zip 파일이 지정된 경로로 업로드되면 MWAA 환경으로 이동하여 새로 업로드된 파일을 선택해야 합니다.우리는 다음 단계를 클릭한 후에 저장합니다.이렇게 하면 환경이 업데이트 상태로 진입하고 사용 가능한 상태로 돌아가는 데 몇 분의 시간이 걸릴 것이다.
4. 설치 요구 사항
요구txt 파일은 설치해야 하는 패키지와 함께 로드되어야 합니다. 이 경우 파일은 다음과 같습니다.
apache-airflow[slack]
플러그인도 같은 절차를 수행해야 한다는 것을 주의하십시오.MWAA 요구 사항을 쉽게 파악할 수 있는 지퍼txt5. 사용자 정의 플러그인 테스트
플러그인이 설치되어 있는지 확인하려면 DAG를 만들고 SlackWebhook Operator 작업을 수행해야 합니다.다음 코드는 SlackWebhookOperator가 있는 작업에 적용됩니다.
task1 = SlackWebhookOperator(
task_id='task_1',
http_conn_id='slack_connection',
message='I am the task 1',
channel='#airflowchannel',
dag=dag
)
이 설정을 완성하려면connections에 연결을 추가해야 합니다. 이 연결은 slack에서 만든 웹훅에 포함되어야 합니다.연결 옵션으로 이동합니다.
다음과 같이 HTTP 연결을 추가해야 합니다.
Slack에서 웹훅을 만들려면 다음 절차를 따르십시오.
설정이 끝난 후,dag를 활성화하고, 빈 채널에서 알림이 도착했는지 확인합니다.
이것들 6. 당신의 플러그인을 감시합니다
MWAA 환경을 모니터링하기 위해 CloudWatch에서 로그 그룹을 볼 수 있습니다. 이 로그 그룹에서 스케줄러, 웹 서버, 작업, 각각의 DAG 상태를 볼 수 있습니다.
또한 Apache Airflow 콘솔에서 트리 보기에서 각 작업의 수행 횟수와 각각의 상태를 볼 수 있습니다.
경험과 모범 사례
--
Reference
이 문제에 관하여(Amazon이 관리하는 Apache 워크플로우에 사용자 정의 연산자 추가), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/aws-builders/adding-custom-operators-on-amazon-managed-workflows-for-apache-airflow-1cg텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)