Debezium을 이용한 PostgreSQL CDC(Change Data Capture)
Change Data Capture?
- 데이터베이스에서 변경 데이터 캡처(change data capture, CDC)는 변경된 데이터를 사용하여 동작을 취할 수 있도록 데이터를 결정하고 추적하기 위해 사용되는 여러 소프트웨어 디자인 패턴들의 모임이다.
Debezium
- Debezium은 기존 데이터베이스 이벤트를 스트림으로 바꾸는 변환하는 플랫폼으로 어플리케이션은 데이터베이스내의 행 레벨의 변경을 확인 할 수 있다.
Debezium 아키텍쳐
- Debezium은 기존 데이터베이스 이벤트를 스트림으로 바꾸는 변환하는 플랫폼으로 어플리케이션은 데이터베이스내의 행 레벨의 변경을 확인 할 수 있다.
Debezium 아키텍쳐
- debezium connectors는 MySQL, PostgreSQL 데이터베이스에 대한 CDC를 지원합니다.
- kafka Connect는 kafka broker와 별도의 서비스로 운영됩니다.
- sink connector를 이용하여 다른 데이터베이스로의 변환이 가능합니다.
Start Example
- Start Zookeeper
- Start Kafka
- Start PostgreSQL
- Start PostgrSQL command line client
- Start Kafka Connect
- Start consumer
- Use JDBCSinkConnector
1. Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
- Start Zookeeper
- Start Kafka
- Start PostgreSQL
- Start PostgrSQL command line client
- Start Kafka Connect
- Start consumer
- Use JDBCSinkConnector
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name zookeeper
컨테이너의 이름입니다.
-p 2181:2181 -p 2888:2888 -p 3888:3888
컨테이너의 포트 3개를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이너가 Zookeeper와 통신할 수 있습니다.
- 실행확인
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181
2. Start Kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name kafka
컨테이너의 이름입니다.
-p 9092:9092
컨테이너의 9092 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이 너가 kafka와 통신할 수 있습니다.
--link zookeeper:zookeeper
동일한 Docker 호스트에서 실행 중인 컨테이너에서 zookeeper를 찾고 있다는 것을 컨테이너 에게 알립니다.
- 실행확인
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
3. Start PostgreSQL
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password debezium/postgres:1.8
-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name postgres
컨테이너의 이름입니다.
-p 5432:5432
컨테이너의 5432 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이 너가 postgres와 통신할 수 있습니다.
-e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password
debezium postgreSQL 유저명과 비밀번호입니다.
- 실행확인
...
server started
CREATE DATABASE
...
4. Start PostgreSQL command line client
- docker 터미널 실행
docker exec -ti postgres /bin/bash
- postgres config 확인
cat /var/lib/postgresql/data/postgresql.conf
# LOGGING
# log_min_error_statement = fatal
# log_min_messages = DEBUG1
# CONNECTION
listen_addresses = '*'
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
- wal_level
- minimal : 갑작스런 종료로부터 다시 시작하는데 필요한 최소 정보.
- archive : 데이터베이스 엔진이 WAL 보관을 할 수 있도록 함.
- hot_standby : 데이터베이스 엔진이 서버의 읽기 전용 복제본을 생성할 수 있도록 함.
- logical : WAL 데이터를 다른 시스템이 사용할 수 있도록 하는데 필요한 모든 정보. - max_wal_senders
- WAL 발신자는 WAL을 수신자로 보내기 위해 데이터베이스에서 실행되는 프로세스입니다. - psql 실행
psql -U postgresuser
- Sample Data 생성
CREATE DATABASE dbbase;
\c dbbase
CREATE TABLE sample (
id int,
name varchar(8),
age int,
datetime_created timestamp,
datetime_updated timestamp,
primary key(id)
);
ALTER TABLE sample replica identity FULL;
- replica identity (DEFAULT / NOTHING / FULL / INDEX)
- WAL에 기록되는 세부 정보의 양을 결정하는 옵션.
- DEFAULT : 기본 키 열의 이전 값을 기록합니다.
- NOTHING : 이전 행의 대한 정보를 기록하지 않습니다.
- FULL : 모든 열에 대한 이전 정보를 기록합니다.
- INDEX : 명명된 인덱스에 포함된 열의 이전 값을 기록합니다.
5. Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.8
-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name connect
컨테이너의 이름입니다.
-p 8083:8083
컨테이너의 8083 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 컨테이너 외 부의 어플리케이션은 Kafka Connect의 REST API를 사용하여 새 컨테이너 인스턴스를 설정하 고 관리 할 수 있습니다.
--link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres
zookeeper, kafka, postgres 컨테이너에 연결합니다.
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses
debezium 이미지에 필요한 환경 변수를 설정합니다.
- 실행확인
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
- kafka connect 서비스 상태 확인
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"cb8625948210849f"}
- kafka connect에 등록된 커넥터 목록 확인
curl -H "Accept:application/json" localhost:8083/connectors/
[]
- kafka connect 등록
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample"}}'
- kafka connect에 등록된 커넥터 목록 확인
curl -H "Accept:application/json" localhost:8083/connectors/
[fulfillment-connector]
- connect의 task 확인
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/fulfillment-connector
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)
{
"name": "fulfillment-connector",
...
"tasks": [
{
"connector": "fulfillment-connector",
"task": 0
}
]
}
6. Start consumer
- Sample consumer 확인
docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a fulfillment.public.sample | grep --line-buffered '^{' | sudo python3 -u <filepath>/samplestream.py > <filepath>/samplestream.txt
-watch-topic
fulfillment.public.sample topic을 봅니다.
-a
topic이 생성된 이후의 모든 이벤트를 봅니다.
- 실행확인
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic fulfillment.public.sample:
- insert data
insert into sample values (1001, 'H1', 10, now(), now());
- consumer 확인
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32",
...
"payload":{"before":null,"after":{"id":1001,"name":"H1","age":10,"datetime_created":1648719890915312,"datetime_updated":1648719890915312},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"fulfillment","ts_ms":1648719890941,"snapshot":"false","db":"dbbase","sequence":"[null,\"23719176\"]","schema":"public","table":"sample","txId":556,"lsn":23719176,"xmin":null},"op":"c","ts_ms":1648719891348,"transaction":null}}
- samplestream.txt 확인
- Insert, Update, Delete Evnet시 한줄 씩 생성되는 것을 볼 수 있다.
tail -f samplestream.txt
1001,H1,10,1648719890915312,1648719890915312,2022-03-31-20-00-49,c
1001,H1,11,1648719890915312,1648719890915312,2022-03-31-20-00-49,u
1001,None,None,None,None,2022-03-31-20-00-49,d
1002,H2,20,1648724488499908,1648724488499908,2022-03-31-20-01-28,c
- samplestream.py
import json
import os
import sys
from datetime import datetime
def parse_crud(payload, op_type):
current_ts = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
out_list = []
out_list.append(payload.get('id'))
out_list.append(payload.get('name'))
out_list.append(payload.get('age'))
out_list.append(payload.get('datetime_created'))
out_list.append(payload.get('datetime_updated'))
out_list.append(current_ts)
out_list.append(op_type)
return out_list
def parse_payload(input_raw_json):
input_json = json.loads(input_raw_json)
op_type = input_json.get('payload', {}).get('op')
if op_type == 'c':
return parse_crud(
input_json.get('payload', {}).get('after', {}),
op_type
)
elif op_type == 'd':
return parse_crud(
input_json.get('payload', {}).get('before', {}),
op_type
)
elif op_type == 'u':
return parse_crud(
input_json.get('payload', {}).get('after', {}),
op_type
)
return []
for line in sys.stdin:
data = parse_payload(line)
log_str = ','.join([str(elt) for elt in data])
print(log_str, flush=True)
- stop docker
docker stop $(docker ps -aq)
7. Use JdbcSinkConnector
- jdbcSinkConnector는 confluent에서 개발되었기 때문에 kafka connector에 파일을 추가해 주어야 한다.
- /kafka/libs 폴더에 PostgreSQL JDBC driver를 넣어야 한다.
- /kafka/connect/kafka-connect-jdbc 폴더에 Kafka Connect JDBC을 넣어야 한다. - debezium-examples/end-to-end-demo/debezium-jdbc/의 Dockerfile을 빌드해서 사용한다.
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka jdbc-sink
- connector 삭제 방법
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/fulfillment-connector
- connector 생성
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{"name": "source-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample",
"transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "targetsample"}}'
- sinkConnector 생성
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1","topics": "targetsample","connection.url": "jdbc:postgresql://postgres:5432/dbbase?user=postgresuser&password=password", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key", "delete.enabled": "true" }}'
- sample table에 Insert, Update, Delete시 target table에 데이터의 변경을 확인 할 수 있다.
- Data 확인
select * from sample;
select * from targetsample;
Reference
위키피디아 Change Data Capture
debezium 1.8 공식문서
postgreSQL 14 공식문서
confluent jdbcSinkConnector 공식문서
Author And Source
이 문제에 관하여(Debezium을 이용한 PostgreSQL CDC(Change Data Capture)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@kero88/Debezium을-이용한-PostgreSQL-CDCChange-Data-Capture저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)