ksqlDB 임베디드 Kafka Connect와 함께 Debezium MS SQL 커넥터 사용

a question on StackOverflow에서 프롬프트를 표시하여 ksqlDB을 사용하여 Microsoft SQL Server에서 CDC 이벤트를 수집하도록 설정Debezium을 간단히 살펴보겠습니다. 이 중 일부는 이전 기사Streaming data from SQL Server to Kafka to Snowflake ❄️ with Kafka Connect를 기반으로 합니다.

Docker Compose 설정



나는 독립적이고 반복 가능한 데모 코드를 좋아합니다. 그런 이유로 저는 Docker Compose를 사용하는 것을 좋아하며 커넥터 설치, 주방 싱크대 등 모든 것을 거기에 포함시킵니다.

전체Docker Compose file on GitHub를 찾을 수 있습니다.

Confluent Hub 클라이언트 없이 ksqlDB에 커넥터 설치



일반적으로 Docker Compose 서비스의 command: 스탠자를 활용하여 커넥터 설치as detailed here와 같은 작업을 수행합니다. ksqlDB 0.11에서는 Confluent Hub 클라이언트가 없기 때문에 약간 해커 경로를 택해야 했습니다. Confluent Hub으로 이동하여 원하는 커넥터(이 경우 Debezium MS SQL)를 다운로드하면 네트워크 콘솔에서 직접 URL을 확인할 수 있습니다. 이 경우

https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip

이제 이 URL은 변경될 수 있지만 지금은 작동합니다.

이 코드를 사용하면 회전할 때 ksqlDB Docker 컨테이너 내에서 커넥터를 다운로드하고 설치할 수 있습니다.

curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
yum install -y unzip
unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/

최신 버전의 컨테이너가 루트가 아닌 사용자로 실행되고 sudo가 설치되지 않은( no sandwiches for me ) 계획에 주름이 있습니다. 이 문제를 해결하기 위해 Docker Compose 사양에서 루트 사용자로 실행되도록 컨테이너를 승격합니다.

  ksqldb:
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    user: root
    …

이제 컨테이너가 시작되면 커넥터를 다운로드하고unzip 커넥터 아카이브를 설치plugin.path하고 압축을 풉니다. 여기서 Kafka Connect(ksqlDB에 내장된 실행)가 찾을 수 있습니다.

참고: 이를 수행하는 '적절한' 방법은 이미 설치된 커넥터 플러그인으로 자체 ksqlDB 이미지를 굽거나 커넥터를 호스트 시스템에 다운로드하여 ksqlDB 컨테이너에 마운트하는 것입니다. 이 두 가지 모두 괜찮지 만 내 목적에 따라 독립형 Docker Compose 파일보다 더 많은 움직이는 부분과 문제가 발생합니다. :) |

스택 실행



스핀 업Docker Compose file

docker-compose up -d

그런 다음 ksqlDB를 시작합니다. 겉보기에 복잡해 보이는 이 스니펫은 CLI를 시작하기 전에 ksqlDB를 사용할 수 있을 때까지 기다립니다.

docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [$curl_status -eq 200] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

별도의 터미널에서 ksqlDB 시작이 완료되면(즉, 위 명령에서 ksqlDB CLI가 시작되면) MS SQL 커넥터가 올바르게 설치되었는지 확인합니다.

docker exec ksqldb curl -s localhost:8083/connector-plugins

넌 봐야 해

[{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.2.2.Final"}]

CDC용 MS SQL 구성



MS SQL 컨테이너가 시작되면 몇 가지 스크립트가 실행되어 CDC용 데이터베이스를 설정하고 일부 테스트 데이터를 추가합니다. Docker Compose를 사용하지 않는 경우 다음을 직접 실행해야 합니다.

USE [master]
GO
CREATE DATABASE demo;
GO
USE [demo]
EXEC sys.sp_cdc_enable_db
GO

-- Run this to confirm that CDC is now enabled:
SELECT name, is_cdc_enabled FROM sys.databases;
GO

use [demo];

CREATE TABLE demo.dbo.ORDERS ( order_id INT, customer_id INT, order_ts DATE, order_total_usd DECIMAL(5,2), item VARCHAR(50) );
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'ORDERS',
@role_name = NULL,
@supports_net_changes = 0
GO

-- At this point you should get a row returned from this query
SELECT s.name AS Schema_Name, tb.name AS Table_Name , tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc FROM sys.tables tb INNER JOIN sys.schemas s on s.schema_id = tb.schema_id WHERE tb.is_tracked_by_cdc = 1
GO
-- h/t William Prigol Lopes https://stackoverflow.com/a/61698148/350613

ksqlDB에 SQL Server 커넥터 추가



이제 ksqlDB 프롬프트가 열려 있어야 합니다.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.11.0, Server v0.11.0 located at http://ksqldb:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

ksqlDB 프롬프트에서 커넥터를 생성합니다.

CREATE SOURCE CONNECTOR SOURCE_MSSQL_ORDERS_01 WITH (
      'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
      'database.hostname' = 'mssql',
      'database.port' = '1433',
      'database.user' = 'sa',
      'database.password' = 'Admin123',
      'database.dbname' = 'demo',
      'database.server.name' = 'mssql',
      'table.whitelist' = 'dbo.orders',
      'database.history.kafka.bootstrap.servers' = 'broker:29092',
      'database.history.kafka.topic' = 'dbz_dbhistory.mssql.asgard-01',
      'decimal.handling.mode' = 'double'
);

성공적으로 실행 중인지 확인

SHOW CONNECTORS;

 Connector Name         | Type   | Class                                              | Status
--------------------------------------------------------------------------------------------------------------------
 SOURCE_MSSQL_ORDERS_01 | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------------------

그렇지 않은 경우(예: 상태가 WARNING 인 경우) 실행docker logs -f ksqldb하고 페이지를 통해 ERROR 를 찾습니다.

ksqlDB에서 MS SQL 데이터 사용



커넥터가 실행되고 데이터가 흐르면 이에 대해 스트림을 선언할 수 있습니다.

CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql.dbo.ORDERS', VALUE_FORMAT='AVRO');

그런 다음 데이터 탐색을 시작합니다.

SET 'auto.offset.reset' = 'earliest';

SELECT SOURCE->NAME, SOURCE->SCHEMA + '.' + SOURCE->"TABLE", OP,BEFORE,AFTER FROM ORDERS EMIT CHANGES LIMIT 2;

+-----------------------+-----------------------+-------+---------+-----------------------+
|NAME                   |KSQL_COL_0             |OP     |BEFORE   |AFTER                  |
+-----------------------+-----------------------+-------+---------+-----------------------+
|mssql                  |dbo.ORDERS             |r      |null     |{ORDER_ID=1, CUSTOMER_I|
|                       |                       |       |         |D=7, ORDER_TS=18256, OR|
|                       |                       |       |         |DER_TOTAL_USD=2.1, ITEM|
|                       |                       |       |         |=Proper Job}           |
|mssql                  |dbo.ORDERS             |r      |null     |{ORDER_ID=2, CUSTOMER_I|
|                       |                       |       |         |D=8, ORDER_TS=18236, OR|
|                       |                       |       |         |DER_TOTAL_USD=0.23, ITE|
|                       |                       |       |         |M=Wainwright}          |

중첩 필드에 액세스하기 위해 연산자를 사용합니다.

SELECT AFTER->ORDER_ID, AFTER->CUSTOMER_ID, AFTER->ORDER_TOTAL_USD FROM ORDERS EMIT CHANGES LIMIT 5;

+-------------+--------------+------------------+
|ORDER_ID     |CUSTOMER_ID   |ORDER_TOTAL_USD   |
+-------------+--------------+------------------+
|1            |7             |2.1               |
|2            |8             |0.23              |
|3            |12            |4.3               |
|4            |7             |4.88              |
|5            |14            |3.89              |
Limit Reached
Query terminated

MS SQL에서 변경 사항 캡처



지금까지 우리는 MS SQL에서 Kafka/ksqlDB로 데이터의 스냅샷/부트스트랩 수집을 보았습니다. MS SQL에서 몇 가지를 변경하고 ksqlDB에 어떻게 나타나는지 봅시다.

MS SQL CLI 시작

docker exec -it mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -d demo -S localhost -U sa -P $SA_PASSWORD'

데이터를 일부 변경

DELETE FROM ORDERS WHERE ORDER_ID=1;
UPDATE ORDERS SET ORDER_TOTAL_USD = ORDER_TOTAL_USD * 0.9 WHERE ORDER_ID =2;
INSERT INTO ORDERS (order_id, customer_id, order_ts, order_total_usd, item) values (9, 5, '2019-11-29T11:10:39Z', '2.24', 'Black Sheep Ale');
GO

ksqlDB에서 데이터 확인

SELECT OP,
       SOURCE->SCHEMA + '.' + SOURCE->"TABLE",
       BEFORE->ORDER_ID AS B_ORDER_ID,
       AFTER->ORDER_ID AS A_ORDER_ID,
       BEFORE->ORDER_TOTAL_USD AS B_ORDER_TOTAL_USD,
       AFTER->ORDER_TOTAL_USD AS A_ORDER_TOTAL_USD,
       BEFORE->ITEM AS B_ITEM,
       AFTER->ITEM AS A_ITEM
  FROM ORDERS
  WHERE NOT OP='r'
  EMIT CHANGES;

+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|OP   |KSQL_COL_0   |B_ORDER_ID  |A_ORDER_ID |B_ORDER_TOTAL_USD  |A_ORDER_TOTAL_USD |B_ITEM               |A_ITEM               |
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|d    |dbo.ORDERS   |1           |null       |2.1                |null              |Proper Job           |null                 |
|u    |dbo.ORDERS   |2           |2          |0.23               |0.21              |Wainwright           |Wainwright           |
|c    |dbo.ORDERS   |null        |9          |null               |2.24              |null                 |Black Sheep Ale      |

참고 사항:
  • d 삭제 메시지는 삭제되기 전의 행 상태에 대한 액세스를 제공합니다
  • .
  • u 업데이트 메시지는 업데이트되기 전의 필드 값을 제공합니다. ( ORDER_TOTAL_USD )
  • c 생성 메시지의 BEFORE 개체에 null 값이 있습니다. 행이 생성되기 전에 값이 없었기 때문입니다. :)
  • 좋은 웹페이지 즐겨찾기