Kafka Connect (Source, Sink)

28425 단어 msamsa

🔨Maria DB 설정

create databse mydb;

을 통해 Maria DB에 mydb라는 테이블을 하나 만들어두자.

order-service에 mariaDB driver를 추가해주자

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.2</version>
</dependency>

3.x가 나왔지만 잘 모르니까 2.7.2로 설치했다.

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);
create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);

그후에 2개의 테이블을 생성해준다.

🔨Kafka Connect

👉Kafka Connect 설치

http://packages.confluent.io/archive

curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz

url로 접근하여 connect 압축파일을 다운받는다. 혹은 curl 명령어로 설치해도 된다. 나는 7.0으로 설치했다.

./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties

명령어를 입력하여 kafka connect를 실행시켜보자.

참고로 connect 실행 전 zookeaper와 kafka server를 실행시켜 두고 해야한다.

window의 경우 다음과 같은 오류가 발생한다.

./bin/windows/kafka-run-class.bat 파일에서

rem Claaspath addition for core
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)

부분을 찾아서 위에 코드를 삽입해주어야 한다.

다음과 같이 설정해주어야한다.

그럼 이제 이런 오류가 또 나온다 ㅎㅎ

./bin/windows/connect-distributed.bat 파일의 30번째를 보면

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
)

이 부분을

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties
)

로 수정해주어야 한다

/config//etc/kafka/로 수정한것이다.

나는 오류가 떠서 진행할 수 없어서 7.0을 설치하여 위 내용대로 진행하니 우선 실행이 되었다.

그리고 나서 또

could not be established. Broker may not be available. 오류를 뿜어내고 있다.

/{kafka 폴더}/config/server.properties 파일의

포트 부분의 주석을 풀어주고 kafka와 zookeaper를 다시 실행해보자

드디어 정상 실행되었다.

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list

kfaka에서 해당 명령어를 입력해주면

coonect topic들이 추가된걸 확인할 수 있다.

👉Kafka Connect JDBC 설치

https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

그 후에 위 url로 접근하여 JDBC Connector를 설치해야하는데

누르고

눌러서 압축파일을 다운 받는다.


압축을 풀어서 lib 폴더 내부에 다음 jar 파일을 확인할 수 있는데 해당 파일의 경로를 설정 파일에 추가해주어야한다.

C:\2022\kafka-connect\etc\kafka\connect-distributed.properties 파일을 열어서

window의 경우 다음과 같이 plugin을 추가해준다.

C:\{HomeDirectory}\.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.2

이 파일을 복사해서

{kafka-connect폴더}\share\java\kafka에 붙여 넣는다.

🔨Kafka Source Connect 등록

포스트맨을 실행시켜서

http://127.0.0.1:8083/connectors url에 post로 다음과 같이 요청하면

{
    "name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"db 비밀번호",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}

결과를 얻을 수 있다. 이제 Maria DB에 Kafka JDBC로 연결된 상태가 되었다.

http://127.0.0.1:8083/connectors 요청시

내가 등록한 connect의 리스트를 볼수 있고

http://127.0.0.1:8083/connectors/my-source-connect/status 요청시

connect의 상태를 확인할 수 있다.

아직 아무런 데이터의 변경사항이 없을 때 topic list이다.

insert into users(user_id, pwd, name) values ('user1', 'test1234', 'user1');

명령어를 통해

db에 데이터가 추가되었고

kafka에도 connect로 등록해놨던 topic이 추가된것을 확인할 수 있다.

그리고 consumer를 사용해보자

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

명령어를 실행하여 consumer의 내용을 확인해보면

우리가 방금 추가한 user1의 추가된 정보를 확인할 수 있다.

정말로 이게 추가된게 맞는지 또 확인하고 싶다면 한번 더 db에 데이터를 추가해보자!

입력했고

consumer에도 정상적으로 추가되었다.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "test1234",
    "name": "user1",
    "created_at": 1647466531000
  }
}
/////
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 2,
    "user_id": "user2",
    "pwd": "test1234",
    "name": "user2",
    "created_at": 1647466806000
  }
}

이 데이터를 json 형태로 잘 정리하면 위 내용과 같다.

🔨Kafka Sink Connect 등록

{
    "name":"my-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"root 비밀번호",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}

Source Connect를 등록할때와 마찬가지로 topic을 먼저 등록한다.

정상적으로 등록된 것을 확인할 수 있다.

db에도 my_topic_users라는 이름으로 테이블이 생성되었고

안에 데이터도 동일하게 들어가있는 것도 확인할 수 있다.

insert into users(user_id, pwd, name) values ('user3', 'test123412', 'user3');
insert into users(user_id, pwd, name) values ('admin', 'admin1234', 'admin');

이제 2개의 insert query를 날려서 유저를 users 테이블에 데이터를 등록해봤다.

두 테이블 모두 동인하게 데이터가 들어가있는 것을 확인할 수 있다.

좋은 웹페이지 즐겨찾기