A dynamic way of doing ETL through Pyspark
Step 1
Create two tables in a database named TEST_DWH (I am using SQL Server):
A table etl_metadata to hold the ETL master data (source and destination information); a sample seeding snippet follows the CREATE statement below.
CREATE TABLE [dbo].[etl_metadata](
[id] [int] IDENTITY(1,1) NOT NULL,
[source_type] [varchar](max) NULL,
[source_info] [text] NULL,
[destination_db] [varchar](max) NULL,
[destination_schema] [varchar](max) NULL,
[destination_table] [varchar](max) NULL,
[etl_type] [varchar](max) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
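The whole job is driven by the rows in etl_metadata, so the table has to be seeded once per pipeline. As an illustration only (the CSV path, the source query, and the destination table names below are made-up examples; the source_type values 'CSV' and 'sqlserver' are the ones the Step 2 script understands), a couple of rows could be inserted from Python with pyodbc:
import pyodbc

# Hypothetical seed data: adjust the connection settings and metadata values to your environment.
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                      "Server=localhost,1433;Database=TEST_DWH;UID=user;PWD=password;")
cursor = conn.cursor()
# A CSV file loaded into a staging table
cursor.execute(
    "INSERT INTO dbo.etl_metadata (source_type, source_info, destination_db, destination_schema, destination_table, etl_type) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    'CSV', r'C:\data\customers.csv', 'TEST_DWH', 'dbo', 'stg_customers', 'full')
# A SQL Server query loaded into another staging table
cursor.execute(
    "INSERT INTO dbo.etl_metadata (source_type, source_info, destination_db, destination_schema, destination_table, etl_type) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    'sqlserver', 'SELECT * FROM dbo.orders', 'TEST_DWH', 'dbo', 'stg_orders', 'full')
conn.commit()
Note that etl_type is stored but not referenced by the Step 2 script, so any label will do.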
A table etl_metadata_schedule to track daily ETL progress.
CREATE TABLE [dbo].[etl_metadata_schedule](
[id] [int] NULL,
[source_type] [varchar](max) NULL,
[source_info] [text] NULL,
[destination_db] [varchar](max) NULL,
[destination_schema] [varchar](max) NULL,
[destination_table] [varchar](max) NULL,
[etl_type] [varchar](max) NULL,
[status] [varchar](max) NULL,
[status_description] [varchar](max) NULL,
[started_at] [datetime] NULL,
[completed_at] [datetime] NULL,
[schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Step 2
Now write the ETL in Python using Pyspark:
"""
Created on Thu Mar 17 11:06:28 2022
@author: Administrator
"""
#SPARK LIBRARIES
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd
#initiate spark env
import findspark
findspark.init()
findspark.find()
#print(findspark.find())
spark = SparkSession \
    .builder \
    .appName("Python ETL script for TEST") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.ansi.enabled", "true") \
    .config("spark.jars", r"C:\Drivers\sqljdbc42.jar") \
    .getOrCreate()
source_type = ''
source_info = ''
destination_db=''
destination_schema=''
destination_table = ''
etl_type = ''
query_string = ''
## Initialize variables used to build the query
#- timedelta(43)
#today = (date.today())
#print("Today's date:", "select a.*,null status,null status_description ,null started_at,null completed_at,GETDATE() schedule_date from dbo.etl_metadata_schedule_staging where schedule_date = "+"'"+str(today)+"'")
# set the variables used to connect to the database
database = "TEST_DWH"
user = "user"
password = "password"
query_string="SELECT a.*,CONCAT(ISNULL(b.status,'Pending'),b.status) status,null status_description ,null started_at,null completed_at FROM (SELECT *,getdate() schedule_date FROM dbo.etl_metadata ) a LEFT JOIN [dbo].[etl_metadata_schedule] b ON a.id = b.id and CAST(b.schedule_date AS date)= CAST(getdate() AS date) where ISNULL(b.status,'A') != 'completed'"
#Read ETL Meta Data
etl_meta_data_staging = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:1433;databaseName=" + database + ";") \
    .option("query", query_string) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
#-------------------CREATE NEW SCHEDULE----------------------------#
etl_meta_data_staging.filter("status == 'Pending'").show()
# If rows are still pending for today, append them to the schedule table; otherwise nothing new is created
etl_meta_data_staging.filter("status == 'Pending'").write \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:1433;databaseName=" + database + ";") \
    .option("dbtable", "dbo.etl_metadata_schedule") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .mode("append") \
    .save()
#-------------------END CREATE NEW SCHEDULE----------------------------#
#--------------SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                      "Server=localhost,1433;"
                      "Database=" + database + ";"
                      "UID=" + user + ";"
                      "PWD=" + password + ";")
cursor = conn.cursor()
#--------------END SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
df_etl_meta_data_staging = etl_meta_data_staging.toPandas()
df_etl_meta_data_staging=df_etl_meta_data_staging.sort_values('id')
#---LOOP : read FROM SOURCE (extract) and write to destination.---#
for etl_id in df_etl_meta_data_staging['id']:
    status = 'In Progress'
    print("Starting for " + str(etl_id))
    #---------------UPDATE In Progress Status---------------#
    cursor.execute(
        "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
        "SET [status] = ?, [started_at] = CURRENT_TIMESTAMP WHERE id = ?",
        status, int(etl_id))
    conn.commit()
    #---------------END UPDATE In Progress Status---------------#
    # load meta data into variables
    source_type = df_etl_meta_data_staging['source_type'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    source_info = df_etl_meta_data_staging['source_info'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    destination_db = df_etl_meta_data_staging['destination_db'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    destination_schema = df_etl_meta_data_staging['destination_schema'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    destination_table = df_etl_meta_data_staging['destination_table'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    etl_type = df_etl_meta_data_staging['etl_type'][df_etl_meta_data_staging['id'] == etl_id].values[0]
    # initialize empty status for each run
    status = ''
    # Read data from source: try to read, otherwise raise an exception
    try:
        print("Reading via ", source_info)
        # Read module data
        if source_type == 'CSV':
            jdbcDF = spark.read \
                .format("csv") \
                .option("header", "true") \
                .option("quote", "\"") \
                .option("escape", "\"") \
                .load(source_info)
            status = 'read_successful'
            jdbcDF.show()
        elif source_type == 'sqlserver':
            jdbcDF = spark.read \
                .format("jdbc") \
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName=" + database + ";") \
                .option("query", source_info) \
                .option("user", user) \
                .option("password", password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load()
        # Try to write the extracted data to the relevant destination table
        try:
            jdbcDF.write \
                .format("jdbc") \
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName=" + destination_db + ";") \
                .option("dbtable", destination_schema + "." + destination_table) \
                .option("user", user) \
                .option("password", password) \
                .option("truncate", "true") \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .mode("overwrite") \
                .save()
            status = 'completed'
            print("Write Successful")
            #---------------UPDATE Success Status---------------#
            cursor.execute(
                "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
                "SET [status] = ?, [completed_at] = CURRENT_TIMESTAMP WHERE id = ?",
                status, int(etl_id))
            conn.commit()
            #---------------END UPDATE Success Status---------------#
        # except of writing extracted data to the destination table
        except Exception as e:
            print('some error in writing')
            status = 'error in writing to destination db, ' + str(e)
            #---------------UPDATE Error Status---------------#
            cursor.execute(
                "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
                "SET [status] = ?, [completed_at] = CURRENT_TIMESTAMP WHERE id = ?",
                status, int(etl_id))
            conn.commit()
            #---------------END UPDATE Error Status---------------#
    # except of reading module data
    except Exception as e:
        print("some error in reading from source")
        status = 'error reading source , ' + str(e)
        print(status)
        #---------------UPDATE Error Status---------------#
        cursor.execute(
            "UPDATE [TEST_DWH].[dbo].[etl_metadata_schedule] "
            "SET [status] = ?, [completed_at] = CURRENT_TIMESTAMP WHERE id = ?",
            status, int(etl_id))
        conn.commit()
        #---------------END UPDATE Error Status---------------#
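After a run, the day's outcome sits in etl_metadata_schedule. A minimal sketch for checking it, assuming the same localhost / TEST_DWH connection settings used above:
# Minimal status check: list today's schedule entries and how far they got.
# Connection settings are assumed to match the script above.
import pandas as pd
import pyodbc

conn_check = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
                            "Server=localhost,1433;Database=TEST_DWH;"
                            "UID=user;PWD=password;")
df_status = pd.read_sql(
    "SELECT id, destination_table, status, started_at, completed_at "
    "FROM dbo.etl_metadata_schedule "
    "WHERE CAST(schedule_date AS date) = CAST(GETDATE() AS date) "
    "ORDER BY id",
    conn_check)
print(df_status)
Entries that ended in an error are picked up again on the next run, because the scheduling query only excludes rows whose status is 'completed'.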
Reference
On this topic (A dynamic way of doing ETL through Pyspark), more material can be found at https://dev.to/mustafasajid/dynamic-way-doing-etl-through-pyspark-2b8m. Feel free to share or copy the text, but please keep this document's URL as the reference URL.