ksqlDB 임베디드 Kafka Connect와 함께 Debezium MS SQL 커넥터 사용
12154 단어 mssqlapachekafkaksqldbdebezium
Docker Compose 설정
나는 독립적이고 반복 가능한 데모 코드를 좋아합니다. 그런 이유로 저는 Docker Compose를 사용하는 것을 좋아하며 커넥터 설치, 주방 싱크대 등 모든 것을 거기에 포함시킵니다.
전체Docker Compose file on GitHub를 찾을 수 있습니다.
Confluent Hub 클라이언트 없이 ksqlDB에 커넥터 설치
일반적으로 Docker Compose 서비스의
command:
스탠자를 활용하여 커넥터 설치as detailed here와 같은 작업을 수행합니다. ksqlDB 0.11에서는 Confluent Hub 클라이언트가 없기 때문에 약간 해커 경로를 택해야 했습니다. Confluent Hub으로 이동하여 원하는 커넥터(이 경우 Debezium MS SQL)를 다운로드하면 네트워크 콘솔에서 직접 URL을 확인할 수 있습니다. 이 경우https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip
이제 이 URL은 변경될 수 있지만 지금은 작동합니다.
이 코드를 사용하면 회전할 때 ksqlDB Docker 컨테이너 내에서 커넥터를 다운로드하고 설치할 수 있습니다.
curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
yum install -y unzip
unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
최신 버전의 컨테이너가 루트가 아닌 사용자로 실행되고
sudo
가 설치되지 않은( no sandwiches for me ) 계획에 주름이 있습니다. 이 문제를 해결하기 위해 Docker Compose 사양에서 루트 사용자로 실행되도록 컨테이너를 승격합니다. ksqldb:
image: confluentinc/ksqldb-server:0.11.0
container_name: ksqldb
user: root
…
이제 컨테이너가 시작되면 커넥터를 다운로드하고
unzip
커넥터 아카이브를 설치plugin.path
하고 압축을 풉니다. 여기서 Kafka Connect(ksqlDB에 내장된 실행)가 찾을 수 있습니다.참고: 이를 수행하는 '적절한' 방법은 이미 설치된 커넥터 플러그인으로 자체 ksqlDB 이미지를 굽거나 커넥터를 호스트 시스템에 다운로드하여 ksqlDB 컨테이너에 마운트하는 것입니다. 이 두 가지 모두 괜찮지 만 내 목적에 따라 독립형 Docker Compose 파일보다 더 많은 움직이는 부분과 문제가 발생합니다. :) |
스택 실행
스핀 업Docker Compose file
docker-compose up -d
그런 다음 ksqlDB를 시작합니다. 겉보기에 복잡해 보이는 이 스니펫은 CLI를 시작하기 전에 ksqlDB를 사용할 수 있을 때까지 기다립니다.
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [$curl_status -eq 200] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
별도의 터미널에서 ksqlDB 시작이 완료되면(즉, 위 명령에서 ksqlDB CLI가 시작되면) MS SQL 커넥터가 올바르게 설치되었는지 확인합니다.
docker exec ksqldb curl -s localhost:8083/connector-plugins
넌 봐야 해
[{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.2.2.Final"}]
CDC용 MS SQL 구성
MS SQL 컨테이너가 시작되면 몇 가지 스크립트가 실행되어 CDC용 데이터베이스를 설정하고 일부 테스트 데이터를 추가합니다. Docker Compose를 사용하지 않는 경우 다음을 직접 실행해야 합니다.
USE [master]
GO
CREATE DATABASE demo;
GO
USE [demo]
EXEC sys.sp_cdc_enable_db
GO
-- Run this to confirm that CDC is now enabled:
SELECT name, is_cdc_enabled FROM sys.databases;
GO
use [demo];
CREATE TABLE demo.dbo.ORDERS ( order_id INT, customer_id INT, order_ts DATE, order_total_usd DECIMAL(5,2), item VARCHAR(50) );
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'ORDERS',
@role_name = NULL,
@supports_net_changes = 0
GO
-- At this point you should get a row returned from this query
SELECT s.name AS Schema_Name, tb.name AS Table_Name , tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc FROM sys.tables tb INNER JOIN sys.schemas s on s.schema_id = tb.schema_id WHERE tb.is_tracked_by_cdc = 1
GO
-- h/t William Prigol Lopes https://stackoverflow.com/a/61698148/350613
ksqlDB에 SQL Server 커넥터 추가
이제 ksqlDB 프롬프트가 열려 있어야 합니다.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.11.0, Server v0.11.0 located at http://ksqldb:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
ksqlDB 프롬프트에서 커넥터를 생성합니다.
CREATE SOURCE CONNECTOR SOURCE_MSSQL_ORDERS_01 WITH (
'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname' = 'mssql',
'database.port' = '1433',
'database.user' = 'sa',
'database.password' = 'Admin123',
'database.dbname' = 'demo',
'database.server.name' = 'mssql',
'table.whitelist' = 'dbo.orders',
'database.history.kafka.bootstrap.servers' = 'broker:29092',
'database.history.kafka.topic' = 'dbz_dbhistory.mssql.asgard-01',
'decimal.handling.mode' = 'double'
);
성공적으로 실행 중인지 확인
SHOW CONNECTORS;
Connector Name | Type | Class | Status
--------------------------------------------------------------------------------------------------------------------
SOURCE_MSSQL_ORDERS_01 | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------------------
그렇지 않은 경우(예: 상태가
WARNING
인 경우) 실행docker logs -f ksqldb
하고 페이지를 통해 ERROR
를 찾습니다.ksqlDB에서 MS SQL 데이터 사용
커넥터가 실행되고 데이터가 흐르면 이에 대해 스트림을 선언할 수 있습니다.
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql.dbo.ORDERS', VALUE_FORMAT='AVRO');
그런 다음 데이터 탐색을 시작합니다.
SET 'auto.offset.reset' = 'earliest';
SELECT SOURCE->NAME, SOURCE->SCHEMA + '.' + SOURCE->"TABLE", OP,BEFORE,AFTER FROM ORDERS EMIT CHANGES LIMIT 2;
+-----------------------+-----------------------+-------+---------+-----------------------+
|NAME |KSQL_COL_0 |OP |BEFORE |AFTER |
+-----------------------+-----------------------+-------+---------+-----------------------+
|mssql |dbo.ORDERS |r |null |{ORDER_ID=1, CUSTOMER_I|
| | | | |D=7, ORDER_TS=18256, OR|
| | | | |DER_TOTAL_USD=2.1, ITEM|
| | | | |=Proper Job} |
|mssql |dbo.ORDERS |r |null |{ORDER_ID=2, CUSTOMER_I|
| | | | |D=8, ORDER_TS=18236, OR|
| | | | |DER_TOTAL_USD=0.23, ITE|
| | | | |M=Wainwright} |
중첩 필드에 액세스하기 위해
→
연산자를 사용합니다.SELECT AFTER->ORDER_ID, AFTER->CUSTOMER_ID, AFTER->ORDER_TOTAL_USD FROM ORDERS EMIT CHANGES LIMIT 5;
+-------------+--------------+------------------+
|ORDER_ID |CUSTOMER_ID |ORDER_TOTAL_USD |
+-------------+--------------+------------------+
|1 |7 |2.1 |
|2 |8 |0.23 |
|3 |12 |4.3 |
|4 |7 |4.88 |
|5 |14 |3.89 |
Limit Reached
Query terminated
MS SQL에서 변경 사항 캡처
지금까지 우리는 MS SQL에서 Kafka/ksqlDB로 데이터의 스냅샷/부트스트랩 수집을 보았습니다. MS SQL에서 몇 가지를 변경하고 ksqlDB에 어떻게 나타나는지 봅시다.
MS SQL CLI 시작
docker exec -it mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -d demo -S localhost -U sa -P $SA_PASSWORD'
데이터를 일부 변경
DELETE FROM ORDERS WHERE ORDER_ID=1;
UPDATE ORDERS SET ORDER_TOTAL_USD = ORDER_TOTAL_USD * 0.9 WHERE ORDER_ID =2;
INSERT INTO ORDERS (order_id, customer_id, order_ts, order_total_usd, item) values (9, 5, '2019-11-29T11:10:39Z', '2.24', 'Black Sheep Ale');
GO
ksqlDB에서 데이터 확인
SELECT OP,
SOURCE->SCHEMA + '.' + SOURCE->"TABLE",
BEFORE->ORDER_ID AS B_ORDER_ID,
AFTER->ORDER_ID AS A_ORDER_ID,
BEFORE->ORDER_TOTAL_USD AS B_ORDER_TOTAL_USD,
AFTER->ORDER_TOTAL_USD AS A_ORDER_TOTAL_USD,
BEFORE->ITEM AS B_ITEM,
AFTER->ITEM AS A_ITEM
FROM ORDERS
WHERE NOT OP='r'
EMIT CHANGES;
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|OP |KSQL_COL_0 |B_ORDER_ID |A_ORDER_ID |B_ORDER_TOTAL_USD |A_ORDER_TOTAL_USD |B_ITEM |A_ITEM |
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|d |dbo.ORDERS |1 |null |2.1 |null |Proper Job |null |
|u |dbo.ORDERS |2 |2 |0.23 |0.21 |Wainwright |Wainwright |
|c |dbo.ORDERS |null |9 |null |2.24 |null |Black Sheep Ale |
참고 사항:
d
삭제 메시지는 삭제되기 전의 행 상태에 대한 액세스를 제공합니다u
업데이트 메시지는 업데이트되기 전의 필드 값을 제공합니다. ( ORDER_TOTAL_USD
) c
생성 메시지의 BEFORE
개체에 null 값이 있습니다. 행이 생성되기 전에 값이 없었기 때문입니다. :) Reference
이 문제에 관하여(ksqlDB 임베디드 Kafka Connect와 함께 Debezium MS SQL 커넥터 사용), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/rmoff/using-the-debezium-ms-sql-connector-with-ksqldb-embedded-kafka-connect-5bb7텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)