변경 데이터 캡처로 업스트림 데이터 변경 처리

데이터 파이프라인을 관리한 사람은 상위 데이터가 어떻게 변화하고 전체 파이프라인에 어떻게 영향을 미칠지 알 수 있다.우리의 데이터 lake에 다음과 같은 고객 데이터가 있다고 가정하십시오.
+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2|  Bob|
+---+-----+
한 달 후 Alice에서 Carol로 고객 이름이 변경되었음을 깨닫고 지난 한 달 동안 잘못된 데이터를 사용했습니다.이런 데이터가 정확하지 않으면 우리의 데이터 분석과 기계 학습 모델에 영향을 줄 수 있다.그렇다면, 우리는 어떻게 그것을 검측하고, 어떻게 자동화를 실현합니까?
다행히도, 충분한 정보가 있으면, 기존의 파이프라인을 너무 복잡하게 하지 않고 데이터를 쉽게 업데이트할 수 있다.우리는 두 가지 구성 부분이 필요하다.

  • 이벤트 생성기 변경 - 생성된 데이터의 생성/업데이트/삭제 이벤트

  • 이벤트 파서 변경 - 기존 데이터에 변경 사항 적용
  • Apache Hudi나 Delta Lake를 사용하여 최전방에서 생활할 수 있다면 변경 사건 해상도에 대한 수요는 논란이 될 것이다.Hudi/Delta Lake는 변경 사항을 해결하고 데이터의 단일 복사본을 보존하는 대신 저희 데이터의 매번 수정(Hudi의 타임라인과 Delta Lake의 시간 여행)을 보존할 수 있습니다.이것은 의심할 여지없이 더욱 강력한 기능이지만, 어떤 경우, 이러한 구성 요소들은 기존의 파이프에 간단하게 설치되지 않는다. 특히, 그들은 일반적인 꽃무늬 바닥이나 avro 형식이 아니라, 자신의hudi나delta 형식으로 데이터를 저장해야 하기 때문이다.이것은 우리의 데이터 목록, 데이터 모니터링, 데이터 시각화는 현재hudi나delta를 알아야 한다는 것을 의미한다. 이것은 아마도 1년 내에 준비가 되지 않았을 것이다. 특히 이러한 서비스가 내부 개발이 아니라면.

    변화 이벤트 포착 모델
    클래스를 만들어 봅시다. 변경 사항을 포착할 수 있도록 사용할 수 있습니다.
    case class ChangeEvent(
      changeType: String,           // type of change - INSERT, UPDATE or DELETE
      timestamp: String,            // when this change happened 
      columnNames: Seq[String],     // names of columns (Mandatory for INSERT / UPDATE)
      columnValues: Seq[String],    // values of columns (Mandatory for INSERT / UPDATE)
      oldKeyNames: Seq[String],     // names of old key columns (Mandatory for UPDATE / DELETE)
      oldKeyValues: Seq[String]     // values of old key columns (Mandatory for UPDATE / DELETE)
    )
    
    대다수의 속성은 모두 자명하지 않다.oldKeyNamesoldKeyValuesUPDATE/DELETE 조회에 사용된 오래된 데이터의 키/값을 포함한다.만약 열 데이터 유형에 따라 서로 다른 변경 사항을 적용해야 한다면, 우리는 더 많은 속성을 사용하여 모델을 풍부하게 할 수 있다. 예를 들어 columnTypesoldKeyTypes.
    이를 흔히 변경 데이터 캡처(Change data capture, CDC)라고 하는데, PostgreSQL, MySQL, MongoDB를 포함한 많은 데이터베이스가 이를 지원한다.사실 우리의 ChangeEvent 클래스는 wal2json의 간소화 출력이고 후자는 논리 복제에 사용되는 PostgreSQL 플러그인이다.

    이로움과 폐단
    CDC를 사용하면 다음과 같은 두 가지 주요 이점이 있습니다.
  • 비록 이 기능의 주요 목적은 데이터베이스 복사본을 만들고 데이터를 이전하는 것이지만 데이터 파이프라인에서의 기능은 매우 강력하다. 왜냐하면 변경은 거의 즉시 파이프라인에서 할 수 있기 때문에 실시간 파이프라인을 가능하게 한다.
  • 상류는 여러 곳에서 통지 논리를 실현할 필요가 없다.예를 들어 영화 평론 사이트를 생각해 보자.사용자가 자신의 의견을 작성/업데이트/삭제할 때 파이프라인에 통지해야 합니다. 이것은 3개의 다른 REST API 통지 파이프라인에 걸쳐 통지해야 한다는 것을 의미합니다.개발자가 알림 파이프를 잊어버리면 아무도 의식하지 못하는 상황에서 데이터를 잃어버린다.
  • 이곳의 작은 단점은 복제로 인해 원본 데이터베이스의 성능은 약간 떨어지지만 영향은 데이터베이스의 유형에 따라 다르다는 것이다.

    문제
    지금, 이 건의를 들은 후에, 우리는 아마도 자신에게 몇 가지 문제를 묻고 싶을 것이다.
  • 데이터에 메인 키(즉 nooldKeyNamesoldKeyValues의 개념이 없으면 어떻게 해야 합니까?
  • 메인 키가 바뀌면 어떻게 합니까?
  • 열을 추가하거나 이름을 바꾸면 어떻게 됩니까?
  • 기둥 하나를 빼면 어떡하지?
  • 열이 데이터 형식을 바꾸면 어떻게 합니까?
  • 불행하게도, 만약 우리의 데이터가 메인 키가 없다면, CDC는 근본적으로 일을 할 수 없을 것이다. (우리가 INSERT s에만 흥미를 가지고 UPDATE s 또는 DELETE s)이 경우 주 키로 UUID 또는 시퀀스 열을 추가하는 것이 좋습니다.주요한 관건적인 변경은 정확하게 설계된 데이터 모델에서 거의 일어나지 않지만 불가피한 경우 변경된 데이터를 새로운 데이터 모델로 보고 데이터의 스냅샷을 가져오는 것이 좋다.
    변경 이벤트를 기존 데이터에 적용할 때, 나머지 문제는 코드에서 해결할 수 있지만, 본고는 이러한 문제를 토론하지 않을 것입니다.
    이제 예를 들어 그것이 어떻게 일을 하는지 봅시다.나는 이 예에서 Apache Spark 2.4를 사용하지만 같은 논리도 다른 구조에서 다시 실현할 수 있다.

    기존 데이터 에뮬레이션
    우선, 우리는 데이터 호수에 이미 존재하는 일부 데이터를 모의할 것이다.timestamp열은 이전에 데이터lake에 기록된 데이터에 나타납니다. 이 열을 기반으로 변경 이벤트를 적용해야 하기 때문입니다.
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.expressions.Window
    
    val schema = StructType(Seq("timestamp", "id", "name").map(StructField(_, StringType)))
    val datalakeDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
        Row("0", "id1", "Alice"),
        Row("0", "id2", "Bob")
    )), schema)
    
    +---------+---+-----+
    |timestamp| id| name|
    +---------+---+-----+
    |        0|id1|Alice|
    |        0|id2|  Bob|
    +---------+---+-----+
    

    이벤트 생성기 변경
    다음 단계는 우리가 Change Event Generator 생성하고자 하는 데이터의 변경 이벤트를 채우는 것입니다.이 생성기의 실현은 원본 코드가 특정한 것이기 때문에 우리는 본문에서 이 문제를 토론하지 않을 것이다.oldKeyNamesoldKeyValuesinsert에 대해 없습니다.마찬가지로 columnNamescolumnValues는 필요 없다.
    val COLUMN_NAMES = Seq("id", "name")  // column names of the data
    val OLD_KEY_NAMES = Seq("id")         // column names of the primary key
    
    val changeEventDf = spark.createDataFrame(Seq(
        ChangeEvent("update", "1", COLUMN_NAMES, Seq("id1", "Angela"), OLD_KEY_NAMES, Seq("id1")),
        ChangeEvent("delete", "2", null, null, OLD_KEY_NAMES, Seq("id2")),
        ChangeEvent("insert", "3", COLUMN_NAMES, Seq("id2", "Carol"), null, null)
    ))
    
    +----------+---------+-----------+-------------+-----------+------------+
    |changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
    +----------+---------+-----------+-------------+-----------+------------+
    |    update|        1| [id, name]|[id1, Angela]|       [id]|       [id1]|
    |    delete|        2|       null|         null|       [id]|       [id2]|
    |    insert|        3| [id, name]| [id2, Carol]|       null|        null|
    +----------+---------+-----------+-------------+-----------+------------+
    
    delete열과 columnNames열을 나누면 기존 데이터와 유사한 데이터 프레임워크가 생길 수 있습니다.그러나 우리는 여전히 그것을 사용해야 하기 때문에 columnValuesoldKeyNames를 보류할 것이다.
    val splitChangeEventDf = COLUMN_NAMES.zipWithIndex.foldLeft(changeEventDf) {
        (df, column) => df.withColumn(column._1, $"columnValues"(column._2))
    }.drop("columnNames", "columnValues")
    
    +----------+---------+-----------+------------+----+------+
    |changeType|timestamp|oldKeyNames|oldKeyValues|  id|  name|
    +----------+---------+-----------+------------+----+------+
    |    update|        1|       [id]|       [id1]| id1|Angela|
    |    delete|        2|       [id]|       [id2]|null|  null|
    |    insert|        3|       null|        null| id2| Carol|
    +----------+---------+-----------+------------+----+------+
    
    이러한 변화 사건에 따라 우리는 다음과 같은 순서에 따라 이러한 변화를 응용해야 한다.
  • oldKeyValuesnameid1에서 Alice로 업데이트
  • 삭제 Angela
  • id2=id2재삽입name

  • 이벤트 파서 변경
    두 번째 구성 요소를 만들 때가 되었습니다. 이 구성 요소는 Carol 에서 생성된 변경 이벤트를 적용합니다.먼저 두 개의 지원 함수를 작성합니다.Change Event Generator - 데이터 프레임을 연합하여 패턴 차이를 고려한다. 예를 들어 서로 다른 열의 순서나 일치하지 않는 열, 예를 들어 df1은 두 열이 있는데 그것이 바로cola와colB이다. 그러나 df2는cola만 있다.
      def unionWithSchema(dataFrames: DataFrame *): Option[DataFrame] = {
    
        if(dataFrames.isEmpty) {
          return None
        }
        val spark = dataFrames.head.sparkSession
        val distinctSchemas = dataFrames.map(_.schema.toList).distinct
        val unionDf = if(distinctSchemas.size == 1) {
          dataFrames.tail.foldLeft(dataFrames.head) {
            (df1, df2) => df1.union(df2)
          }
        } else {
          val allSchemas = distinctSchemas.flatten.distinct.sortBy(schema => schema.name)
          val schemaWithAllColumns = StructType(allSchemas)
          val emptyDataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaWithAllColumns)
          val orderedColumns = emptyDataFrame.columns.map(col)
    
          dataFrames.foldLeft(emptyDataFrame) {
            (df1, df2) => {
              val missingColumns = allSchemas diff df2.schema
              val unionSafeDf = missingColumns.foldLeft(df2) {
                (df, missingField) => df.withColumn(missingField.name, lit(null).cast(missingField.dataType))
              }
              df1.union(unionSafeDf.select(orderedColumns: _*))
            }
          }
        }
        Some(unionDf)
    }
    
    unionWithSchema - 변경 이벤트를 시간순으로 적용합니다.
      def applyChangeEventsByTimestamp(dataFrame: DataFrame, primaryKeyArr: Array[String]): DataFrame = {
    
        val partitionCols = if(primaryKeyArr.isEmpty) {
          dataFrame.columns.map(col)
        } else {
          primaryKeyArr.map(colName => col(s"old_$colName"))
        }
        val window = Window.partitionBy(partitionCols: _*).orderBy(desc("timestamp"))
    
        var dfInitial = dataFrame
        var iteration = 0
        var done = false
    
        // Potentially expensive. Monitor logs and improve if required.
        while (!done) {
          val dfBefore = if (iteration == 0) {
            dfInitial
          } else {
            primaryKeyArr.foldLeft(dfInitial) {
              // Update the old keys for UPDATE ops
              (df, colName) => df.withColumn(s"old_$colName",
                when($"changeType" === "update", col(colName)).otherwise(col(s"old_$colName")))
            }
          }
    
          val dfAfter = dfBefore.withColumn("rank", row_number().over(window))
            .filter(col("rank") === 1)
            .drop("rank")
    
          done = dfAfter.count == dfBefore.count
          dfInitial = dfAfter
          iteration = iteration + 1
        }
        dfInitial
    }
    

    변경 이벤트 해결
    우선, 우리는 가장 흔히 볼 수 있는 장면인 신기록 추가를 처리할 것이다.이때 우리는 사용할 필요가 없다applyChangeEventsByTimestamp.timestamp 변경 이벤트가 아직 적용되지 않았기 때문에id2 두 개의 기록이 있습니다.
    val insertDf = splitChangeEventDf.filter($"changeType" === "insert").drop("oldKeyNames", "oldKeyValues")
    val datalakeAndInsertDf = unionWithSchema(insertDf, datalakeDf).get
    
    +----------+---+-----+---------+
    |changeType| id| name|timestamp|
    +----------+---+-----+---------+
    |    insert|id2|Carol|        3|
    |      null|id1|Alice|        0|
    |      null|id2|  Bob|        0|
    +----------+---+-----+---------+
    
    이제 기록 업데이트와 삭제를 처리합시다.우리의 예시에서 유일한 키 그룹만 있기 때문에 왼쪽 접힌 코드는 한 번만 실행됩니다.
    val updateDeleteDf = splitChangeEventDf.filter($"changeType" === "update" || $"changeType" === "delete")
    val distinctOldKeyNames = updateDeleteDf.select("oldKeyNames").distinct.collect.flatMap(_.getSeq[String](0))
    
    // Ideally there should only be one but primary key can be changed in some cases
    val updateDeleteExpandedDfs = distinctOldKeyNames.foldLeft(datalakeAndInsertDf)(
    
        (datalakeAndInsertDf, primaryKeyArr) => {
    
            // Step 1: For INSERT / Existing data, create new pkey columns (using existing values)
            val datalakeAndInsertCeDf = primaryKeyArr.foldLeft(datalakeAndInsertDf) {
                (df, colName) => df.withColumn(s"old_$colName", col(colName))
            }
    
            // Step 2: For UPDATE / DELETE, split the old keys array column
            val updateDeleteCeDf = primaryKeyArr.zipWithIndex.foldLeft(updateDeleteDf) {
                (df, pKey) => df.withColumn(s"old_${pKey._1}", $"oldKeyValues"(pKey._2))
            }
            .filter($"oldKeyNames" === primaryKeyArr)
            .drop("oldKeyNames", "oldKeyValues")
    
            // Step 3: Union all the data
            val initialDf = unionWithSchema(datalakeAndInsertCeDf, updateDeleteCeDf).get.cache()
    
            // Step 4: Resolve the change events chronologically
            val resolvedDf = applyChangeEventsByTimestamp(initialDf, primaryKeyArr)
    
            // Step 5: Drop DELETE records and unnecessary columns
            resolvedDf
                .filter($"changeType" =!= "delete" || $"changeType".isNull)
                .drop(primaryKeyArr.map(colName => s"old_$colName"):  _*)
        }
    }
    // Step 6: Remove unwanted columns and get rid of duplicate rows
    return updateDeleteExpandedDfs.drop("changeType").distinct()
    
    각 단계를 완료하고 출력을 시각화합니다.

  • 기존 키에 대한 정보를 포함하도록 delete를 채웁니다.이 예에서 그것은 datalakeAndInsertDf일 것이다.
    scala> datalakeAndInsertCeDf.show
    +----------+---+-----+---------+------+
    |changeType| id| name|timestamp|old_id|
    +----------+---+-----+---------+------+
    |    insert|id2|Carol|        3|   id2|
    |      null|id1|Alice|        0|   id1|
    |      null|id2|  Bob|        0|   id2|
    +----------+---+-----+---------+------+
    

  • 충실old_id은 기존 키에 대한 정보를 포함한다(본 예는updateDeleteDf.
    scala> updateDeleteCeDf.show
    +----------+---------+----+------+------+
    |changeType|timestamp|  id|  name|old_id|
    +----------+---------+----+------+------+
    |    update|        1| id1|Angela|   id1|
    |    delete|        2|null|  null|   id2|
    +----------+---------+----+------+------+
    

  • #1과 #2의 데이터 프레임을 결합합니다.
    scala> initialDf.show
    +----------+----+------+------+---------+
    |changeType|  id|  name|old_id|timestamp|
    +----------+----+------+------+---------+
    |    insert| id2| Carol|   id2|        3|
    |      null| id1| Alice|   id1|        0|
    |      null| id2|   Bob|   id2|        0|
    |    update| id1|Angela|   id1|        1|
    |    delete|null|  null|   id2|        2|
    +----------+----+------+------+---------+
    
  • old_id 함수에 의해 정의된 변경 이벤트를 시간 순서대로 적용합니다.이 때 키마다 하나의 기록만 있어야 한다. (다음 단계에서 삭제할 applyChangeEventsByTimestamp() 기록은 제외된다.
    scala> resolvedDf.show
    +----------+---+------+------+---------+
    |changeType| id|  name|old_id|timestamp|
    +----------+---+------+------+---------+
    |    update|id1|Angela|   id1|        1|
    |    insert|id2| Carol|   id2|        3|
    +----------+---+------+------+---------+
    

  • 변경 형식이 timestamp 인 기록을 삭제합니다. 지속되지 않기 때문입니다.우리의 예에서 delete 기록은 delete에 덮어씌워졌기 때문에 삭제할 내용이 없습니다.delete도 삭제되었습니다. 왜냐하면 우리는 더 이상 이런 정보를 필요로 하지 않기 때문입니다.
    +----------+---+------+---------+
    |changeType| id|  name|timestamp|
    +----------+---+------+---------+
    |    update|id1|Angela|        1|
    |    insert|id2| Carol|        3|
    +----------+---+------+---------+
    

  • 만일 더 많은 키 조합이 있다면 1-5단계를 반복하지만 이 예에서 우리는 하나insert만 있기 때문에 해결 방안의 끝이다.우리는 old_id를 삭제하고 중복 기록을 삭제하기만 하면 된다.
    +---+------+---------+
    | id|  name|timestamp|
    +---+------+---------+
    |id2| Carol|        3|
    |id1|Angela|        1|
    +---+------+---------+
    
  • 봐라!우리가 원하는 모든 형식으로 데이터 lake에 영구화할 수 있는 응용 변경 데이터 프레임워크가 있습니다.

    좋은 웹페이지 즐겨찾기