우리는 어떻게 나날이 우리의 데이터 공학 작업 절차를 개선할 것인가
본고에서 우리는 데이터 공학 편성에서 무엇을 배웠는지, 그리고 존재하는 결점, 그리고 우리가 어떻게 하루하루 개선했는지 설명할 것이다.
우리는 소형 데이터 공학팀인 Jagadish(me)로 고객의 분석 수요를 처리하고 확장 가능한 파이프를 구축하여 파생표를 생성한다.
처음에 우리는 데이터 공학 체계 구조와 ETL 절차를 가지고 데이터를 원본 창고에서 목표 창고로 이동하는 데 사용했다.이것은 우리의 작업 절차를 시작할 때 매우 유익하다. 데이터를 창고로 옮기기 전에 완전한 전환을 완성했기 때문에 최종 표에는 더 이상 필요하지 않은 열이 나타나지 않는다.ETL은 대량의 데이터를 추출, 변환, 불러올 때 효과가 좋고 창고에서 필요한 표를 깨끗하게 생성하는 것을 대표한다.우리는 일찍이 파이프 스케줄러를 위해 기류를 사용했다.
ETL 사용의 이점은 다음과 같습니다.
ETL 사용의 단점은 다음과 같습니다.
추출, 로드 및 변환(ELT)은 원본 데이터를 소스 데이터베이스에서 데이터 웨어하우스로 전송한 다음 다운스트림 용도로 정보를 변환하는 데이터 통합 프로세스입니다.
첫 번째 단계는 데이터를 추출하는 것이다.데이터를 추출하는 것은 한 개 이상의 원본 시스템(데이터베이스나 각종 다른 데이터 원본일 수도 있음)에서 데이터를 식별하고 읽는 과정이다.
두 번째 단계는 추출 데이터를 불러오는 것입니다.로드는 추출된 데이터를 대상 데이터베이스(예: 창고)에 추가하는 프로세스입니다.
세 번째 단계는 데이터 변환이다.데이터를 창고에서 분석에 필요한 형식으로 바꾸는 과정.
ELT는 추출에 의존하지 않기 때문에 ETL보다 더 유연하게 변환할 수 있으며 앞으로 추출된 데이터를 더 추가할 수 있습니다.현재 대부분의 파생 테이블 작업은 SQL을 통해 수행됩니다.
우리는 ELT 프로세서로서 작업 흐름을 조정하고 감시했으며, SQL과 비SQL 데이터베이스에서 데이터를 추출해야만 창고에 불러올 수 있었다.우리의 기류 배치는 docker를 통해 이루어졌으니 더 자세한 내용은 puckel/airflow을 보십시오.현재 우리는 우리의 이미지의 정식 부두 이미지를 채택하고 있다.
배치에 있어wise는 모든 것이 순조롭지만 생산, 단계와 개발 등 서로 다른 환경을 사용하기 위해 docker compose와airflow 설정 방법을 수정했습니다.
우리의 배치 환경을 이동시키다.env 파일은 env 파일을 통해 로드됩니다.그것은 우리의 환경과 부두를 분리시켰다
LOAD_EX=None
FERNET_KEY=None
EXECUTOR=None
AIRFLOW__WEBSERVER__AUTHENTICATE=None
AIRFLOW__WEBSERVER__AUTH_BACKEND=None
AIRFLOW__CORE__SQL_ALCHEMY_POOL_ENABLED=None
AIRFLOW__CELERY__FLOWER_BASIC_AUTH=None
공기 흐름 속에서 우리는 작업 중에 KWARG를 사용하고 변수를 사용하여 서로 다른 환경을 처리한다 kwargs = {
"project_id": "{{ var.json.config.project_id }}",
"table_dataset": "{{ var.json.config.table_dataset }}",
"table_test_dataset": "{{ var.json.config.table_test_dataset }}",
}
에어플로우 폴더 구조와 코드 개선을 살펴봅시다.프로젝트 측의 DAG는 DAG 내부에 위치하고 나머지는 모두 이해하기 쉽다.
SQL 데이터베이스를 위한 파이핑 DAG 및 작업을 구축했습니다.그것은 소스 코드표의 구조가 비슷하고 코드가 중복되는 단점이 있다.그 밖에 코드는 열 검증과 열 삭제가 없습니다.
def table_task(**kwargs):
# connect to source database
start_date, end_date
postgres_conn_obj = PostgresHook(postgres_conn_id=connection_id)
sql = "SELECT * FROM table WHERE updatedAt>{} AND updatedAt<{}"
df = postgres_conn_obj.get_pandas_df(
sql.format(self.start_date, self.end_date)
)
df.to_bq("table_name", cred)
#dags
table_task = PythonOperator(
task_id="table_task",
python_callable=table_task,
provide_context=True,
op_kwargs=kwargs,
)
표의 중복, 삭제, 데이터베이스 연결과 모델을 처리하는 표준적인 방법을 극복하기 위해 우리는 클래스 기반의 방법으로 이 문제를 해결하고 실행 중인 표를 쉽게 다시 사용할 수 있다. class Table(PythonTaskBase):
"""
Task for table which is present in source
"""
def __init__(
self, postgres_connection_id, bigquery_connection_id, table_name
):
super().__init__(
postgres_connection_id=postgres_connection_id,
bigquery_connection_id=bigquery_connection_id,
table_name=table_name,
)
self.datetime_column = ["created_at", "updated_at"]
self.table_columns = [
"id",
"user_id",
"updated_at",
]
self.schemas = source_schema
def query(self):
"SQL query for table."
sql = """
SELECT *
FROM table
WHERE updated_at >= '{0}'::TIMESTAMP
AND '{1}'::TIMESTAMP > updated_at
"""
return sql
def execute(self, **kwargs):
"""
The function which is callable from dags.
:param kwargs: Passing all task context
:type kwargs: kwargs
:return: None
:rtype: None
"""
self.kwargs = kwargs
self.source_to_warehouse()
상술한 방법을 통해 우리는 70퍼센트의 코드를 깨끗하고 건장하게 한다.지금까지 ELT에서 EL 섹션을 설명했습니다.전환과 어떻게 절차를 실현할 것인가에 들어갑시다.우리의 모든 변환 작업은 데이터를 만들고 조작하는 데 사용되는 SQL 조회로 직접 실행됩니다.스페셜 Operator가 파생표를 만들어 주셔서 감사합니다.
파생표가 도착하면 우리는 유행하는 시각화 도구를 사용하여 데이터를 시각화하고 수요에 따라 보고서를 공유한다.
Reference
이 문제에 관하여(우리는 어떻게 나날이 우리의 데이터 공학 작업 절차를 개선할 것인가), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/jagamts1/how-we-evolved-our-data-engineering-workflow-day-by-day-2ach텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)