Hudi 사용 시 실용적인 비즈니스 문제 해결: LakeSoul은 null 필드 비재정의 의미 체계를 지원합니다.

최근 LakeSoul R&D 팀은 사용자가 Hudi를 사용하여 실용적인 비즈니스 문제를 해결하도록 도왔습니다. 다음은 요약 및 기록입니다. 비즈니스 프로세스는 업스트림 시스템이 온라인 DB 테이블에서 원본 데이터를 JSON 형식으로 추출하여 Kafka에 쓰는 것입니다. 다운스트림 시스템은 Spark를 사용하여 Kafka의 메시지를 읽습니다. 데이터는 Hudi를 사용하여 업데이트 및 집계되고 분석을 위해 다운스트림 데이터베이스로 전송됩니다.


Kafka의 일부 데이터는 원본 테이블의 일부 필드일 뿐입니다. 데이터 샘플 Kafka: {A: A1, C: C4, D: D6, E: E7} {A: A2, B: B4, E: E6} {A: A3, B: B5, C: C5, D: D5, E: E5}. 후속 데이터 업데이트에서는 업데이트 없이 누락된 필드 값 대신 최신 기록 데이터를 사용합니다.
다음 그림은 데이터 업데이트 프로세스를 단순화합니다. 원래 테이블에서 5개의 필드는 A, B, C, D 및 E입니다. 필드 A는 기본 키이고 유형은 문자열입니다. Spark는 Kafka에서 배치 데이터를 읽고 Upsert(고정 스키마의 DataFrame)에 필요한 형식으로 변환합니다. MOR(Merge on Read)은 새 테이블 내용을 읽습니다.

Hudi의 Merge on Read는 현재 이 비즈니스 프로세스를 구현하는 데 사용되며 위에서 잘못 정렬된 JSON 데이터에 대한 고정 스키마가 없으므로 지원되지 않습니다. 위의 비즈니스 프로세스에 대한 Hudi의 Merge on Read 구현은 위에서 잘못 정렬된 JSON 데이터에 대한 고정 스키마 없이는 불가능했을 것입니다. 누락된 필드가 null 값으로 채워지면 잘못된 NULL 값이 원본 콘텐츠를 덮어씁니다. Merge Into를 사용하고 쓰기 성능이 요구 사항을 충족하지 못하는 경우 쓰기 시 복사가 저하됩니다. 해결 방법은 각 데이터 완성에 대해 원본 테이블에서 변경되지 않은 데이터를 가져오는 것입니다. 그러나 이것은 사용자의 기대와 일치하지 않는 리소스 비용과 개발 작업량을 증가시킵니다.

LakeSoul은 맞춤형 MergeOperator를 지원합니다. Upsert를 수행할 때 각 필드에 사용자 정의 MergeOperator를 전달할 수 있습니다. 매개변수는 필드의 원래 값과 Upsert의 새 값입니다. 여기에서 비즈니스 요구 사항에 따라 병합 결과를 결정할 수 있습니다. UDF는 Spark의 기본 UDF와 동일합니다. Upsert를 사용하는 경우 기본 키 값을 지정해야 합니다. 따라서 여러 델타 파일은 동일한 기본 키 및 동일한 필드에 대해 다양한 값을 가질 수 있습니다. MergeOperator는 이러한 값의 병합 동작을 제어합니다. 기본 MergeOperator 구현은 다음과 같습니다.

class DefaultMergeOp[T] extends MergeOperator[T] {
  override def mergeData(input: Seq[T]): T = {
    input.last
  }
}


이 시나리오에서는 MergeOperator를 정의할 수 있습니다. 정의되지 않은 필드의 경우 MergeOperator는 여전히 null 값을 고유 마커로 채웁니다(서비스는 고유 마커가 일반 데이터와 충돌하지 않도록 보장함). MergeOperator는 병합 중에 무시되고 원래 값을 반환합니다. 이와 같이 Spark에서 JSON 데이터를 처리하고 Upsert를 실행하면 null이 무시됩니다. 원본 콘텐츠를 덮어쓰지 않아 초기 데이터 채우기 프로세스를 통해 누락된 필드 데이터를 줄이고 실행 효율성을 크게 향상시키며 코드 논리를 단순화합니다. 이 사용자 지정 MergeOperator의 코드는 다음과 같습니다.


class MergeNonNullOp[T] extends MergeOperator[T] {
  override def mergeData(input: Seq[T]): T = {
    val output=input.filter(_!=null)
    output.filter(!_.equals("null")).last
  }
}


보시다시피 MergeOperator의 간단한 사용자 정의 구현은 복잡한 비즈니스 문제를 해결합니다.

LakeSoul 팀은 빈 필드를 무시하는 MergeOperator를 LakeSoul의 기본 제공 시스템에 통합할 계획입니다. 이 유형의 MergeOperator를 기본적으로 활성화할지 여부를 제어하는 ​​글로벌 옵션을 사용하여 개발 효율성을 더욱 향상시킬 수 있습니다. Github 문제: https://github.com/meta-soul/LakeSoul/issues/30을 참조하십시오.

향후 LakeSoul은 Merge Into SQL 구문을 지원하여 Upsert 동작을 정의하고 Merge on Read를 지원하여 스트림 일괄 쓰기 업데이트의 표현을 더욱 개선할 것입니다.
LakeSoul Cloud-Native Stream Batch 올인원 표면 스토리지 프레임워크에 대한 자세한 내용은 이전 문서를 참조하십시오.

좋은 웹페이지 즐겨찾기