Airflow 플러그인 - 맞춤형 Airflow 플러그인을 작성한 방법

꽤 오랜 시간 동안 Apache Airflow 을 사용해 왔으며 일부 레거시 이유로 Version: 1.10.12 을 사용하고 있습니다. 향후 최신 버전 2.3.3 으로 업그레이드할 수 있습니다.



어쨌든 우리는 즉석에서 create , terminate ec2 인스턴스에 대한 요구 사항이 있었습니다. 1.10.12 버전에서는 apache-airflow-backport-providers-amazon 을 설치해야 합니다.

따라서 apache-airflow-backport-providers-amazon ec2 을 지원하지만 EC2StartInstanceOperator가 알려진 경우 EC2StopInstanceOperator 사용을 시작하고 instance_id 사용을 중지하는 것으로만 제한됩니다. 생성 및 종료 기능이 없습니다.

So I decided to take some learnings from ec2 operator and extend it with create and terminate functionality.



그럼 먼저 폴더 구조를 이해하고 코드를 살펴보겠습니다.




airflow-ec2-plugin-extended
      ├── __init__.py
      ├── ec2_extended_plugins.py
      ├── hooks
      │   ├── __init__.py
      │   └── ec2_instance_hooks.py
      ├── operators
      │   ├── __init__.py
      │   ├── ec2_create_instance.py
      │   └── ec2_terminate_instance.py
      ├── requirements.txt
      └── venv


TL;DR the plugin code is available here -> airflow-ec2-plugin-extended




ec2_extended_plugins.py



ec2_extended_plugins.py 에는 EC2ExtendedPlugins의 후크 EC2ExtendedHooks 및 연산자 EC2ExtendedCreateInstance , EC2ExtendedTerminateInstance 에 대한 정의가 포함되어 있습니다. 기본적으로 ec2_extended_plugins.py 모두 함께 스티치(후크 및 오퍼레이터)


ec2_instance_hooks.py



ec2_instance_hooks.py 에는 2개의 메서드가 포함된 클래스EC2ExtendedHooks가 있습니다.
  • create_instance
  • terminate_instance



  • create_instance 다음 입력 인수를 사용합니다.


    인수 이름
    값 유형
    기본
    필수의

    subnet_idstring없음

    security_group_idsList[str]없음

    image_idstring없음

    instance_typestring없음

    region_namestring없음

    key_namestring없음

    tagsList[Dict[str, str]][{'ResourceType': 'instance','Tags': tags}]아니
    iam_instance_profilestring없음
    아니
    user_datastring없음
    아니
    min_countint1아니
    max_countint1아니


    그리고 Instance Object를 반환합니다.


    terminate_instance 다음 입력 인수를 사용합니다.


    인수 이름
    값 유형
    기본
    필수의

    instance_idstring없음

    region_namestring없음



    그리고 아무것도 반환하지 않습니다. create_instance terminate_instance 모두 기본 기능에 대해 EC2ExtendedCreateInstance 을 상속하는 연산자 클래스 EC2ExtendedTerminateInstance BaseOperator 에 의해 구동됩니다.


    사용하는 방법



    airflow-ec2-plugin-extended 플러그인이 설치되고 dag가 활성화되면 기류 그래프 보기에 다음과 같은 내용이 표시됩니다.



    인스턴스 dag 코드 스니펫 생성




    from operators.ec2_create_instance import EC2ExtendedCreateInstance
    from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
    ....
    ....
    ....
    create_ec2 = EC2ExtendedCreateInstance(
        subnet_id=bridge_subnet,
        security_group_ids=bridge_security_group_ids,
        image_id=bridge_image_id,
        instance_type='t2.medium',
        key_name='searchops-pipeline-dev',
        tags=[{"Key": "name", "Value": "AutoDeployed via MWAA Pipeline"}],
        aws_conn_id='aws_default',
        region_name='us-east-1',
        task_id='create_ec2',
    )
    
    create_ec2 >> terminate_ec2
    


    연산자의 위 코드 뱅크 조각은 제공된 인수를 사용하여 ec2 인스턴스를 생성하고 결과를 XCom - create_ec2 에 저장합니다.

    XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.






    인스턴스 dag 코드 스니펫 종료




    from operators.ec2_create_instance import EC2ExtendedCreateInstance
    from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
    ....
    ....
    ....
    terminate_ec2 = EC2ExtendedTerminateInstance(
        instance_id="{{ task_instance.xcom_pull('create_ec2', dag_id=DAG_ID, key='return_value')[0] }}",
        region_name='us-east-1',
        task_id='terminate_ec2',
     )
    )
    
    create_ec2 >> terminate_ec2
    

    terminate 동일한 인스턴스로 XCom - instance_id 에서 값(기본적으로 create_ec2)을 가져와 instance_id 인수의 값으로 전달합니다.



    You may ask why ? [0] while fetching the values from XCom - create_ec2. So, when EC2ExtendedCreateInstance stores the value in XCom - create_ec2, it store them as List[str] in order of instance_id and private_ip_address
    So, we are fetching first element of the List[str] as we need the instance_id to terminate the instance.





    여기에 동일한 항목example dag이 있습니다.

    필요한 경우 사용자 지정Apache Airflow 플러그인을 작성하는 방법을 이해하는 데 도움이 되기를 바랍니다.

    행복한 코딩!!

    좋은 웹페이지 즐겨찾기