Qué es y como crear ETL en AWS Glue Parte 3
에스크리비엔도 엘 코디고
Empezaremos por incluir las siguienteslibrerias de Glue.
JsonOptions nos permitirá especificar los paths dónde queremos que se cree nuestro archivo final.
DynamicFrame nos permitirá crear un frame de datos de tipo spark
GlueContext nos permitirá ejecutar nuestro job ETL en el entorno serverless
GlueArgParser nos permitirá leer las variables de sysArgs que enviemos a nuestro job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.amazonaws.services.glue.util.GlueArgParser
Definiremos nuestra GlueApp y nuestro main donde se realizará la primera ejecución de nuestro código
object GlueApp {
def main(sysArgs: Array[String]) {
...
}
}
Ahora declararemos nuestras 변수
GlueContext는 Amazon S3 버킷의 DynamicFrame, AWS, JDBC 등의 데이터 카탈로그에 대해 자세히 설명합니다.
Característica DataSource y DataSink que, su vez, se pueden usar para leer y escribir objetos DynamicFrame.
val glueContext: GlueContext = new GlueContext(sc)
Punto de entrada principal para la funcionalidad Spark. Un SparkContext는 Spark가 RDD 생성을 위해 사용되는 클러스터의 연결을 나타내며 클러스터에서 분산 및 변수를 축적합니다.
val sc: SparkContext = new SparkContext()
일반 식별기호 de la sesión Spark
val spark = glueContext.getSparkSession
object GlueApp {
val glueContext: GlueContext = new GlueContext(sc)
val sc: SparkContext = new SparkContext()
val spark = glueContext.getSparkSession
def main(sysArgs: Array[String]) {
...
}
}
Si deseamos leer una variable de ambiente que hemos enviado en los job parameters lo podemos leer de la siguiente 형식:
/* Lee el valor del job parameter enviado ejemplo: --env (key) ci (value)
el valor lo leerá como ci */
val args = GlueArgParser.getResolvedOptions(sysArgs, Array("env"))
// ejemplo
// val table = s"${args("env")}_transactions" se traduce como ci_transactions
Dentro de nuestro main comenzaremos por declarar nuestra base de datos junto a nuestras tablas, esto nos permitirá transformar o ejecutar Consultas.
// Catálogo de datos: bases de datos y tablas
val dbName = s"db-kushki-ejemplo"
val tblCsv = s"transacciones" //El nombre de la tabla con la ubicación del S3
val tblDynamo = s"transactions" //El nombre de la tabla con la ubicación de dynamo
Ahora declararemos nuestro 출력 디렉토리(la carpeta final donde se guardará nuestro archivo generado)
// Directorio final donde se guardará nuestro archivo dentro de un bucket S3
val baseOutputDir = s"s3://${args("env")}-trx-ejemplo/"
val transactionDir= s"$baseOutputDir/transaction/"
Apache Spark에서 DataFrame으로 추상화하는 원칙은 SparkSQL에서 Ry 및 Pandas로 DataFrame을 구성하는 것과 유사합니다. Un elemento DataFrame은 SQL 작업(선택, 프로젝트, 집계)과 유사합니다. En este caso de nuestro cátalogo de datos crearemos un DynamicFrame de cada tabla
// Read data into a dynamic frame
val trx_dyn: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblDYNAMO ).getDynamicFrame()
val trx_csv: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblCSV ).getDynamicFrame()
ApplyMapping 대 ResolveChoice
ApplyMapping: DynamicFrame을 구체적으로 선언하기 위해 맵에 적용합니다.
ResolveChoice: 해결 프로그램에 대한 정보는 DynamicFrame 요소에 대해 모호한 정보를 제공합니다.
적용 매핑
ResolveChoice
Tipos de datos
Es incompatible si un tipo de dato es ambiguo
Se define un solo tipo de dato
매핑
El Dataframe devuelve solo lo que se mapea
Devuelve todos los campos incluyendo al campo que se le realizó el casting
// ApplyMapping
val trx_dyn_mapping= trx_dyn.applyMapping(mappings = Seq(("id", "string", "id", "string"),("cliente", "string", "cliente", "string"),("estado", "string", "estado", "string"),("monto", "bigint", "monto", "double")), caseSensitive = false, transformationContext = "trx_dyn_mapping")
// ResolveChoice
val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
En nuestro ejemplo es necesario resolver el problema de los tipos de datos ambiguos debido a que en nuestro archivo csv se presenta datos de tipo bigint, y en nuestra tabla de dynamo se presenta datos de tipo Number, ambos tipos de datos deben ser del mismo tipo por lo que se necesita aplicar resolveChoice, en este caso applyMapping nos devolverá un problema debido a que la columna monto devolverá un struct de los diferentes tipos de dato.
val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
val trx_csv_resolve= trx_csv.resolveChoice(specs = Seq(("monto", "cast:double")))
En la siguiente sección de código procederemos a crear nuestra pseudo-tabla donde ejecutaremos sentencias SQL es Importante darle un nombre simple pero distintivo
// Spark SQL on a Spark dataframe
val dynDf = trx_dyn_resolve.toDF()
dynDf.createOrReplaceTempView("dynamoTable")
val csvDf = trx_csv_resolve.toDF()
csvDf.createOrReplaceTempView("csvTable")
A continuación realizaremos nuestra sentencia SQL con cualquier lógica de negocio que necesitemos, para nuestro ejemplo realizaremos una sentencia simple en la cual obtenga todos los registros que hagan un match
// SQL Query
val dynSqlDf = spark.sql("SELECT T1.id,T1.monto,T1.cliente,T1.estado FROM dynamoTable T1 LEFT JOIN csvTable T2 ON (T1.id=T2.id) WHERE T2.idIS NOT NULL AND (T1.monto=T2.monto AND T1.cliente=T2.cliente AND T1.estado = T2.estado)")
El runtime por detrás de AWS Glue ejecuta un proceso de ApacheSpark por lo que los DynamicFrame que retornemos se crearán en multi partes, por lo que utilizaremos coalesce(1) para juntarlos en uno solo, sin embargo esto puede ocasionar errores en grandes cantidades de datos retornados.
//Compact al run-part files into one
val dynFile = DynamicFrame(dynSqlDf, glueContext).withName("dyn_dyf").coalesce(1)
최종 절차는 S3의 양동이에 있는 경로에 대한 구체적인 결과를 보호하는 새로운 절차입니다.
// Save file into S3
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> transactionDir)), format = "csv").writeDynamicFrame(dynFile)
소스 코드
El script completeto lo puedes encontrar aquí:
Github
Espero este tutorial te haya sido de ayuda !
Reference
이 문제에 관하여(Qué es y como crear ETL en AWS Glue Parte 3), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/davidshaek/que-es-y-como-crear-etl-en-aws-glue-parte-3-4n6n텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)