Airflow의 Kubernetes Pod Operator를 사용해 보세요.
Kubbernetes PodOperator란 무엇입니까?
Kubernetes Executor와는 다르다.
어떻게 움직여요?
airflow.cfg
내config_file
매개 변수.~/.kube/config
airflow.cfg
내의 매개 변수pod_template_file
로도 정의할 수 있다.사전 준비
alias k=kubectl
간단한 Pod 만들기
만들고 싶은Pod의yaml 이미지
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
DAG 샘플
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
default_args = {
'owner': '467',
'depends_on_past': False,
'start_date': days_ago(2),
}
with DAG(
'test_kubernetes_pod_operator_work',
default_args=default_args,
description='KubernetesPodOperatorを試す',
tags=["work"],
) as dag:
dag.doc_md = """
KubernetesPodOperatorを試す
"""
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=False,
in_cluster=False,
)
$ k get pod
No resources found in default namespace.
시크릿을 해보도록 하겠습니다.
$ k create secret generic airflow-secrets --from-literal=sql_alchemy_conn=hoge
secret/airflow-secrets created
$ k create secret generic airflow-secrets-2 --from-literal=sql_alchemy_conn2=hoge2
secret/airflow-secrets-2 created
만들고 싶은Pod의yaml 이미지
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
volumeMounts:
- mountPath: /etc/sql_conn
name: hoge
readOnly: true
env:
- name: SQL_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
envFrom:
- secretRef:
name: airflow-secrets-2
volumes:
- name: hoge
secret:
secretName: airflow-secrets
DAG 샘플
from kubernetes_pod_operator_work.dags import config_storage_apis
・・・
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
secrets=[
config_storage_apis.secret_file(),
config_storage_apis.secret_env(),
config_storage_apis.secret_all_keys()],
do_xcom_push=False,
in_cluster=False,
)
from airflow.kubernetes.secret import Secret
def secret_file():
return Secret(
deploy_type='volume', deploy_target='/etc/sql_conn',
secret='airflow-secrets'
)
def secret_env():
return Secret(
deploy_type='env', deploy_target='SQL_CONN',
secret='airflow-secrets', key='sql_alchemy_conn'
)
def secret_all_keys():
return Secret(
deploy_type='env', deploy_target=None,
secret='airflow-secrets-2'
)
mount PersistentVolumeClaim(PVC)
STATUS
Bound
된pvc$ k get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
my-pvc Bound my-pv-hostpath 1Gi RWO manual 3s
$ k get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
my-pv-hostpath 1Gi RWO Retain Bound default/my-pvc manual 2m24s
만들고 싶은Pod의yaml 이미지
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
volumeMounts:
- name: test-volume
mountPath: /root/mount_file
volumes:
- name: test-volume
persistentVolumeClaim:
claimName: my-pvc
DAG 샘플
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
volumes=[config_storage_apis.volume()],
volume_mounts=[config_storage_apis.volume_mount()],
do_xcom_push=False,
in_cluster=False,
)
def volume_mount():
return k8s.V1VolumeMount(
mount_path='/root/mount_file', name='test-volume',
read_only=True, sub_path=None, sub_path_expr=None,
mount_propagation=None
)
def volume():
return k8s.V1Volume(
name='test-volume',
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name='my-pvc'),
)
mount Configmap
$ k create configmap test-configmap-1 --from-literal=key1=value1
configmap/test-configmap-1 created
$ k create configmap test-configmap-2 --from-literal=key1=value1
configmap/test-configmap-2 created
만들고 싶은Pod의yaml 이미지
apiVersion: v1
kind: Pod
metadata:
labels:
foo: bar
name: hello-pod-work
spec:
containers:
- image: debian
name: hello-pod-work
envFrom:
- configMapRef:
name: test-configmap-1
- configMapRef:
name: test-configmap-2
DAG 샘플
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
labels={"foo": "bar"},
task_id="dry_run_demo",
env_from=config_storage_apis.configmaps(),
do_xcom_push=False,
in_cluster=False,
)
def configmaps():
return [
k8s.V1EnvFromSource(
config_map_ref=k8s.V1ConfigMapEnvSource(
name='test-configmap-1')
),
k8s.V1EnvFromSource(
config_map_ref=k8s.V1ConfigMapEnvSource(
name='test-configmap-2')
),
]
Pod 내에서 여러 컨테이너 만들기
만들고 싶은Pod의yaml 이미지
apiVersion: v1
kind: Pod
metadata:
labels:
run: share-pod
name: share-pod
spec:
containers:
- image: busybox
name: container1
- image: busybox
name: container2
DAG 샘플
full_pod_spec
파라미터를 사용하는 방법과 pod_template_file
파라미터를 사용하는 방법을 포함한다.c1 = k8s.V1Container(
name="container1",
image="busybox",
)
c2 = k8s.V1Container(
name="container2",
image="busybox",
)
p = k8s.V1Pod(
api_version="v1",
kind="Pod",
metadata=k8s.V1ObjectMeta(
namespace="default",
name="share-pod"
),
spec=k8s.V1PodSpec(
restart_policy='Never',
containers=[c1, c2],
)
)
pod_template_file = """
apiVersion: v1
kind: Pod
metadata:
labels:
run: share-pod
name: share-pod
spec:
containers:
- image: busybox
name: container1
- image: busybox
name: container2
restartPolicy: Always
"""
・・・
# full_pod_specパラメータを使用する方法
k2 = KubernetesPodOperator(
full_pod_spec=p,
task_id="hello-pod-work2",
do_xcom_push=False,
in_cluster=False,
)
# pod_template_fileパラメータを使用する方法
k3 = KubernetesPodOperator(
namespace='default',
pod_template_file=pod_template_file,
task_id="hello-pod-work3",
do_xcom_push=False,
in_cluster=False,
)
다른 Kubbernetes Operator도 할 수 있는 일
20222.4.27 개인 사용 확인
XCom에 결과를 넣으려는 경우결과를 json으로 다시 지정하기
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
cmds=[
"sh",
"-c",
"mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
)
엑스콤으로 가는 푸시는 자동 제작된 조정 컨테이너를 통해 푸시된다.
확인 가능
k get event
$ cat /etc/kubernetes/audit/logs/audit.log | grep hello-pod-work | grep ResponseComplete | grep create | jq '.requestObject.involvedObject'
{
"kind": "Pod",
"namespace": "default",
"name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
"uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
"apiVersion": "v1",
"resourceVersion": "11120850",
"fieldPath": "spec.containers{base}"
}
・・・
{
"kind": "Pod",
"namespace": "default",
"name": "hello-pod-work.8d708c7d53d84a3c9c11c64871ae2c5d",
"uid": "fab4e8ef-66bd-4cd8-99c0-5d472aac1657",
"apiVersion": "v1",
"resourceVersion": "11120850",
"fieldPath": "spec.containers{airflow-xcom-sidecar}"
}
do_xcom_push=True
가 /airflow/xcom/return.json
로 바뀌지 않았음에도 불구하고 자동으로 제작된 나란히 컨테이너/airflow/xcom/return.json Not Found
에서 오류가 발생했다.실행 후 Xcom에서 Push 실행 결과를 확인할 수 있습니다.
debug 보고 싶을 때 dryrun 사용()
에어플로우도 실행
k run hoge --dry-run=client -o yaml
과 같은 결과를 확인할 수 있다.DAG 샘플
・・・
from airflow.operators.python_operator import PythonOperator
・・・
k = KubernetesPodOperator(
namespace='default',
name="hello-pod-work",
image="debian",
# Xcomにpushする場合、pushした値を/airflow/xcom/return.jsonへリダイレクトする
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
in_cluster=False,
)
def print_dry_run(*kwargs):
print(k.dry_run())
o = PythonOperator(
task_id='print_dry_run',
python_callable=print_dry_run,
dag=dag,
)
[k, o]
실행 확인 가능한yaml
apiVersion
가api_version
또는restartPolicy
가restart_policy
라면yaml 파일로 직접 복사할 수 없습니다.[2022-04-26 03:36:31,817] {logging_mixin.py:104} INFO - api_version: v1
kind: Pod
metadata:
labels:
airflow_version: 2.1.0
foo: bar
kubernetes_pod_operator: 'True'
name: hello-dry-run-work.afd4862b480a46b1b2a37f24d7627375
namespace: default
spec:
containers:
- command:
- sh
- -c
- mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
image: debian
name: base
volume_mounts:
- mount_path: /airflow/xcom
name: xcom
# KubernetesPodOperator内で`do_xcom_push=True`をすると、Sidercarコンテナが作られる。
- command:
- sh
- -c
- trap "exit 0" INT; while true; do sleep 1; done;
image: alpine
name: airflow-xcom-sidecar
resources:
requests:
cpu: 1m
volume_mounts:
- mount_path: /airflow/xcom
name: xcom
host_network: false
restart_policy: Never
volumes:
- name: xcom
총결산
나는 문서와 코드를 보면서 할 수 있는 다양한 일을 찾았다.환경을 컨테이너화할 수만 있다면 에어플로우에서 움직일 수 있을 것 같아서 정말 대단합니다.작업 환경에 k8s 환경이 있을 때 저는 다양한 것을 탐색하고 싶습니다.
참고 자료
Reference
이 문제에 관하여(Airflow의 Kubernetes Pod Operator를 사용해 보세요.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/467/articles/ca76be579ccf97텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)