A Dynamic Way to Perform ETL Using PySpark

Instead of writing a separate ETL job for each table, you can drive the whole process dynamically from a database (MySQL, PostgreSQL, SQL Server) and PySpark. For a better understanding, the code is built up in a few steps.

Step 1

Create two tables in a database named TEST_DWH (I am using SQL Server).
A table etl_metadata to hold the ETL master data (source and destination information):

CREATE TABLE [dbo].[etl_metadata](
    [id] [int] IDENTITY(1,1) NOT NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
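
For illustration, a metadata row might look like the following. The CSV path, the source query, and the etl_type value are placeholders; they simply cover the two source_type cases the script in Step 2 handles (CSV and sqlserver).

INSERT INTO [dbo].[etl_metadata]
    (source_type, source_info, destination_db, destination_schema, destination_table, etl_type)
VALUES
    ('CSV', 'C:\data\customers.csv', 'TEST_DWH', 'dbo', 'stg_customers', 'full'),
    ('sqlserver', 'SELECT * FROM sales.orders', 'TEST_DWH', 'dbo', 'stg_orders', 'full');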


A table etl_metadata_schedule to track the daily ETL progress:

CREATE TABLE [dbo].[etl_metadata_schedule](
    [id] [int] NULL,
    [source_type] [varchar](max) NULL,
    [source_info] [text] NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    [status] [varchar](max) NULL,
    [started_at] [datetime] NULL,
    [completed_at] [datetime] NULL,
    [schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
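
While jobs run, the schedule table doubles as a status log. A quick way to check today's progress (a sketch; columns as defined above):

SELECT id, destination_table, status, started_at, completed_at
FROM [dbo].[etl_metadata_schedule]
WHERE CAST(schedule_date AS date) = CAST(GETDATE() AS date)
ORDER BY id;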


Step 2

Now write the ETL in Python using PySpark.
  • Fetch the metadata into Pandas so the ETL runs can be iterated over
  • Switch the read source based on source_type (in my case I handled two cases: CSV and database)
  • Write the data to the destination; the destination information comes from etl_metadata

    """
    Created on Thu Mar 17 11:06:28 2022
    
    @author: Administrator
    """
    
    #SPARK LIBRARIES
    from pyspark.sql import SparkSession
    import pyodbc
    import pandas as pd
    #initiate spark env
    import findspark
    findspark.init()
    findspark.find()
    #print(findspark.find())
    
    
    
    
    spark = SparkSession \
        .builder \
        .appName("Python ETL script for TEST") \
        .master("local[*]")\
        .config("spark.driver.memory", '8g')\
        .config("spark.sql.ansi.enabled ",True)\
        .config("spark.jars", "C:\Drivers\sqljdbc42.jar") \
        .getOrCreate()
    
    
    
    
    
    source_type = ''
    source_info = ''
    destination_db=''
    destination_schema=''
    destination_table = ''
    etl_type = ''
    query_string = '' 
    
    ## Initiating variables used to build the query
    
    #- timedelta(43)
    #today = (date.today())
    #print("Today's date:", "select a.*,null status,null status_description ,null started_at,null completed_at,GETDATE() schedule_date from dbo.etl_metadata_schedule_staging where schedule_date = "+"'"+str(today)+"'")
    
    #set variable to be used to connect the database
    
    
    
    
    database = "TEST_DWH"
    user = "user"
    password  = "password"
    
    
    query_string="SELECT a.*,CONCAT(ISNULL(b.status,'Pending'),b.status) status,null status_description ,null started_at,null completed_at FROM (SELECT *,getdate()  schedule_date FROM dbo.etl_metadata ) a LEFT JOIN [dbo].[etl_metadata_schedule] b ON a.id = b.id and  CAST(b.schedule_date AS date)= CAST(getdate() AS date) where ISNULL(b.status,'A') != 'completed'"
    
    #Read ETL Meta Data
    etl_meta_data_staging = spark.read\
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
        .option("query", query_string) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .load()    
    
    
    #-------------------CREATE NEW SCHEDULE----------------------------#
    etl_meta_data_staging.filter("status == 'Pending'").show()
    
    
    # If pending rows exist (no schedule entry for today yet), append them to the schedule table; otherwise do nothing
    etl_meta_data_staging.filter("status == 'Pending'").write \
            .format("jdbc") \
            .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
            .option("dbtable", "dbo.etl_metadata_schedule") \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .mode("append")\
            .save()
    
    
    #-------------------END CREATE NEW SCHEDULE----------------------------#
    
    
    #--------------SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
    conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                          "Server=localhost,1433;"
                          "Database="+database+";"
                          "UID="+user+";"
                          "PWD="+password+";")
    
    cursor = conn.cursor()
    #--------------END SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
    
    df_etl_meta_data_staging  = etl_meta_data_staging.toPandas()
    
    df_etl_meta_data_staging=df_etl_meta_data_staging.sort_values('id')
    #---LOOP: read from source (extract) and write to destination---#
    for etl_id in df_etl_meta_data_staging['id']:
    
    
    
       status = 'In Progress'
       print("Starting for "+ str(etl_id))
       #---------------UPDATE In Progress Status---------------#
       cursor.\
           execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                   SET [status]=\'''' 
                   +status+ "',[started_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
       conn.commit()
       #---------------UPDATE In Progress Status---------------#
    
    
    
    
    
       # load meta data into variables
       source_type = df_etl_meta_data_staging['source_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
       source_info = df_etl_meta_data_staging['source_info'][df_etl_meta_data_staging['id']==etl_id].values[0]
       destination_db = df_etl_meta_data_staging['destination_db'][df_etl_meta_data_staging['id']==etl_id].values[0]
       destination_schema = df_etl_meta_data_staging['destination_schema'][df_etl_meta_data_staging['id']==etl_id].values[0]
       destination_table = df_etl_meta_data_staging['destination_table'][df_etl_meta_data_staging['id']==etl_id].values[0]
       etl_type = df_etl_meta_data_staging['etl_type'][df_etl_meta_data_staging['id']==etl_id].values[0]
    
    
    
    
    
       # initialize empty status for each run
       status = ''
    
       # Read data from the source: try to read, otherwise handle the exception below
    
       #print(url_link)
       #print("Reading via ", source_info)
       # Read module data
    
       try: 
            print("Reading via ", source_info)
            # Read module data 
            if source_type == 'CSV':
               jdbcDF = spark.read\
                           .format("csv") \
                           .option("header", "true") \
                           .option("quote", "\"") \
                           .option("escape", "\"") \
                           .load(source_info) 
    
               status= 'read_successful'
               jdbcDF.show()
    
            elif source_type == 'sqlserver':
                jdbcDF = spark.read\
                    .format("jdbc") \
                    .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};") \
                    .option("query", source_info) \
                    .option("user", user) \
                    .option("password", password) \
                    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                    .load() 
    
            #Try to Write Extracted data relevant to destination table
            try:
              jdbcDF.write \
                      .format("jdbc") \
                      .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+destination_db+"};") \
                      .option("dbtable", destination_schema+"."+destination_table) \
                      .option("user", user) \
                      .option("password", password) \
                      .option("truncate", "true") \
                      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                      .mode("overwrite")\
                      .save()
    
              status = 'completed'
              print("Write Successful")
              #---------------UPDATE Success Status---------------#
              cursor.\
                  execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                          SET [status]=\'''' 
                          +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
              conn.commit()
              #---------------UPDATE Success Status---------------#
    
            # except: failed to write the extracted data to the destination table
            except Exception as e :
                  print('some error in writing')
                  status = 'error in writing to destination db, '+str(e)
                  #---------------UPDATE Error Status---------------#
                  cursor.\
                      execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                              SET [status]=\'''' 
                              +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
                  conn.commit()
                  #---------------UPDATE Error Status---------------#
        #except of Read module data
       except Exception as e :
             print("some error in reading from source")
             status = 'error reading source , '+str(e)
             print(status)
             #---------------UPDATE Error Status---------------#
             cursor.\
                 execute('''UPDATE  [TEST_DWH].[dbo].[etl_metadata_schedule] 
                         SET [status]=\'''' 
                         +status+ "',[completed_at]= CURRENT_TIMESTAMP where id= '"+ str(etl_id)+"';")
             conn.commit()
             #---------------UPDATE Error Status---------------#
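
    # Optional cleanup once every scheduled row has been processed:
    # release the ODBC connection and stop the Spark session.
    cursor.close()
    conn.close()
    spark.stop()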
    
    
