Amazon이 관리하는 Apache 워크플로우에 사용자 정의 연산자 추가

AWS의 Apache Airflow(MWAA) 소개


Amazon Managed Workflows for Apache Airflow(MWAA)는 AWS에서 데이터와 머신러닝 파이프라인을 조율, 관리, 제작할 수 있는 완전한 관리 서비스입니다.
현재 많은 고객들이 EKS, ECS 또는 EC2에서 아파치 에어플로우를 사용하여 파이프를 실행하고 있다. 이 파이프에서 그들은 아파치 에어플로우가 가지고 있는 모든 구성 요소, 예를 들어 스케줄러, 실행기, 웹 서버, 웹UI, 워커스, DB와 에이전트를 관리하는 데 많은 시간을 들여야 한다.또한 모든 보안, 인증, 권한 수여와 로그 기록 옵션을 설정해야 합니다.
Amazon Managed Workflows for Apache Airflow(MWAA)를 사용할 때 AWS는 실례, 저장, 소프트웨어 설치, IAM SSO 통합, 로그 기록(Cloudwatch), 직원 확장과 관련된 모든 구성 요소를 관리하여 사용자 정의 설정을 유연하게 추가하고 작업자, 연결, 센서와 플러그인을 설치할 수 있으며 어떠한 불편도 주지 않는다.

아파치 기류 MWAA에 Slack 플러그인 추가


데이터와 분석, 머신러닝 프로젝트의 증가에 따라 유연성이 더욱 중요해졌습니다. 이로 인해 MWAA에서 사용자 정의 플러그인을 사용하여 우리의 요구에 부합되는 다양한 유형의 작업을 만들 수 있습니다. 이제 플러그인(조작원, 연결, 센서)을 만드는 절차와 그 창설과 모니터링을 어떻게 검증하는지 볼 수 있습니다.
이 과정을 더 잘 이해하기 위해서, 슬랙에 알림 플러그인을 추가하는 방법을 보여 드리겠습니다.
주의: 이 블로그의 모든 코드는 에 있습니다.

내 GitHub 재구매 계약 1. 플러그인 디렉터리 구조


디렉터리 구조는 다음과 같은 형식을 사용해야 합니다. 이 예에서 센서를 사용하지 않았음을 감안하면.
__init__.py
    |-- slack_plugin.py 
hooks/
    |-- __init__.py
    |-- slack_webhook_hook.py
operators/
    |-- __init__.py
    |-- slack_webhook_operator.py

2. 내용


slack 플러그인.이 경우 다음을 사용하여py 파일을 만들어야 합니다.
from airflow.plugins_manager import AirflowPlugin
from hooks.slack_webhook_hook import SlackWebhookHook
from operators.slack_webhook_operator import SlackWebhookOperator

class slack_webhook_operator(SlackWebhookOperator):
  pass

class slack_webhook_hook(SlackWebhookHook):
  pass

class slack_plugin(AirflowPlugin):

    name = 'my_slack_plugin'       
    hooks = [slack_webhook_hook]
    operators = [slack_webhook_operator]
이 코드는 플러그인 이름, 갈고리, 조작부호 이름을 포함하는 클래스를 만들어야 한다는 것을 보여 줍니다. (이 경우 센서를 사용하지 않습니다.)또한 호출 사업자의 경로가 Apache Airflow 1.10.12에서 기본적으로 제공하는 사업자와 다르다는 것을 명심하십시오
PythonOperator 경로(기본 설치와 함께 제공):
from airflow.operators.python_operator import PythonOperator
SlackWebhookOperator 경로(사용자 지정 연산자):
from operators.slack_webhook_operator import SlackWebhookOperator

3. 플러그인 설치


플러그인을 설치하려면 프로젝트plugins 폴더에 있어야 합니다.
MacBook-Pro-73:mwaa czam$  cd plugins
MacBook-Pro-73:mwaa czam$  chmod -R 755 .
MacBook-Pro-73:mwaa czam$  zip -r plugins.zip .
플러그인을 압축하면zip 파일은 MWAA 구성에 지정된 S3 스토리지 통에 업로드해야 합니다.

플러그인을 열면zip 파일이 지정된 경로로 업로드되면 MWAA 환경으로 이동하여 새로 업로드된 파일을 선택해야 합니다.우리는 다음 단계를 클릭한 후에 저장합니다.이렇게 하면 환경이 업데이트 상태로 진입하고 사용 가능한 상태로 돌아가는 데 몇 분의 시간이 걸릴 것이다.

4. 설치 요구 사항


요구txt 파일은 설치해야 하는 패키지와 함께 로드되어야 합니다. 이 경우 파일은 다음과 같습니다.
apache-airflow[slack]
플러그인도 같은 절차를 수행해야 한다는 것을 주의하십시오.MWAA 요구 사항을 쉽게 파악할 수 있는 지퍼txt

5. 사용자 정의 플러그인 테스트


플러그인이 설치되어 있는지 확인하려면 DAG를 만들고 SlackWebhook Operator 작업을 수행해야 합니다.다음 코드는 SlackWebhookOperator가 있는 작업에 적용됩니다.
task1 = SlackWebhookOperator(
    task_id='task_1',
    http_conn_id='slack_connection',
    message='I am the task 1',
    channel='#airflowchannel',
    dag=dag
  )
이 설정을 완성하려면connections에 연결을 추가해야 합니다. 이 연결은 slack에서 만든 웹훅에 포함되어야 합니다.
연결 옵션으로 이동합니다.

다음과 같이 HTTP 연결을 추가해야 합니다.

Slack에서 웹훅을 만들려면 다음 절차를 따르십시오.
설정이 끝난 후,dag를 활성화하고, 빈 채널에서 알림이 도착했는지 확인합니다.

이것들 6. 당신의 플러그인을 감시합니다


MWAA 환경을 모니터링하기 위해 CloudWatch에서 로그 그룹을 볼 수 있습니다. 이 로그 그룹에서 스케줄러, 웹 서버, 작업, 각각의 DAG 상태를 볼 수 있습니다.

또한 Apache Airflow 콘솔에서 트리 보기에서 각 작업의 수행 횟수와 각각의 상태를 볼 수 있습니다.

경험과 모범 사례

  • 환경을 만들 때 모든 구성 요소에 로그 기록을 사용하면 전체 환경의 상태를 더 잘 알 수 있습니다.
  • 사용자 정의 작업을 수행하려면 사용자 정의 연산자를 만들고 플러그인에 추가하는 것이 좋습니다.
  • 공식 Apache Airflow 문서에서 사용할 수 있는 조작원, 연결기, 센서를 대량으로 찾을 수 있습니다.
  • AWS 서비스와 상호작용을 하려면 서로 다른 방식으로 Python Operator와boto3 라이브러리를 사용할 수 있고 AWS와의 연결을 만들고 사용자 정의 조작부호를 통해 서비스와 상호작용을 할 수 있다.
  • 는 데이터와 분석 프로젝트를 위한 파이프라인을 만들 수도 있고 운영자와 sage maker 등 서비스를 이용하여 연결할 수도 있다.
  • 이 글의 모든 내용을 찾을 수 있다Machine Learning.
    --

    좋은 웹페이지 즐겨찾기