Airflow 플러그인 - 맞춤형 Airflow 플러그인을 작성한 방법
1.10.12
을 사용하고 있습니다. 향후 최신 버전 2.3.3
으로 업그레이드할 수 있습니다.어쨌든 우리는 즉석에서
create
, terminate
ec2
인스턴스에 대한 요구 사항이 있었습니다. 1.10.12
버전에서는 apache-airflow-backport-providers-amazon
을 설치해야 합니다.따라서
apache-airflow-backport-providers-amazon
은 ec2
을 지원하지만 EC2StartInstanceOperator
가 알려진 경우 EC2StopInstanceOperator
사용을 시작하고 instance_id
사용을 중지하는 것으로만 제한됩니다. 생성 및 종료 기능이 없습니다.So I decided to take some learnings from
ec2
operator and extend it with create and terminate functionality.
그럼 먼저 폴더 구조를 이해하고 코드를 살펴보겠습니다.
airflow-ec2-plugin-extended
├── __init__.py
├── ec2_extended_plugins.py
├── hooks
│ ├── __init__.py
│ └── ec2_instance_hooks.py
├── operators
│ ├── __init__.py
│ ├── ec2_create_instance.py
│ └── ec2_terminate_instance.py
├── requirements.txt
└── venv
TL;DR the plugin code is available here ->
airflow-ec2-plugin-extended
ec2_extended_plugins.py
ec2_extended_plugins.py
에는 EC2ExtendedPlugins
의 후크 EC2ExtendedHooks
및 연산자 EC2ExtendedCreateInstance
, EC2ExtendedTerminateInstance
에 대한 정의가 포함되어 있습니다. 기본적으로 ec2_extended_plugins.py
모두 함께 스티치(후크 및 오퍼레이터)ec2_instance_hooks.py
ec2_instance_hooks.py
에는 2개의 메서드가 포함된 클래스EC2ExtendedHooks
가 있습니다.create_instance
terminate_instance
create_instance
다음 입력 인수를 사용합니다.인수 이름
값 유형
기본
필수의
subnet_id
string
없음예
security_group_ids
List[str]
없음예
image_id
string
없음예
instance_type
string
없음예
region_name
string
없음예
key_name
string
없음예
tags
List[Dict[str, str]]
[{'ResourceType': 'instance','Tags': tags}]
아니iam_instance_profile
string
없음아니
user_data
string
없음아니
min_count
int
1
아니max_count
int
1
아니그리고
Instance Object
를 반환합니다.terminate_instance
다음 입력 인수를 사용합니다.인수 이름
값 유형
기본
필수의
instance_id
string
없음예
region_name
string
없음예
그리고 아무것도 반환하지 않습니다.
create_instance
및 terminate_instance
모두 기본 기능에 대해 EC2ExtendedCreateInstance
을 상속하는 연산자 클래스 EC2ExtendedTerminateInstance
및 BaseOperator
에 의해 구동됩니다.사용하는 방법
airflow-ec2-plugin-extended
플러그인이 설치되고 dag
가 활성화되면 기류 그래프 보기에 다음과 같은 내용이 표시됩니다.인스턴스 dag 코드 스니펫 생성
from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
create_ec2 = EC2ExtendedCreateInstance(
subnet_id=bridge_subnet,
security_group_ids=bridge_security_group_ids,
image_id=bridge_image_id,
instance_type='t2.medium',
key_name='searchops-pipeline-dev',
tags=[{"Key": "name", "Value": "AutoDeployed via MWAA Pipeline"}],
aws_conn_id='aws_default',
region_name='us-east-1',
task_id='create_ec2',
)
create_ec2 >> terminate_ec2
연산자의 위 코드 뱅크 조각은 제공된 인수를 사용하여
ec2
인스턴스를 생성하고 결과를 XCom - create_ec2
에 저장합니다.XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.
인스턴스 dag 코드 스니펫 종료
from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
terminate_ec2 = EC2ExtendedTerminateInstance(
instance_id="{{ task_instance.xcom_pull('create_ec2', dag_id=DAG_ID, key='return_value')[0] }}",
region_name='us-east-1',
task_id='terminate_ec2',
)
)
create_ec2 >> terminate_ec2
terminate
동일한 인스턴스로 XCom - instance_id
에서 값(기본적으로 create_ec2
)을 가져와 instance_id
인수의 값으로 전달합니다.You may ask why ?
[0]
while fetching the values from XCom -create_ec2
. So, whenEC2ExtendedCreateInstance
stores the value in XCom -create_ec2
, it store them asList[str]
in order ofinstance_id
andprivate_ip_address
So, we are fetching first element of theList[str]
as we need theinstance_id
toterminate
the instance.
여기에 동일한 항목example dag이 있습니다.
필요한 경우 사용자 지정Apache Airflow 플러그인을 작성하는 방법을 이해하는 데 도움이 되기를 바랍니다.
행복한 코딩!!
Reference
이 문제에 관하여(Airflow 플러그인 - 맞춤형 Airflow 플러그인을 작성한 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/deadlock/airflow-plugin-how-i-wrote-custom-airflow-plugins-3pga텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)