CDC의 빠른 사용: Lakesoul의 새로운 데모를 통해 환경을 보다 쉽게 ​​설정할 수 있습니다.

변경 데이터 캡처(CDC)는 데이터베이스에서 데이터 변경 사항을 캡처하는 데 사용되는 데이터베이스 지향 기술로 데이터 동기화, 데이터 배포 및 데이터 수집에 적용됩니다. 전자는 오프라인 상태로 오프라인 스케줄링을 통해 질의가 가능하고, 질의를 통해 최신 데이터를 얻기 위해 다른 시스템과 테이블을 동기화해 데이터의 일관성과 실시간 성능을 보장할 수 없다. 쿼리 프로세스에서 데이터가 여러 번 변경될 수 있습니다. [Lakesoul]( https://github.com/meta-soul/LakeSoul)'s CDC 기술은 데이터 일관성 및 실시간을 보장하기 위해 소비 로그를 구현할 수 있는 로그 기반 CDC 유형에 속합니다.

며칠 전** [Lakesoul]( https://github.com/meta-soul/LakeSoul )은 GitHub에 데모를 업로드했습니다. Mysql 및 Oracle과 같은 관계형 데이터베이스의 추가, 삭제 및 변경 작업은 CDC를 통해 Lakesoul에 액세스할 수 있으며 실제 시각. 프로세스는 다음과 같습니다. Mysql->Debezium->Kafka->SparkStreaming->Lakesoul. 완전한 프레임워크를 구축한 후 시스템은 실시간으로 데이터를 추가, 삭제 및 수정하고 쿼리할 때 최신 데이터를 얻을 수 있습니다. **[업서트]( https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples )는 사용 시 필요합니다.

아래 데모를 보거나 [Lakesoul]( https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium )에서 확인하십시오.

LakeSoul에 대한 CDC 수집에는 두 가지 방법이 있습니다. 1) CDC 스트림을 Kafka에 작성하고 스파크 스트리밍을 사용하여 변환하고 LakeSoul에 작성합니다(이미 지원됨). 2) Flink CDC를 사용하여 LakeSoul에 직접 작성합니다.

이 데모에서 Lakesoul 팀은 첫 번째 방법을 시연했습니다. 그들은 MySQL 인스턴스를 설정하고 스크립트를 사용하여 DB 수정을 생성하고 Debezium을 사용하여 Kafka와 LakeSoul에 동기화했습니다.

1. MySQL 설정
1.1 데이터베이스 및 테이블 생성

Create database cdc;
CREATE TABLE test(
 id int primary key,
 rangeid int,
 value varchar(100) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


*2.2 cdc 벤치마크 생성기 사용:
*
cdc 동기화를 테스트하고 벤치마킹하기 위한 mysql 데이터 생성기를 제공합니다. 생성기는 디렉토리 아래에 있습니다.

examples/cdc_ingestion_debezium/MysqlBenchmark.


1. 필요에 따라 mysqlcdc.conf 수정

user=user name of mysql
 passwd=password of mysql
 host=host of mysql
 port=port of mysql


2.테이블에 데이터 삽입

# Inside () are comments of parameters, remove them before execution
 bash MysqlCdcBenchmark.sh  insert  cdc(db name) test(table name) 10(lines to insert) 1(thread number)


3.데이터를 테이블로 업데이트

bash MysqlCdcBenchmark.sh  update  cdc test id(primary key) value(column to update) 10(lines to update) 


4. 테이블에서 데이터 삭제
bash MysqlCdcBenchmark.sh delete cdc test 10(삭제할 라인)

2.Setup Kafka (Ignore this step if you already have Kafka running)
2.1 Install Kafka via K8s


kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
kubectl apply -f examples/kafka/kafka-persistent-single.yaml


3.Debezium 설정(이미 가지고 있는 경우 무시)
3.1 데베지움 설치
K8에서 실행 중인 Debezium 컨테이너를 빠르게 설정하려면:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dbz-pod-claim
spec:
  accessModes:
    - ReadWriteOnce
  # replace to actual StorageClass in your cluster
  storageClassName: 
  resources:
    requests:
      storage: 10Gi
---
apiVersion: v1
kind: Pod
metadata:
  name: dbz-pod
  namespace: dmetasoul
spec:
  restartPolicy: Never
  containers:
  - name: dbs
    image: debezium/connect:latest
    env:
      - name: BOOTSTRAP_SERVERS
        # replace to actual kafka host
        value: ${kafka_host}:9092
      - name: GROUP_ID
        value: "1"
      - name: CONFIG_STORAGE_TOPIC
        value: my_connect_configs
      - name: OFFSET_STORAGE_TOPIC
        value: my_connect_offsets
      - name: STATUS_STORAGE_TOPIC
        value: my_connect_statuses
    resources:
      requests:
        cpu: 500m
        memory: 4Gi
      limits:
        cpu: 4
        memory: 8Gi
    volumeMounts:
      - mountPath: "/kafka/data"
        name: dbz-pv-storage

  volumes:
    - name: dbz-pv-storage
      persistentVolumeClaim:
        claimName: dbz-pod-claim


그런 다음 이 yaml 파일을 적용합니다.

kubectl apply -f pod.yaml


3.2 Debezium 동기화 작업 설정

# remember to replace {dbzhost} to actual dbz deployment ip address
# replace database parameters accordingly
curl -X POST http://{dbzhost}:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
    "name": "cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1",
        "database.hostname": "mysqlhost",
        "database.port": "mysqlport",
        "database.user": "mysqluser",
        "database.password": "mysqlpassword",
        "database.server.id": "1",
        "database.server.name": "cdcserver",
        "database.include.list": "cdc",
        "database.history.kafka.bootstrap.servers": "kafkahost:9092",
        "database.history.kafka.topic": "schema-changes.cdc",
        "decimal.handling.mode": "double",
        "table.include.list":"cdc.test" 
    }
}'


그런 다음 동기화 작업이 성공적으로 생성되었는지 확인합니다.

curl -H "Accept:application/json" dbzhost:8083 -X GET http://dbzhost:8083/connectors/


테스트가 완료된 후 동기화 작업을 삭제할 수 있습니다.

curl -i  -X DELETE http://dbzhost:8083/connectors/cdc


4.LakeSoul에 대한 Spark Streaming Sink 시작
4.1 설정
LakeSoul 및 Spark 환경 설정 방법은 Quick Start를 참조하십시오.

4.2 스파크 쉘 시작
Spark 셸은 kafka 종속 항목으로 시작해야 합니다.

> ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.dmetasoul.lakesoul.meta.host=localhost --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.dmetasoul.lakesoul.meta.database.name=test_lakesoul_meta --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog


4.3 LakeSoul 테이블 생성
방금 설정한 MySQL 테이블과 동기화할 MysqlCdcTest라는 LakeSoul 테이블을 생성합니다. LakeSoul 테이블에는 기본 키 ID도 있으며 CDC 작업을 나타내고 작업 필드가 있는 테이블 속성 lakesoul_cdc_change_column을 추가하려면 추가 필드 작업이 필요합니다.
com.dmetasoul.lakesoul.tables.LakeSoulTable 가져오기

>val path="/opt/spark/cdctest"
>val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
>LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()
> 5.4 Start spark streaming to sync Debezium CDC data into LakeSoul
> import com.dmetasoul.lakesoul.tables.LakeSoulTable
> val path="/opt/spark/cdctest"
> val lakeSoulTable = LakeSoulTable.forPath(path)
> var strList = List.empty[String]
> //js1 is just a fake data to help generate the schema
> val js1 = """{
>           |  "before": {
>           |    "id": 2,
>           |    "rangeid": 2,
>           |    "value": "sms"
>           |  },
>           |  "after": {
>           |    "id": 2,
>           |    "rangeid": 2,
>           |    "value": "sms"
>           |  },
>           |  "source": {
>           |    "version": "1.8.0.Final",
>           |    "connector": "mysql",
>           |    "name": "cdcserver",
>           |    "ts_ms": 1644461444000,
>           |    "snapshot": "false",
>           |    "db": "cdc",
>           |    "sequence": null,
>           |    "table": "sms",
>           |    "server_id": 529210004,
>           |    "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
>           |    "file": "binlog.000033",
>           |    "pos": 54831329,
>           |    "row": 0,
>           |    "thread": null,
>           |    "query": null
>           |  },
>           |  "op": "c",
>           |  "ts_ms": 1644461444777,
>           |  "transaction": null
>           |}""".stripMargin
> strList = strList :+ js1
> val rddData = spark.sparkContext.parallelize(strList)
> val resultDF = spark.read.json(rddData)
> val sche = resultDF.schema
> import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
> // Specify kafka settings
> val kfdf = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "kafkahost:9092")
>   .option("subscribe", "cdcserver.cdc.test")
>   .option("startingOffsets", "latest")
>   .load()
> // parse CDC json from debezium, and transform `op` field into one of 'insert', 'update', 'delete' into LakeSoul
> val kfdfdata = kfdf
>   .selectExpr("CAST(value AS STRING) as value")
>   .withColumn("payload", from_json($"value", sche))
>   .filter("value is not null")
>   .drop("value")
>   .select("payload.after", "payload.before", "payload.op")
>   .withColumn(
>     "op",
>     when($"op" === "c", "insert")
>       .when($"op" === "u", "update")
>       .when($"op" === "d", "delete")
>       .otherwise("unknown")
>   )
>   .withColumn(
>     "data",
>     when($"op" === "insert" || $"op" === "update", $"after")
>       .when($"op" === "delete", $"before")
>   )
>   .drop($"after")
>   .drop($"before")
>   .select("data.*", "op")
> // upsert into LakeSoul with microbatch
> kfdfdata.writeStream
>   .foreachBatch { (batchDF: DataFrame, _: Long) =>
>     {
>       lakeSoulTable.upsert(batchDF)
>       batchDF.show
>     }
>   }
>   .start()
>   .awaitTermination()


4.5 동기화된 데이터를 보기 위해 LakeSoul에서 읽기:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()


이것은 CDC를 사용하여 환경을 빠르게 설정하는 데 도움이 되는 매우 상세한 데모입니다. 다음으로 Flink CDC, Lakesoul CDC, Debezium, DataX, Kettle 등과 같은 오픈 소스 CDC 솔루션을 비교해 보겠습니다.

좋은 웹페이지 즐겨찾기