Capturing database table changes with Kafka and Debezium

Index

  • 1. Introduction
  • 2. Environment preparation and system setup
  • 3. Using our setup
  • 4. Conclusion and final thoughts

    1. Introduction

    This tutorial will guide you through how to set up everything you need to stream the changes happening to a PostgreSQL database table using Kafka. There are plenty of similar tutorials out there, but I prepared this one to be very simple to set up (literally a single command) and bare bones: just what you need to get started.

    The idea is to capture all the changes (additions, deletions and updates) and stream them so you can do whatever you want with them, for example: archive them, modify them and add them to another table, analyse them etc.

    We achieve this using a technique called Change Data Capture (CDC). We rely on our database's (in this case PostgreSQL's) replication capabilities to capture the changes through pgoutput, Postgres' standard logical decoding output plug-in.
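    Note that pgoutput only works when Postgres has logical decoding enabled (wal_level = logical). The Postgres image used in the repository is assumed to take care of this, but once the environment from the next section is running you can verify it yourself, for example with (the credentials are the ones used later in this tutorial):

    docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c 'SHOW wal_level'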

    Once we have captured the changes, we pass them to Kafka using Debezium's connector. It is Debezium that implements CDC in this particular architecture.

    For more details, see https://debezium.io/documentation/reference/1.4/connectors/postgresql.html

    Once the changes are in Kafka, they are stored in a topic that can be read by multiple Kafka connectors. By changing the offset (the position of the last message read), you can read all the changes from the beginning, or resume reading from where you left off (after a crash, or if you are not reading all the time).

    2. Environment preparation and system setup

    The only requirement for this to work is to have Docker (correctly authenticated) and docker-compose fully installed on your system.

    To make it easy, you can clone this repository that I prepared. We will go through all the files so there's not (too much) magic involved in making this work.

    https://github.com/zom-pro/kafka-cdc

    The main file we will use is docker-compose.yml. This file controls what the docker-compose up command does.

    It is a typical docker-compose file, but there are a few interesting things worth noting. First, the services: postgres (the database whose changes we want to capture), zookeeper (required by Kafka; it allows Kafka to run as a multi-node distributed system), kafka (the streaming platform we will use to store and stream the database changes) and connect (the Debezium source connector that lets us connect the database to Kafka). The connector is built from a "context", a folder where you can find additional information about how this service is built. This particular implementation is the one recommended by Debezium. They have very comprehensive documentation (linked at the beginning), so I won't go into more detail here.
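    If you want to quickly list the services the file defines without reading through it, you can, for example, run this from the root of the cloned repository (docker-compose reads docker-compose.yml from the current directory):

    docker-compose config --services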

    The original code for the connector's context can be found at https://github.com/debezium/debezium-examples/tree/master/end-to-end-demo/debezium-jdbc
    The links section shows the required connections between the components.
    Another interesting detail is how postgres uses the init.sql file to build the database. This means that by the time the container is built, the database setup is done and ready to use. This is huge; if you have ever tried to do this with a database that doesn't support it, you know how painful it can be.
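    Once the containers are up (next step), you can check the table that init.sql created by describing it with psql, for example:

    docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c '\d data'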

    To start the environment, run:

    docker-compose up
    


    And that's it! We now have a fully operational environment, with our database connected to Kafka, which is listening for its changes.

    You can add -d to run it detached; otherwise, closing the window or pressing Ctrl-C will stop all the containers.

    To stop and destroy the containers, run docker-compose down. This not only stops the containers, it also removes them and the networks they were using.
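    For reference, the detached workflow looks like this (docker-compose ps simply checks that the four containers are up):

    docker-compose up -d
    docker-compose ps
    docker-compose down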

    3. Using our setup

    Let's explore what we have created. We will use the docker exec command to run commands in our containers as well as the REST interfaces they expose. Start a new shell if you didn't use -d .

    We will ask Kafka Connect (listening on port 8083) which connectors it has registered.

    curl -H "Accept:application/json" localhost:8083/connectors/
    

    The result should be [], because although our Debezium connect container is running, we haven't registered a connector with it yet. So let's do that:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json
    

    We are using the postgresql-connect.json file, which contains the configuration for our connector. The most important settings are the ones that point the connector to the right database and the right table (remember, we already created these through the init.sql file/command).
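    The file in the repository is the source of truth, but a Debezium 1.4 Postgres connector configuration roughly looks like the sketch below, with the values inferred from the rest of this tutorial (the connector name and the password here are placeholders):

      {
        "name": "postgres-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "tasks.max": "1",
          "plugin.name": "pgoutput",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "kafkasandbox",
          "database.password": "<password from the repository>",
          "database.dbname": "kafkasandboxdb",
          "database.server.name": "postgres",
          "table.include.list": "public.data"
        }
      }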

    The result of this POST should be something like HTTP/1.1 201 Created, plus a bunch of extra information about the connector we just created.
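    You can confirm the connector is there and running by listing the connectors again and querying the Connect REST API's status endpoint (replace <connector-name> with the name reported by the list, which is the "name" field in postgresql-connect.json):

    curl -H "Accept:application/json" localhost:8083/connectors/
    curl -H "Accept:application/json" localhost:8083/connectors/<connector-name>/status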

    Now that we have the source connector installed, we are ready to start listening to our Kafka topic with Kafka's console consumer. This is the side reading "from" Kafka (Debezium, the source connector, reads from the database "to" Kafka).

    docker exec kafka bin/kafka-console-consumer.sh  --bootstrap-server kafka:9092 --topic postgres.public.data --from-beginning | jq
    

    The first part of this command starts the consumer and connects it to the topic. jq is just a utility that makes the output prettier, so you can run the command without | jq if you don't have it installed (or you can install it). Note also that Kafka ships with a bunch of utilities in its bin folder that are worth exploring.

    That command will run and it will wait for something to happen (the table is empty in the database at this point).

    Let's use psql to add some data to the table (you need to run this in an additional terminal session: at this point, one session is running docker-compose, another is running our consumer, and we will run this command in a third one).

    docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c 'INSERT INTO data(key, value) VALUES(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (7, 70), (8, 80), (9, 90), (10, 100)'
    

    Now, go and have a look at the consumer session and you should see all your inserts.
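    You can also use another of the utilities in Kafka's bin folder to confirm that the topic for our table exists now that events have been produced, for example:

    docker exec kafka bin/kafka-topics.sh --bootstrap-server kafka:9092 --list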

    At this point you can start experimenting with deletions, changes etc. Pay attention to the payload section of each event being streamed by kafka.

    For example, run

    docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c "UPDATE data SET value=60 where key=3"
    

    This is the resulting payload:

      {
        "before": {
          "id": 3,
          "value": 30,
          "key": 3
        },
        "after": {
          "id": 3,
          "value": 60,
          "key": 3
        },
        "source": {
          "version": "1.4.1.Final",
          "connector": "postgresql",
          "name": "postgres",
          "ts_ms": 1613679449081,
          "snapshot": "false",
          "db": "kafkasandboxdb",
          "schema": "public",
          "table": "data",
          "txId": 558,
          "lsn": 23744400,
          "xmin": null
        },
        "op": "u",
        "ts_ms": 1613679449296,
        "transaction": null
      }
    

    It contains everything needed to fully define the update we just made: the type of operation ("u"), the previous and current values, the affected table and database, the timestamps, etc.
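    Similarly, deleting a row produces an event with "op": "d". For example:

    docker exec -it postgres psql -U kafkasandbox -d kafkasandboxdb -c "DELETE FROM data WHERE key=3"

    By default, Debezium also follows a delete event with a tombstone message (an event with a null value) so that compacted topics can eventually drop the row.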

    You can now do whatever you want or need with these changes. You could, for example, create an archive table if you want to be able to stream the changes made to a particular table to different clients. Or you could take the changes, do something with them (version them, say) and push them back into another table.

    You can use a variety of different connectors to read from Kafka (here we are using the console consumer for convenience). Normally, these connectors are called sink connectors (remember, Debezium was our source connector).

    Another interesting thing to try is killing and re-starting your consumer. Stop it with Ctrl-C and re-run the command:

    docker exec kafka bin/kafka-console-consumer.sh  --bootstrap-server kafka:9092 --topic postgres.public.data --from-beginning | jq
    

    Note that we are reading back from "the beginning", which means all the changes will be replayed, but we could have chosen a particular "offset" (a location in the stream). The most common case is to run it without --from-beginning, which starts reading from where we left off. Let's try offsetting the consumer. Ctrl-C your consumer and change a couple of things in your database (use the update command above with a different value, for example). Then re-start the consumer with a specified offset (also note that we need to specify the partition; we only have one, so we will use 0).

    docker exec kafka bin/kafka-console-consumer.sh  --bootstrap-server kafka:9092 --topic postgres.public.data --offset 8 --partition 0  | jq 
    

    This will read from message offset 8 onward (including the changes you made, if you made any, of course).

    To determine your latest offset run:

    docker exec kafka bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic postgres.public.data
    

    In my case the result of this command is

    postgres.public.data:0:14
    

    Which means that if I offset to 13, I will see only the last message plus whatever happens from that point onward.

    4. Conclusion and final thoughts

    Hopefully you enjoyed this introduction to Kafka. Kafka itself is big, it comes in different flavours (like Confluent Kafka, for example) and there is a whole world of add-ons and related functionality. Running it in production is not for the faint-hearted either, but it is a really powerful platform.

    Using Kafka for CDC, as we do here, solves a number of problems that you would otherwise have to solve yourself. For example, without Kafka you would need to find a way to store the events in order and to replay them from any position to cope with clients crashing, etc. Kafka also provides other useful tools, such as log compaction, which would be very useful in a production-level solution.
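    As a taste of that last point, you could, for example, enable compaction on our topic with another of the bin utilities (compaction keeps only the latest event per key, so it changes the semantics of the stream and is shown here purely as an illustration; depending on the Kafka version in the image the exact flags may differ):

    docker exec kafka bin/kafka-configs.sh --bootstrap-server kafka:9092 --entity-type topics --entity-name postgres.public.data --alter --add-config cleanup.policy=compact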

    To put this in perspective, compare the implementation described here with the alternative setup proposed by AWS (implementing CDC with Kinesis data streams instead of Kafka):

    https://aws.amazon.com/es/blogs/database/stream-changes-from-amazon-rds-for-postgresql-using-amazon-kinesis-data-streams-and-aws-lambda/

    As always, let me know if you have any problems or comments!
