Extending Airflow's BigQueryOperator to allow double substitution of parameters
This note describes an issue with Airflow's BigQueryOperator which I encountered a while ago. The solution was not entirely trivial for me to figure out, so I decided to share it here in case it is beneficial to someone else. I assume some basic knowledge of Airflow throughout, as this note is not meant to be introductory.
What we mean by "double substitution"
I will start by describing the issue. The BigQueryOperator contains the following templated parameters:

- destination_dataset_table
- sql
- labels
- query_params
- bql (deprecated, so we will ignore it here)

As is common in Airflow, the user can pass a dictionary params to BigQueryOperator, which can later be referenced in templates.
However, what if one wants to pass a template string in params? That is, what if one wants to pass something like {"x": "{{var.json.global_config.x}}"} in params, so as to reference some global_config variable defined in Airflow as {"x": "value"}, and then have the SQL template

SELECT "{{params.x}}" AS x

substituted to

SELECT "value" AS x
This will not work: you will in fact get the sql

SELECT "{{var.json.global_config.x}}" AS x

This makes sense, as substitution in Airflow (in its underlying Jinja template engine) does not happen recursively. This is justified, as otherwise one would have to face the possibility of endless substitution loops. However, it is not always convenient.
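To see the single-pass behaviour in isolation, here is a minimal sketch using plain jinja2, outside of Airflow (the variable names mirror the example above):

```python
from jinja2 import Template

# params.x itself contains a template string, as in the Airflow example.
params = {"x": "{{var.json.global_config.x}}"}

rendered = Template('SELECT "{{params.x}}" AS x').render(params=params)

# Jinja performs a single pass: the inner template string is emitted
# verbatim instead of being rendered again.
print(rendered)  # SELECT "{{var.json.global_config.x}}" AS x
```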
Why we want double substitution
Now, the scenarios where one wants to perform double substitution are more common than one might think at first glance. Imagine that in our Airflow deployment every DAG we deploy has its own corresponding variable, and every DAG may also reference a certain global_config variable containing global configuration. To avoid duplication, it would be ideal to be able to reference global_config within the configuration variables of individual DAGs. However, if double substitution is unavailable, we have to do the substitution manually in every DAG's source code, which clutters the logic.
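The manual workaround looks roughly like this, sketched with plain jinja2 (build_params and the context layout are hypothetical stand-ins for what each DAG would have to do):

```python
from jinja2 import Template

# Hypothetical global configuration, as stored in the global_config
# Airflow variable.
global_config = {"x": "value"}

def build_params(raw_params, global_config):
    # Render global_config references in each param value by hand;
    # this boilerplate would have to be repeated in every DAG file.
    context = {"var": {"json": {"global_config": global_config}}}
    return {k: Template(v).render(**context) for k, v in raw_params.items()}

params = build_params({"x": "{{var.json.global_config.x}}"}, global_config)
print(params)  # {'x': 'value'}
```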
How we can achieve double substitution
One solution is to create our own ExtendedBigQueryOperator, which one would then call as, for example:

```python
ExtendedBigQueryOperator(
    task_id="query",
    sql='SELECT "{{params.templated_params.x}}" AS x',
    templated_params={
        "x": "{{var.json.global_config.template_data.environment}}"
    },
    destination_dataset_table="xxx.yyy.zzz",
)
```
We want ExtendedBigQueryOperator to satisfy the following properties:

1. it accepts the same parameters as BigQueryOperator, plus an additional templated_params parameter containing the parameters which we want to substitute twice;
2. the values in templated_params are themselves rendered as templates before the SQL is rendered;
3. it behaves like BigQueryOperator in every other aspect.

Requirements 1. and 3. naturally call for ExtendedBigQueryOperator to inherit from BigQueryOperator, and we also add the additional requirement that the code of ExtendedBigQueryOperator should be as small as possible, so as to prevent the inheritance from breaking in case BigQueryOperator changes in the future. Finally, this is what I came up with, based on the requirements above (16 lines of code):
```python
from airflow.contrib.operators.bigquery_operator import BigQueryOperator


def _deep_copy_dict(dst, src):
    # Copy src into dst *in place*, so every existing reference to dst
    # (in particular the one inside params) sees the rendered values.
    for k in src:
        dst[k] = src[k]


class ExtendedBigQueryOperator(BigQueryOperator):
    def __init__(self, templated_params, *args, **kwargs):
        # params["templated_params"] and self.templated_params are the
        # same dict object, referenced in SQL templates as
        # {{params.templated_params.<name>}}.
        kwargs["params"] = {**(kwargs.get("params") or {}),
                            "templated_params": templated_params}
        self.templated_params = templated_params
        # Render templated_params before sql and the other templated fields.
        self.template_fields = (
            "templated_params",
            *super(ExtendedBigQueryOperator, self).template_fields,
        )
        super(ExtendedBigQueryOperator, self).__init__(*args, **kwargs)

    def render_template(self, content, *args, **kwargs):
        # In Airflow 1.10, the first positional argument is the name of
        # the attribute being rendered.
        res = super(ExtendedBigQueryOperator, self).render_template(
            content, *args, **kwargs)
        if content == "templated_params":
            # Write the rendered values back in place; rendering sql
            # afterwards then performs the second substitution.
            _deep_copy_dict(self.templated_params, res)
        return res
```
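To illustrate how the two rendering passes interact, here is a sketch of the rendering order using plain jinja2 (the context layout is a simplified stand-in for Airflow's template context, and the environment value is hypothetical):

```python
from jinja2 import Template

context = {"var": {"json": {"global_config": {"template_data": {"environment": "prod"}}}}}

# Pass 1: templated_params is the first entry of template_fields, so it
# is rendered first and its values become concrete strings.
templated_params = {"x": "{{var.json.global_config.template_data.environment}}"}
rendered = {k: Template(v).render(**context) for k, v in templated_params.items()}

# render_template() then copies the rendered values back in place, so
# the dict inside params is updated as well.

# Pass 2: sql is rendered afterwards and sees the already-rendered values.
sql = 'SELECT "{{params.templated_params.x}}" AS x'
print(Template(sql).render(params={"templated_params": rendered}))
# SELECT "prod" AS x
```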
Reference
Original article: https://qiita.com/nailbiter/items/0def9e3a419811f0dd86