변경 데이터 캡처로 업스트림 데이터 변경 처리
36335 단어 databasedatapipelinedataengineering
+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2| Bob|
+---+-----+
한 달 후 Alice에서 Carol로 고객 이름이 변경되었음을 깨닫고 지난 한 달 동안 잘못된 데이터를 사용했습니다.이런 데이터가 정확하지 않으면 우리의 데이터 분석과 기계 학습 모델에 영향을 줄 수 있다.그렇다면, 우리는 어떻게 그것을 검측하고, 어떻게 자동화를 실현합니까?다행히도, 충분한 정보가 있으면, 기존의 파이프라인을 너무 복잡하게 하지 않고 데이터를 쉽게 업데이트할 수 있다.우리는 두 가지 구성 부분이 필요하다.
이벤트 생성기 변경 - 생성된 데이터의 생성/업데이트/삭제 이벤트
이벤트 파서 변경 - 기존 데이터에 변경 사항 적용
변화 이벤트 포착 모델
클래스를 만들어 봅시다. 변경 사항을 포착할 수 있도록 사용할 수 있습니다.
case class ChangeEvent(
changeType: String, // type of change - INSERT, UPDATE or DELETE
timestamp: String, // when this change happened
columnNames: Seq[String], // names of columns (Mandatory for INSERT / UPDATE)
columnValues: Seq[String], // values of columns (Mandatory for INSERT / UPDATE)
oldKeyNames: Seq[String], // names of old key columns (Mandatory for UPDATE / DELETE)
oldKeyValues: Seq[String] // values of old key columns (Mandatory for UPDATE / DELETE)
)
대다수의 속성은 모두 자명하지 않다.oldKeyNames
와 oldKeyValues
는 UPDATE
/DELETE
조회에 사용된 오래된 데이터의 키/값을 포함한다.만약 열 데이터 유형에 따라 서로 다른 변경 사항을 적용해야 한다면, 우리는 더 많은 속성을 사용하여 모델을 풍부하게 할 수 있다. 예를 들어 columnTypes
와 oldKeyTypes
.이를 흔히 변경 데이터 캡처(Change data capture, CDC)라고 하는데, PostgreSQL, MySQL, MongoDB를 포함한 많은 데이터베이스가 이를 지원한다.사실 우리의
ChangeEvent
클래스는 wal2json의 간소화 출력이고 후자는 논리 복제에 사용되는 PostgreSQL 플러그인이다.이로움과 폐단
CDC를 사용하면 다음과 같은 두 가지 주요 이점이 있습니다.
문제
지금, 이 건의를 들은 후에, 우리는 아마도 자신에게 몇 가지 문제를 묻고 싶을 것이다.
oldKeyNames
나 oldKeyValues
의 개념이 없으면 어떻게 해야 합니까?INSERT
s에만 흥미를 가지고 UPDATE
s 또는 DELETE
s)이 경우 주 키로 UUID 또는 시퀀스 열을 추가하는 것이 좋습니다.주요한 관건적인 변경은 정확하게 설계된 데이터 모델에서 거의 일어나지 않지만 불가피한 경우 변경된 데이터를 새로운 데이터 모델로 보고 데이터의 스냅샷을 가져오는 것이 좋다.변경 이벤트를 기존 데이터에 적용할 때, 나머지 문제는 코드에서 해결할 수 있지만, 본고는 이러한 문제를 토론하지 않을 것입니다.
이제 예를 들어 그것이 어떻게 일을 하는지 봅시다.나는 이 예에서
Apache Spark 2.4
를 사용하지만 같은 논리도 다른 구조에서 다시 실현할 수 있다.기존 데이터 에뮬레이션
우선, 우리는 데이터 호수에 이미 존재하는 일부 데이터를 모의할 것이다.
timestamp
열은 이전에 데이터lake에 기록된 데이터에 나타납니다. 이 열을 기반으로 변경 이벤트를 적용해야 하기 때문입니다.import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
val schema = StructType(Seq("timestamp", "id", "name").map(StructField(_, StringType)))
val datalakeDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
Row("0", "id1", "Alice"),
Row("0", "id2", "Bob")
)), schema)
+---------+---+-----+
|timestamp| id| name|
+---------+---+-----+
| 0|id1|Alice|
| 0|id2| Bob|
+---------+---+-----+
이벤트 생성기 변경
다음 단계는 우리가
Change Event Generator
생성하고자 하는 데이터의 변경 이벤트를 채우는 것입니다.이 생성기의 실현은 원본 코드가 특정한 것이기 때문에 우리는 본문에서 이 문제를 토론하지 않을 것이다.oldKeyNames
및 oldKeyValues
는 insert
에 대해 없습니다.마찬가지로 columnNames
와columnValues
는 필요 없다.val COLUMN_NAMES = Seq("id", "name") // column names of the data
val OLD_KEY_NAMES = Seq("id") // column names of the primary key
val changeEventDf = spark.createDataFrame(Seq(
ChangeEvent("update", "1", COLUMN_NAMES, Seq("id1", "Angela"), OLD_KEY_NAMES, Seq("id1")),
ChangeEvent("delete", "2", null, null, OLD_KEY_NAMES, Seq("id2")),
ChangeEvent("insert", "3", COLUMN_NAMES, Seq("id2", "Carol"), null, null)
))
+----------+---------+-----------+-------------+-----------+------------+
|changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
+----------+---------+-----------+-------------+-----------+------------+
| update| 1| [id, name]|[id1, Angela]| [id]| [id1]|
| delete| 2| null| null| [id]| [id2]|
| insert| 3| [id, name]| [id2, Carol]| null| null|
+----------+---------+-----------+-------------+-----------+------------+
delete
열과 columnNames
열을 나누면 기존 데이터와 유사한 데이터 프레임워크가 생길 수 있습니다.그러나 우리는 여전히 그것을 사용해야 하기 때문에 columnValues
와 oldKeyNames
를 보류할 것이다.val splitChangeEventDf = COLUMN_NAMES.zipWithIndex.foldLeft(changeEventDf) {
(df, column) => df.withColumn(column._1, $"columnValues"(column._2))
}.drop("columnNames", "columnValues")
+----------+---------+-----------+------------+----+------+
|changeType|timestamp|oldKeyNames|oldKeyValues| id| name|
+----------+---------+-----------+------------+----+------+
| update| 1| [id]| [id1]| id1|Angela|
| delete| 2| [id]| [id2]|null| null|
| insert| 3| null| null| id2| Carol|
+----------+---------+-----------+------------+----+------+
이러한 변화 사건에 따라 우리는 다음과 같은 순서에 따라 이러한 변화를 응용해야 한다.oldKeyValues
의name
를 id1
에서 Alice
로 업데이트Angela
id2
=id2
재삽입name
이벤트 파서 변경
두 번째 구성 요소를 만들 때가 되었습니다. 이 구성 요소는
Carol
에서 생성된 변경 이벤트를 적용합니다.먼저 두 개의 지원 함수를 작성합니다.Change Event Generator
- 데이터 프레임을 연합하여 패턴 차이를 고려한다. 예를 들어 서로 다른 열의 순서나 일치하지 않는 열, 예를 들어 df1은 두 열이 있는데 그것이 바로cola와colB이다. 그러나 df2는cola만 있다. def unionWithSchema(dataFrames: DataFrame *): Option[DataFrame] = {
if(dataFrames.isEmpty) {
return None
}
val spark = dataFrames.head.sparkSession
val distinctSchemas = dataFrames.map(_.schema.toList).distinct
val unionDf = if(distinctSchemas.size == 1) {
dataFrames.tail.foldLeft(dataFrames.head) {
(df1, df2) => df1.union(df2)
}
} else {
val allSchemas = distinctSchemas.flatten.distinct.sortBy(schema => schema.name)
val schemaWithAllColumns = StructType(allSchemas)
val emptyDataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaWithAllColumns)
val orderedColumns = emptyDataFrame.columns.map(col)
dataFrames.foldLeft(emptyDataFrame) {
(df1, df2) => {
val missingColumns = allSchemas diff df2.schema
val unionSafeDf = missingColumns.foldLeft(df2) {
(df, missingField) => df.withColumn(missingField.name, lit(null).cast(missingField.dataType))
}
df1.union(unionSafeDf.select(orderedColumns: _*))
}
}
}
Some(unionDf)
}
unionWithSchema
- 변경 이벤트를 시간순으로 적용합니다. def applyChangeEventsByTimestamp(dataFrame: DataFrame, primaryKeyArr: Array[String]): DataFrame = {
val partitionCols = if(primaryKeyArr.isEmpty) {
dataFrame.columns.map(col)
} else {
primaryKeyArr.map(colName => col(s"old_$colName"))
}
val window = Window.partitionBy(partitionCols: _*).orderBy(desc("timestamp"))
var dfInitial = dataFrame
var iteration = 0
var done = false
// Potentially expensive. Monitor logs and improve if required.
while (!done) {
val dfBefore = if (iteration == 0) {
dfInitial
} else {
primaryKeyArr.foldLeft(dfInitial) {
// Update the old keys for UPDATE ops
(df, colName) => df.withColumn(s"old_$colName",
when($"changeType" === "update", col(colName)).otherwise(col(s"old_$colName")))
}
}
val dfAfter = dfBefore.withColumn("rank", row_number().over(window))
.filter(col("rank") === 1)
.drop("rank")
done = dfAfter.count == dfBefore.count
dfInitial = dfAfter
iteration = iteration + 1
}
dfInitial
}
변경 이벤트 해결
우선, 우리는 가장 흔히 볼 수 있는 장면인 신기록 추가를 처리할 것이다.이때 우리는 사용할 필요가 없다
applyChangeEventsByTimestamp
.timestamp
변경 이벤트가 아직 적용되지 않았기 때문에id2
두 개의 기록이 있습니다.val insertDf = splitChangeEventDf.filter($"changeType" === "insert").drop("oldKeyNames", "oldKeyValues")
val datalakeAndInsertDf = unionWithSchema(insertDf, datalakeDf).get
+----------+---+-----+---------+
|changeType| id| name|timestamp|
+----------+---+-----+---------+
| insert|id2|Carol| 3|
| null|id1|Alice| 0|
| null|id2| Bob| 0|
+----------+---+-----+---------+
이제 기록 업데이트와 삭제를 처리합시다.우리의 예시에서 유일한 키 그룹만 있기 때문에 왼쪽 접힌 코드는 한 번만 실행됩니다.val updateDeleteDf = splitChangeEventDf.filter($"changeType" === "update" || $"changeType" === "delete")
val distinctOldKeyNames = updateDeleteDf.select("oldKeyNames").distinct.collect.flatMap(_.getSeq[String](0))
// Ideally there should only be one but primary key can be changed in some cases
val updateDeleteExpandedDfs = distinctOldKeyNames.foldLeft(datalakeAndInsertDf)(
(datalakeAndInsertDf, primaryKeyArr) => {
// Step 1: For INSERT / Existing data, create new pkey columns (using existing values)
val datalakeAndInsertCeDf = primaryKeyArr.foldLeft(datalakeAndInsertDf) {
(df, colName) => df.withColumn(s"old_$colName", col(colName))
}
// Step 2: For UPDATE / DELETE, split the old keys array column
val updateDeleteCeDf = primaryKeyArr.zipWithIndex.foldLeft(updateDeleteDf) {
(df, pKey) => df.withColumn(s"old_${pKey._1}", $"oldKeyValues"(pKey._2))
}
.filter($"oldKeyNames" === primaryKeyArr)
.drop("oldKeyNames", "oldKeyValues")
// Step 3: Union all the data
val initialDf = unionWithSchema(datalakeAndInsertCeDf, updateDeleteCeDf).get.cache()
// Step 4: Resolve the change events chronologically
val resolvedDf = applyChangeEventsByTimestamp(initialDf, primaryKeyArr)
// Step 5: Drop DELETE records and unnecessary columns
resolvedDf
.filter($"changeType" =!= "delete" || $"changeType".isNull)
.drop(primaryKeyArr.map(colName => s"old_$colName"): _*)
}
}
// Step 6: Remove unwanted columns and get rid of duplicate rows
return updateDeleteExpandedDfs.drop("changeType").distinct()
각 단계를 완료하고 출력을 시각화합니다.기존 키에 대한 정보를 포함하도록
delete
를 채웁니다.이 예에서 그것은 datalakeAndInsertDf
일 것이다.scala> datalakeAndInsertCeDf.show
+----------+---+-----+---------+------+
|changeType| id| name|timestamp|old_id|
+----------+---+-----+---------+------+
| insert|id2|Carol| 3| id2|
| null|id1|Alice| 0| id1|
| null|id2| Bob| 0| id2|
+----------+---+-----+---------+------+
충실
old_id
은 기존 키에 대한 정보를 포함한다(본 예는updateDeleteDf
.scala> updateDeleteCeDf.show
+----------+---------+----+------+------+
|changeType|timestamp| id| name|old_id|
+----------+---------+----+------+------+
| update| 1| id1|Angela| id1|
| delete| 2|null| null| id2|
+----------+---------+----+------+------+
#1과 #2의 데이터 프레임을 결합합니다.
scala> initialDf.show
+----------+----+------+------+---------+
|changeType| id| name|old_id|timestamp|
+----------+----+------+------+---------+
| insert| id2| Carol| id2| 3|
| null| id1| Alice| id1| 0|
| null| id2| Bob| id2| 0|
| update| id1|Angela| id1| 1|
| delete|null| null| id2| 2|
+----------+----+------+------+---------+
old_id
함수에 의해 정의된 변경 이벤트를 시간 순서대로 적용합니다.이 때 키마다 하나의 기록만 있어야 한다. (다음 단계에서 삭제할 applyChangeEventsByTimestamp()
기록은 제외된다.scala> resolvedDf.show
+----------+---+------+------+---------+
|changeType| id| name|old_id|timestamp|
+----------+---+------+------+---------+
| update|id1|Angela| id1| 1|
| insert|id2| Carol| id2| 3|
+----------+---+------+------+---------+
변경 형식이
timestamp
인 기록을 삭제합니다. 지속되지 않기 때문입니다.우리의 예에서 delete
기록은 delete
에 덮어씌워졌기 때문에 삭제할 내용이 없습니다.delete
도 삭제되었습니다. 왜냐하면 우리는 더 이상 이런 정보를 필요로 하지 않기 때문입니다.+----------+---+------+---------+
|changeType| id| name|timestamp|
+----------+---+------+---------+
| update|id1|Angela| 1|
| insert|id2| Carol| 3|
+----------+---+------+---------+
만일 더 많은 키 조합이 있다면 1-5단계를 반복하지만 이 예에서 우리는 하나
insert
만 있기 때문에 해결 방안의 끝이다.우리는 old_id
를 삭제하고 중복 기록을 삭제하기만 하면 된다.+---+------+---------+
| id| name|timestamp|
+---+------+---------+
|id2| Carol| 3|
|id1|Angela| 1|
+---+------+---------+
Reference
이 문제에 관하여(변경 데이터 캡처로 업스트림 데이터 변경 처리), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/waiyan1612/handling-upstream-data-changes-via-change-data-capture-1aog텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)