Extending Airflow's BigQueryOperator to allow double substitution of parameters

This note describes the solution to a small problem related to Airflow's BigQueryOperator which I encountered a while ago.
The solution was not entirely trivial for me to figure out, so I decided to share it here in case it is useful
to someone else. I assume some basic knowledge of Airflow, as this note is not meant to be introductory.

What we mean by "double substitution"



I will start by describing the issue.
The BigQueryOperator contains the following templated parameters:
  • destination_dataset_table
  • sql
  • labels
  • query_params
  • bql (deprecated, so we will ignore it here)
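
These fields can be checked directly on the operator class; the exact tuple varies somewhat between Airflow versions, so the list above reflects the releases this note is based on:

    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    # template_fields is a class attribute naming the constructor arguments that
    # are run through the Jinja template engine before execution.
    print(BigQueryOperator.template_fields)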

As is common in Airflow, one can pass a params dictionary to BigQueryOperator, which can later be referenced
in templates.
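
For concreteness, here is a minimal sketch of this ordinary, single-pass substitution (the table name and values are made up):

    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    # At render time the Jinja engine looks up params.x, so the query becomes:
    #   SELECT "value" AS x
    query = BigQueryOperator(task_id="plain_query",
                             sql='SELECT "{{params.x}}" AS x',
                             params={"x": "value"},
                             destination_dataset_table="xxx.yyy.zzz")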

However, what if one wants to pass a template string inside params? That is, suppose one passes something like
{"x": "{{var.json.global_config.x}}"} in params, so as to reference some global_config variable defined in Airflow
as {"x": "value"}, expecting the SQL template

    SELECT "{{params.x}}" AS x

to be substituted to

    SELECT "value" AS x

This will not work: the SQL you actually get is

    SELECT "{{var.json.global_config.x}}" AS x

This makes sense, as substitution in Airflow (in its underlying Jinja template engine) does not happen recursively.
This is justified, as otherwise one would have to face the possibility of endless substitution loops. However, it is
not always convenient.
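
One can see this single-pass behaviour directly with Jinja, outside of Airflow (a standalone sketch; the variable reference is just a plain string here, since no Airflow context is involved):

    from jinja2 import Template

    # Jinja renders the template exactly once: the substituted value is inserted
    # verbatim, and any template syntax it happens to contain is not rendered again.
    params = {"x": "{{var.json.global_config.x}}"}
    print(Template('SELECT "{{params.x}}" AS x').render(params=params))
    # prints: SELECT "{{var.json.global_config.x}}" AS x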

Why we want double substitution



Now, the scenarios in which one wants to perform double substitution are more common than one might think at first glance.
Imagine that in our Airflow deployment every DAG we deploy has its own corresponding configuration variable, and that every
DAG may also reference a certain global_config variable containing global configuration. To avoid duplication, it would be
ideal to be able to reference global_config within the configuration variables of individual DAGs, as in the diagram:



However, if double substitution is unavailable, we have to do the substitution manually in every DAG's source code.
This clutters the logic.
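
To make this concrete, here is a rough sketch of the setup just described and of the manual workaround (the variable names and their contents are made up for illustration):

    from airflow.models import Variable

    # Variable "global_config": {"environment": "prod", ...}
    # Variable "my_dag_config": {"dataset": "reporting", ...}
    global_config = Variable.get("global_config", deserialize_json=True)
    dag_config = Variable.get("my_dag_config", deserialize_json=True)

    # Without double substitution, every DAG file has to merge the shared values
    # in by hand before handing them to its operators:
    params = {**dag_config, "environment": global_config["environment"]}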

How we can achieve double substitution



One solution is to create our own ExtendedBigQueryOperator, which one would then instantiate as, for example:

    ExtendedBigQueryOperator(task_id="query",
                             sql='SELECT "{{params.templated_params.x}}" AS x',
                             templated_params={
                                 "x": "{{var.json.global_config.template_data.environment}}"
                             },
                             destination_dataset_table="xxx.yyy.zzz")

We want ExtendedBigQueryOperator to satisfy the following properties:
  1. mimic the constructor of BigQueryOperator,
  2. except that it accepts an additional dictionary parameter templated_params, containing the parameters we want
     to substitute twice,
  3. and behave identically to BigQueryOperator in every other aspect.

Requirements 1. and 3. naturally call for ExtendedBigQueryOperator to inherit from BigQueryOperator, and we also
add the additional requirement:
  4. the amount of additional code in ExtendedBigQueryOperator should be as small as possible, so as to prevent the
     inheritance from breaking in case BigQueryOperator changes in the future.

Finally, this is what I came up with, based on the requirements above (16 lines of code, not counting the comments):

    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    def _deep_copy_dict(dst, src):
        # Copies the top-level entries of src into dst *in place*, so that every other
        # reference to dst (here, params["templated_params"]) sees the rendered values.
        for k in src:
            dst[k] = src[k]

    class ExtendedBigQueryOperator(BigQueryOperator):
        def __init__(self, templated_params, *args, **kwargs):
            # Expose templated_params under params, so that SQL templates can reference
            # {{params.templated_params.<key>}}; note that the very same dict object is shared.
            kwargs["params"] = {**(kwargs.get("params") or {}), "templated_params": templated_params}
            self.templated_params = templated_params
            # Prepend templated_params to the inherited template_fields, so that it is
            # rendered before sql and the other templated fields.
            self.template_fields = ("templated_params", *super(ExtendedBigQueryOperator, self).template_fields)
            super(ExtendedBigQueryOperator, self).__init__(*args, **kwargs)

        def render_template(self, attr, *args, **kwargs):
            # In the Airflow 1.10 releases this was written against, the first positional
            # argument of render_template is the name of the attribute being rendered.
            res = super(ExtendedBigQueryOperator, self).render_template(attr, *args, **kwargs)
            if attr == "templated_params":
                # Push the rendered values back into the original dict (still referenced
                # from params), so that the subsequent rendering of sql sees them.
                _deep_copy_dict(self.templated_params, res)
            return res
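
To spell out how this achieves the double substitution: templated_params is rendered first (it is the first entry of template_fields), its rendered values are copied in place into the dict that params["templated_params"] points to, and only afterwards is sql rendered, at which point {{params.templated_params.x}} already resolves to the final value. Below is a minimal usage sketch; the DAG id, schedule and the assumed contents of the global_config variable ({"template_data": {"environment": "prod"}}) are hypothetical:

    from datetime import datetime
    from airflow import DAG

    with DAG("extended_bq_example", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
        query = ExtendedBigQueryOperator(
            task_id="query",
            sql='SELECT "{{params.templated_params.x}}" AS x',
            templated_params={"x": "{{var.json.global_config.template_data.environment}}"},
            destination_dataset_table="xxx.yyy.zzz",
        )
    # With global_config as above, the rendered query is: SELECT "prod" AS x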
