우리는 어떻게 나날이 우리의 데이터 공학 작업 절차를 개선할 것인가

5459 단어
데이터 엔지니어는 창고나 OTAP 데이터베이스에서 나온 데이터를 유지, 정리, 조작하는 것을 책임진다.우리는 주로 소프트웨어 엔지니어와 데이터 분석팀과 밀접하게 협력하여 필요한 출력을 만들어 데이터 기반의 고객 업무를 강화한다.
본고에서 우리는 데이터 공학 편성에서 무엇을 배웠는지, 그리고 존재하는 결점, 그리고 우리가 어떻게 하루하루 개선했는지 설명할 것이다.
우리는 소형 데이터 공학팀인 Jagadish(me)로 고객의 분석 수요를 처리하고 확장 가능한 파이프를 구축하여 파생표를 생성한다.
처음에 우리는 데이터 공학 체계 구조와 ETL 절차를 가지고 데이터를 원본 창고에서 목표 창고로 이동하는 데 사용했다.이것은 우리의 작업 절차를 시작할 때 매우 유익하다. 데이터를 창고로 옮기기 전에 완전한 전환을 완성했기 때문에 최종 표에는 더 이상 필요하지 않은 열이 나타나지 않는다.ETL은 대량의 데이터를 추출, 변환, 불러올 때 효과가 좋고 창고에서 필요한 표를 깨끗하게 생성하는 것을 대표한다.우리는 일찍이 파이프 스케줄러를 위해 기류를 사용했다.

ETL 사용의 이점은 다음과 같습니다.
  • 대량 데이터 전송을 위한 서비스 용이성
  • 초기 단계에서 전환을 완성했기 때문에 시종일관 정리된 데이터는 결국 창고에 나타난다.
  • 한 달 전에 우리의 원본 데이터베이스에 몇 가지 새로운 특성을 도입했는데 이것은 데이터 공학 파이프라인에 번거로움을 가져왔고 우리는 처음부터 전체 파이프 흐름을 다시 구축하기 시작했다.우리는 이렇게 많은 의존표가 있기 때문에 ETL 흐름을 유지하기 어려워서 ELT 흐름에 들어갔다.
    ETL 사용의 단점은 다음과 같습니다.
  • 원본 테이블의 미세한 변경으로 인해 파이프의 변경을 실현하는 데 한 달이 걸릴 것이다.
  • 끊임없이 변화하는 수요를 따라잡기 어렵다.
  • 의존 관계표의 변경 사항을 추적하기 어렵다.
  • 이제 플랫폼을 ELT로 이동합니다.
    추출, 로드 및 변환(ELT)은 원본 데이터를 소스 데이터베이스에서 데이터 웨어하우스로 전송한 다음 다운스트림 용도로 정보를 변환하는 데이터 통합 프로세스입니다.

    첫 번째 단계는 데이터를 추출하는 것이다.데이터를 추출하는 것은 한 개 이상의 원본 시스템(데이터베이스나 각종 다른 데이터 원본일 수도 있음)에서 데이터를 식별하고 읽는 과정이다.
    두 번째 단계는 추출 데이터를 불러오는 것입니다.로드는 추출된 데이터를 대상 데이터베이스(예: 창고)에 추가하는 프로세스입니다.
    세 번째 단계는 데이터 변환이다.데이터를 창고에서 분석에 필요한 형식으로 바꾸는 과정.
    ELT는 추출에 의존하지 않기 때문에 ETL보다 더 유연하게 변환할 수 있으며 앞으로 추출된 데이터를 더 추가할 수 있습니다.현재 대부분의 파생 테이블 작업은 SQL을 통해 수행됩니다.
    우리는 ELT 프로세서로서 작업 흐름을 조정하고 감시했으며, SQL과 비SQL 데이터베이스에서 데이터를 추출해야만 창고에 불러올 수 있었다.우리의 기류 배치는 docker를 통해 이루어졌으니 더 자세한 내용은 puckel/airflow을 보십시오.현재 우리는 우리의 이미지의 정식 부두 이미지를 채택하고 있다.
    배치에 있어wise는 모든 것이 순조롭지만 생산, 단계와 개발 등 서로 다른 환경을 사용하기 위해 docker compose와airflow 설정 방법을 수정했습니다.
    우리의 배치 환경을 이동시키다.env 파일은 env 파일을 통해 로드됩니다.그것은 우리의 환경과 부두를 분리시켰다
    LOAD_EX=None
    FERNET_KEY=None
    EXECUTOR=None
    AIRFLOW__WEBSERVER__AUTHENTICATE=None
    AIRFLOW__WEBSERVER__AUTH_BACKEND=None
    AIRFLOW__CORE__SQL_ALCHEMY_POOL_ENABLED=None
    AIRFLOW__CELERY__FLOWER_BASIC_AUTH=None
    
    공기 흐름 속에서 우리는 작업 중에 KWARG를 사용하고 변수를 사용하여 서로 다른 환경을 처리한다
        kwargs = {
            "project_id": "{{ var.json.config.project_id }}",
            "table_dataset": "{{ var.json.config.table_dataset }}",
            "table_test_dataset": "{{ var.json.config.table_test_dataset }}",
        }
    
    에어플로우 폴더 구조와 코드 개선을 살펴봅시다.

    프로젝트 측의 DAG는 DAG 내부에 위치하고 나머지는 모두 이해하기 쉽다.
    SQL 데이터베이스를 위한 파이핑 DAG 및 작업을 구축했습니다.그것은 소스 코드표의 구조가 비슷하고 코드가 중복되는 단점이 있다.그 밖에 코드는 열 검증과 열 삭제가 없습니다.
    def table_task(**kwargs):
        # connect to source database
        start_date, end_date  
        postgres_conn_obj = PostgresHook(postgres_conn_id=connection_id)
        sql = "SELECT * FROM table WHERE updatedAt>{} AND updatedAt<{}"
        df = postgres_conn_obj.get_pandas_df(
            sql.format(self.start_date, self.end_date)
        )
        df.to_bq("table_name", cred)
    
    #dags
        table_task = PythonOperator(
            task_id="table_task",
            python_callable=table_task,
            provide_context=True,
            op_kwargs=kwargs,
        )
    
    표의 중복, 삭제, 데이터베이스 연결과 모델을 처리하는 표준적인 방법을 극복하기 위해 우리는 클래스 기반의 방법으로 이 문제를 해결하고 실행 중인 표를 쉽게 다시 사용할 수 있다.
        class Table(PythonTaskBase):
        """
        Task for table which is present in source
        """
    
        def __init__(
            self, postgres_connection_id, bigquery_connection_id, table_name
        ):
            super().__init__(
                postgres_connection_id=postgres_connection_id,
                bigquery_connection_id=bigquery_connection_id,
                table_name=table_name,
            )
            self.datetime_column = ["created_at", "updated_at"]
            self.table_columns = [
                "id",
                "user_id",
                "updated_at",
            ]
            self.schemas = source_schema
    
        def query(self):
            "SQL query for table."
            sql = """
            SELECT *
            FROM table
            WHERE updated_at >= '{0}'::TIMESTAMP
              AND '{1}'::TIMESTAMP > updated_at
            """
            return sql
    
        def execute(self, **kwargs):
            """
            The function which is callable from dags.
            :param kwargs: Passing all task context
            :type kwargs: kwargs
            :return: None
            :rtype: None
            """
            self.kwargs = kwargs
            self.source_to_warehouse()
    
    상술한 방법을 통해 우리는 70퍼센트의 코드를 깨끗하고 건장하게 한다.지금까지 ELT에서 EL 섹션을 설명했습니다.전환과 어떻게 절차를 실현할 것인가에 들어갑시다.
    우리의 모든 변환 작업은 데이터를 만들고 조작하는 데 사용되는 SQL 조회로 직접 실행됩니다.스페셜 Operator가 파생표를 만들어 주셔서 감사합니다.
    파생표가 도착하면 우리는 유행하는 시각화 도구를 사용하여 데이터를 시각화하고 수요에 따라 보고서를 공유한다.

    좋은 웹페이지 즐겨찾기