CDC의 빠른 사용: Lakesoul의 새로운 데모를 통해 환경을 보다 쉽게 설정할 수 있습니다.
며칠 전** [Lakesoul]( https://github.com/meta-soul/LakeSoul )은 GitHub에 데모를 업로드했습니다. Mysql 및 Oracle과 같은 관계형 데이터베이스의 추가, 삭제 및 변경 작업은 CDC를 통해 Lakesoul에 액세스할 수 있으며 실제 시각. 프로세스는 다음과 같습니다. Mysql->Debezium->Kafka->SparkStreaming->Lakesoul. 완전한 프레임워크를 구축한 후 시스템은 실시간으로 데이터를 추가, 삭제 및 수정하고 쿼리할 때 최신 데이터를 얻을 수 있습니다. **[업서트]( https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples )는 사용 시 필요합니다.
아래 데모를 보거나 [Lakesoul]( https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium )에서 확인하십시오.
LakeSoul에 대한 CDC 수집에는 두 가지 방법이 있습니다. 1) CDC 스트림을 Kafka에 작성하고 스파크 스트리밍을 사용하여 변환하고 LakeSoul에 작성합니다(이미 지원됨). 2) Flink CDC를 사용하여 LakeSoul에 직접 작성합니다.
이 데모에서 Lakesoul 팀은 첫 번째 방법을 시연했습니다. 그들은 MySQL 인스턴스를 설정하고 스크립트를 사용하여 DB 수정을 생성하고 Debezium을 사용하여 Kafka와 LakeSoul에 동기화했습니다.
1. MySQL 설정
1.1 데이터베이스 및 테이블 생성
Create database cdc;
CREATE TABLE test(
id int primary key,
rangeid int,
value varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
*2.2 cdc 벤치마크 생성기 사용:
*
cdc 동기화를 테스트하고 벤치마킹하기 위한 mysql 데이터 생성기를 제공합니다. 생성기는 디렉토리 아래에 있습니다.
examples/cdc_ingestion_debezium/MysqlBenchmark.
1. 필요에 따라 mysqlcdc.conf 수정
user=user name of mysql
passwd=password of mysql
host=host of mysql
port=port of mysql
2.테이블에 데이터 삽입
# Inside () are comments of parameters, remove them before execution
bash MysqlCdcBenchmark.sh insert cdc(db name) test(table name) 10(lines to insert) 1(thread number)
3.데이터를 테이블로 업데이트
bash MysqlCdcBenchmark.sh update cdc test id(primary key) value(column to update) 10(lines to update)
4. 테이블에서 데이터 삭제
bash MysqlCdcBenchmark.sh delete cdc test 10(삭제할 라인)
2.Setup Kafka (Ignore this step if you already have Kafka running)
2.1 Install Kafka via K8s
kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
kubectl apply -f examples/kafka/kafka-persistent-single.yaml
3.Debezium 설정(이미 가지고 있는 경우 무시)
3.1 데베지움 설치
K8에서 실행 중인 Debezium 컨테이너를 빠르게 설정하려면:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: dbz-pod-claim
spec:
accessModes:
- ReadWriteOnce
# replace to actual StorageClass in your cluster
storageClassName:
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Pod
metadata:
name: dbz-pod
namespace: dmetasoul
spec:
restartPolicy: Never
containers:
- name: dbs
image: debezium/connect:latest
env:
- name: BOOTSTRAP_SERVERS
# replace to actual kafka host
value: ${kafka_host}:9092
- name: GROUP_ID
value: "1"
- name: CONFIG_STORAGE_TOPIC
value: my_connect_configs
- name: OFFSET_STORAGE_TOPIC
value: my_connect_offsets
- name: STATUS_STORAGE_TOPIC
value: my_connect_statuses
resources:
requests:
cpu: 500m
memory: 4Gi
limits:
cpu: 4
memory: 8Gi
volumeMounts:
- mountPath: "/kafka/data"
name: dbz-pv-storage
volumes:
- name: dbz-pv-storage
persistentVolumeClaim:
claimName: dbz-pod-claim
그런 다음 이 yaml 파일을 적용합니다.
kubectl apply -f pod.yaml
3.2 Debezium 동기화 작업 설정
# remember to replace {dbzhost} to actual dbz deployment ip address
# replace database parameters accordingly
curl -X POST http://{dbzhost}:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
"name": "cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1",
"database.hostname": "mysqlhost",
"database.port": "mysqlport",
"database.user": "mysqluser",
"database.password": "mysqlpassword",
"database.server.id": "1",
"database.server.name": "cdcserver",
"database.include.list": "cdc",
"database.history.kafka.bootstrap.servers": "kafkahost:9092",
"database.history.kafka.topic": "schema-changes.cdc",
"decimal.handling.mode": "double",
"table.include.list":"cdc.test"
}
}'
그런 다음 동기화 작업이 성공적으로 생성되었는지 확인합니다.
curl -H "Accept:application/json" dbzhost:8083 -X GET http://dbzhost:8083/connectors/
테스트가 완료된 후 동기화 작업을 삭제할 수 있습니다.
curl -i -X DELETE http://dbzhost:8083/connectors/cdc
4.LakeSoul에 대한 Spark Streaming Sink 시작
4.1 설정
LakeSoul 및 Spark 환경 설정 방법은 Quick Start를 참조하십시오.
4.2 스파크 쉘 시작
Spark 셸은 kafka 종속 항목으로 시작해야 합니다.
> ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.dmetasoul.lakesoul.meta.host=localhost --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.dmetasoul.lakesoul.meta.database.name=test_lakesoul_meta --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
4.3 LakeSoul 테이블 생성
방금 설정한 MySQL 테이블과 동기화할 MysqlCdcTest라는 LakeSoul 테이블을 생성합니다. LakeSoul 테이블에는 기본 키 ID도 있으며 CDC 작업을 나타내고 작업 필드가 있는 테이블 속성 lakesoul_cdc_change_column을 추가하려면 추가 필드 작업이 필요합니다.
com.dmetasoul.lakesoul.tables.LakeSoulTable 가져오기
>val path="/opt/spark/cdctest"
>val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
>LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()
> 5.4 Start spark streaming to sync Debezium CDC data into LakeSoul
> import com.dmetasoul.lakesoul.tables.LakeSoulTable
> val path="/opt/spark/cdctest"
> val lakeSoulTable = LakeSoulTable.forPath(path)
> var strList = List.empty[String]
> //js1 is just a fake data to help generate the schema
> val js1 = """{
> | "before": {
> | "id": 2,
> | "rangeid": 2,
> | "value": "sms"
> | },
> | "after": {
> | "id": 2,
> | "rangeid": 2,
> | "value": "sms"
> | },
> | "source": {
> | "version": "1.8.0.Final",
> | "connector": "mysql",
> | "name": "cdcserver",
> | "ts_ms": 1644461444000,
> | "snapshot": "false",
> | "db": "cdc",
> | "sequence": null,
> | "table": "sms",
> | "server_id": 529210004,
> | "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
> | "file": "binlog.000033",
> | "pos": 54831329,
> | "row": 0,
> | "thread": null,
> | "query": null
> | },
> | "op": "c",
> | "ts_ms": 1644461444777,
> | "transaction": null
> |}""".stripMargin
> strList = strList :+ js1
> val rddData = spark.sparkContext.parallelize(strList)
> val resultDF = spark.read.json(rddData)
> val sche = resultDF.schema
> import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
> // Specify kafka settings
> val kfdf = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "kafkahost:9092")
> .option("subscribe", "cdcserver.cdc.test")
> .option("startingOffsets", "latest")
> .load()
> // parse CDC json from debezium, and transform `op` field into one of 'insert', 'update', 'delete' into LakeSoul
> val kfdfdata = kfdf
> .selectExpr("CAST(value AS STRING) as value")
> .withColumn("payload", from_json($"value", sche))
> .filter("value is not null")
> .drop("value")
> .select("payload.after", "payload.before", "payload.op")
> .withColumn(
> "op",
> when($"op" === "c", "insert")
> .when($"op" === "u", "update")
> .when($"op" === "d", "delete")
> .otherwise("unknown")
> )
> .withColumn(
> "data",
> when($"op" === "insert" || $"op" === "update", $"after")
> .when($"op" === "delete", $"before")
> )
> .drop($"after")
> .drop($"before")
> .select("data.*", "op")
> // upsert into LakeSoul with microbatch
> kfdfdata.writeStream
> .foreachBatch { (batchDF: DataFrame, _: Long) =>
> {
> lakeSoulTable.upsert(batchDF)
> batchDF.show
> }
> }
> .start()
> .awaitTermination()
4.5 동기화된 데이터를 보기 위해 LakeSoul에서 읽기:
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()
이것은 CDC를 사용하여 환경을 빠르게 설정하는 데 도움이 되는 매우 상세한 데모입니다. 다음으로 Flink CDC, Lakesoul CDC, Debezium, DataX, Kettle 등과 같은 오픈 소스 CDC 솔루션을 비교해 보겠습니다.
Reference
이 문제에 관하여(CDC의 빠른 사용: Lakesoul의 새로운 데모를 통해 환경을 보다 쉽게 설정할 수 있습니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/qazmkop/quick-use-of-cdc-a-new-demo-from-lakesoul-makes-it-easier-to-set-up-the-environment-o24텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)