Airflow의 Kubernetes Pod Operator를 사용해 보세요.

48816 단어 Pythontech

Kubbernetes PodOperator란 무엇입니까?

  • Kubernetes Cluster에서 Pod를 제작해 실행하는 에어플로우의 Operator.

  • Kubernetes Executor와는 다르다.
  • 어떻게 움직여요?

  • Operator 내에서 kubernetes-client를 사용하여 동작한다.
  • Operator 내부https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py)
  • Cluster의 설정 사용airflow.cfgconfig_file 매개 변수.
  • 설정하지 않은 경우~/.kube/config
  • YAML의 정의는 airflow.cfg 내의 매개 변수pod_template_file로도 정의할 수 있다.
  • 사전 준비

  • https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/
  • alias k=kubectl
  • 간단한 Pod 만들기


    만들고 싶은Pod의yaml 이미지


    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        foo: bar
      name: hello-pod-work
    spec:
      containers:
      - image: debian
        name: hello-pod-work
    

    DAG 샘플

  • main.py
  • from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )
    
    default_args = {
        'owner': '467',
        'depends_on_past': False,
        'start_date': days_ago(2),
    }
    
    with DAG(
        'test_kubernetes_pod_operator_work',
        default_args=default_args,
        description='KubernetesPodOperatorを試す',
        tags=["work"],
    ) as dag:
        dag.doc_md = """
        KubernetesPodOperatorを試す
        """
        k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            do_xcom_push=False,
            in_cluster=False,
        )
    
  • DAG 실행 후 Pod 제거
  • $ k get pod
    No resources found in default namespace.
    

    시크릿을 해보도록 하겠습니다.

  • 시크릿 2개 미리 준비
  • $ k create secret generic airflow-secrets --from-literal=sql_alchemy_conn=hoge
    secret/airflow-secrets created
    $ k create secret generic airflow-secrets-2 --from-literal=sql_alchemy_conn2=hoge2
    secret/airflow-secrets-2 created
    

    만들고 싶은Pod의yaml 이미지


    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        foo: bar
      name: hello-pod-work
    spec:
      containers:
      - image: debian
        name: hello-pod-work
        volumeMounts:
        - mountPath: /etc/sql_conn
          name: hoge
          readOnly: true
        env:
        - name: SQL_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        envFrom:
        - secretRef:
            name: airflow-secrets-2
      volumes:
      - name: hoge
        secret:
          secretName: airflow-secrets
    

    DAG 샘플

  • main.py
  • 추가 또는 변경된 부분만 기재
  • from kubernetes_pod_operator_work.dags import config_storage_apis
    ・・・
        k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            secrets=[
                config_storage_apis.secret_file(),
                config_storage_apis.secret_env(),
                config_storage_apis.secret_all_keys()],
    	do_xcom_push=False,
            in_cluster=False,
        )
    
  • config_storage_apis.py
  • from airflow.kubernetes.secret import Secret
    
    def secret_file():
        return Secret(
            deploy_type='volume', deploy_target='/etc/sql_conn',
            secret='airflow-secrets'
            )
    
    def secret_env():
        return Secret(
            deploy_type='env', deploy_target='SQL_CONN',
            secret='airflow-secrets', key='sql_alchemy_conn'
    	)
    
    def secret_all_keys():
        return Secret(
            deploy_type='env', deploy_target=None,
            secret='airflow-secrets-2'
    	)
    

    mount PersistentVolumeClaim(PVC)

  • 사전준비STATUSBound된pvc
  • $ k get pvc
    NAME                  STATUS    VOLUME           CAPACITY   ACCESS MODES   STORAGECLASS    AGE
    my-pvc                Bound     my-pv-hostpath   1Gi        RWO            manual          3s
    $ k get pv
    NAME             CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM            STORAGECLASS    REASON   AGE
    my-pv-hostpath   1Gi        RWO            Retain           Bound       default/my-pvc   manual                   2m24s
    

    만들고 싶은Pod의yaml 이미지


    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        foo: bar
      name: hello-pod-work
    spec:
      containers:
      - image: debian
        name: hello-pod-work
        volumeMounts:
        - name: test-volume
          mountPath: /root/mount_file
      volumes:
      - name: test-volume
        persistentVolumeClaim:
            claimName: my-pvc
    

    DAG 샘플

  • main.py
  • 추가 또는 변경된 부분만 기재
  •     k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            volumes=[config_storage_apis.volume()],
            volume_mounts=[config_storage_apis.volume_mount()],
            do_xcom_push=False,
            in_cluster=False,
        )
    
  • config_storage_apis.py
  • 추가 또는 변경된 부분만 기재
  • def volume_mount():
        return k8s.V1VolumeMount(
            mount_path='/root/mount_file', name='test-volume',
            read_only=True, sub_path=None, sub_path_expr=None,
            mount_propagation=None
            )
    
    def volume():
        return k8s.V1Volume(
            name='test-volume',
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name='my-pvc'),
            )
    

    mount Configmap

  • 사전에 configmap
  • 준비
    $ k create configmap test-configmap-1 --from-literal=key1=value1
    configmap/test-configmap-1 created
    $ k create configmap test-configmap-2 --from-literal=key1=value1
    configmap/test-configmap-2 created
    

    만들고 싶은Pod의yaml 이미지


    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        foo: bar
      name: hello-pod-work
    spec:
      containers:
      - image: debian
        name: hello-pod-work
        envFrom:
        - configMapRef:
            name: test-configmap-1
        - configMapRef:
            name: test-configmap-2
    

    DAG 샘플

  • main.py
  • 추가 또는 변경된 부분만 기재
  •     k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            env_from=config_storage_apis.configmaps(),
            do_xcom_push=False,
            in_cluster=False,
        )
    
  • config_storage_apis.py
  • 추가 또는 변경된 부분만 기재
  • def configmaps():
        return [
            k8s.V1EnvFromSource(
    		config_map_ref=k8s.V1ConfigMapEnvSource(
    			name='test-configmap-1')
    		),
            k8s.V1EnvFromSource(
    		config_map_ref=k8s.V1ConfigMapEnvSource(
    			name='test-configmap-2')
    		),
            ]
    

    Pod 내에서 여러 컨테이너 만들기


    만들고 싶은Pod의yaml 이미지


    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        run: share-pod
      name: share-pod
    spec:
      containers:
      - image: busybox
        name: container1
      - image: busybox
        name: container2
    

    DAG 샘플

  • 방법은 full_pod_spec 파라미터를 사용하는 방법과 pod_template_file 파라미터를 사용하는 방법을 포함한다.
  • main.py
  • 추가 또는 변경된 부분만 기재
  • c1 = k8s.V1Container(
        name="container1",
        image="busybox",
    )
    c2 = k8s.V1Container(
        name="container2",
        image="busybox",
    )
    
    p = k8s.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=k8s.V1ObjectMeta(
            namespace="default",
            name="share-pod"
        ),
        spec=k8s.V1PodSpec(
            restart_policy='Never',
            containers=[c1, c2],
        )
    )
    
    pod_template_file = """
    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        run: share-pod
      name: share-pod
    spec:
      containers:
      - image: busybox
        name: container1
      - image: busybox
        name: container2
      restartPolicy: Always
    """
    
    ・・・
        # full_pod_specパラメータを使用する方法
        k2 = KubernetesPodOperator(
            full_pod_spec=p,
            task_id="hello-pod-work2",
            do_xcom_push=False,
            in_cluster=False,
        )
        
        # pod_template_fileパラメータを使用する方法
        k3 = KubernetesPodOperator(
            namespace='default',
            pod_template_file=pod_template_file,
            task_id="hello-pod-work3",
            do_xcom_push=False,
            in_cluster=False,
        )
    

    다른 Kubbernetes Operator도 할 수 있는 일


    20222.4.27 개인 사용 확인
  • Pod의 [ports,NodeSelector,Affinity,ServiceAcoutName,SecurityContext,initContaainer]에 대한 설정
  • XCom에 결과를 넣으려는 경우결과를 json으로 다시 지정하기


        k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
            cmds=[
    		"sh",
    		"-c",
    		"mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            do_xcom_push=True,
            in_cluster=False,
        )
    

    엑스콤으로 가는 푸시는 자동 제작된 조정 컨테이너를 통해 푸시된다.

  • auditlog를 보면 사이드카 컨테이너 제작 여부를 확인할 수 있다

  • 확인 가능k get event
  • $ cat /etc/kubernetes/audit/logs/audit.log  | grep hello-pod-work | grep ResponseComplete | grep create | jq '.requestObject.involvedObject'
    {
      "kind": "Pod",
      "namespace": "default",
      "name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
      "uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
      "apiVersion": "v1",
      "resourceVersion": "11120850",
      "fieldPath": "spec.containers{base}"
    }
    ・・・
    {
      "kind": "Pod",
      "namespace": "default",
      "name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
      "uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
      "apiVersion": "v1",
      "resourceVersion": "11120850",
      "fieldPath": "spec.containers{airflow-xcom-sidecar}"
    }
    
  • 그나저나 결과do_xcom_push=True/airflow/xcom/return.json로 바뀌지 않았음에도 불구하고 자동으로 제작된 나란히 컨테이너/airflow/xcom/return.json Not Found에서 오류가 발생했다.
  • 실행 후 Xcom에서 Push 실행 결과를 확인할 수 있습니다.



    debug 보고 싶을 때 dryrun 사용()


  • 에어플로우도 실행k run hoge --dry-run=client -o yaml과 같은 결과를 확인할 수 있다.
  • 위 DAG에서 dry런 출력을 추가해 보세요.
  • DAG 샘플

  • main.py
  • 추가 또는 변경된 부분만 기재
  • ・・・
    from airflow.operators.python_operator import PythonOperator
    ・・・
        k = KubernetesPodOperator(
            namespace='default',
            name="hello-pod-work",
            image="debian",
    	# Xcomにpushする場合、pushした値を/airflow/xcom/return.jsonへリダイレクトする
            cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
            labels={"foo": "bar"},
            task_id="dry_run_demo",
            do_xcom_push=True,
            in_cluster=False,
        )
     
            def print_dry_run(*kwargs):
            print(k.dry_run())
        
        o = PythonOperator(
            task_id='print_dry_run',
            python_callable=print_dry_run,
            dag=dag,
        )
        
        [k, o]
    

    실행 확인 가능한yaml

  • task_id,print_dry_런로그를 보시면 확인할 수 있습니다.
  • apiVersionapi_version 또는restartPolicyrestart_policy라면yaml 파일로 직접 복사할 수 없습니다.
  • [2022-04-26 03:36:31,817] {logging_mixin.py:104} INFO - api_version: v1
    kind: Pod
    metadata:
      labels:
        airflow_version: 2.1.0
        foo: bar
        kubernetes_pod_operator: 'True'
      name: hello-dry-run-work.afd4862b480a46b1b2a37f24d7627375
      namespace: default
    spec:
      containers:
      - command:
        - sh
        - -c
        - mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
        image: debian
        name: base
        volume_mounts:
        - mount_path: /airflow/xcom
          name: xcom
      # KubernetesPodOperator内で`do_xcom_push=True`をすると、Sidercarコンテナが作られる。
      - command:
        - sh
        - -c
        - trap "exit 0" INT; while true; do sleep 1; done;
        image: alpine
        name: airflow-xcom-sidecar
        resources:
          requests:
            cpu: 1m
        volume_mounts:
        - mount_path: /airflow/xcom
          name: xcom
      host_network: false
      restart_policy: Never
      volumes:
      - name: xcom
    

    총결산


    나는 문서와 코드를 보면서 할 수 있는 다양한 일을 찾았다.환경을 컨테이너화할 수만 있다면 에어플로우에서 움직일 수 있을 것 같아서 정말 대단합니다.작업 환경에 k8s 환경이 있을 때 저는 다양한 것을 탐색하고 싶습니다.

    참고 자료

  • https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html
  • 좋은 웹페이지 즐겨찾기