Kafka Connect (Source, Sink)
🔨Maria DB 설정
create databse mydb;
create databse mydb;
을 통해 Maria DB에 mydb라는 테이블을 하나 만들어두자.
order-service
에 mariaDB driver를 추가해주자
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.2</version>
</dependency>
3.x가 나왔지만 잘 모르니까 2.7.2로 설치했다.
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
create table orders (
id int auto_increment primary key,
product_id varchar(20) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
그후에 2개의 테이블을 생성해준다.
🔨Kafka Connect
👉Kafka Connect 설치
http://packages.confluent.io/archive
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
url로 접근하여 connect 압축파일을 다운받는다. 혹은 curl 명령어로 설치해도 된다. 나는 7.0으로 설치했다.
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
명령어를 입력하여 kafka connect를 실행시켜보자.
참고로 connect 실행 전 zookeaper와 kafka server를 실행시켜 두고 해야한다.
window의 경우 다음과 같은 오류가 발생한다.
./bin/windows/kafka-run-class.bat
파일에서
rem Claaspath addition for core
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
call:concat %BASE_DIR%\share\java\kafka\*
)
부분을 찾아서 위에 코드를 삽입해주어야 한다.
다음과 같이 설정해주어야한다.
그럼 이제 이런 오류가 또 나온다 ㅎㅎ
./bin/windows/connect-distributed.bat
파일의 30번째를 보면
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
)
이 부분을
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties
)
로 수정해주어야 한다
/config/
를 /etc/kafka/
로 수정한것이다.
나는 오류가 떠서 진행할 수 없어서 7.0을 설치하여 위 내용대로 진행하니 우선 실행이 되었다.
그리고 나서 또
could not be established. Broker may not be available.
오류를 뿜어내고 있다.
/{kafka 폴더}/config/server.properties
파일의
포트 부분의 주석을 풀어주고 kafka와 zookeaper를 다시 실행해보자
드디어 정상 실행되었다.
./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list
kfaka에서 해당 명령어를 입력해주면
coonect topic들이 추가된걸 확인할 수 있다.
👉Kafka Connect JDBC 설치
https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
그 후에 위 url로 접근하여 JDBC Connector를 설치해야하는데
누르고
눌러서 압축파일을 다운 받는다.
압축을 풀어서 lib 폴더 내부에 다음 jar 파일을 확인할 수 있는데 해당 파일의 경로를 설정 파일에 추가해주어야한다.
C:\2022\kafka-connect\etc\kafka\connect-distributed.properties
파일을 열어서
window의 경우 다음과 같이 plugin을 추가해준다.
C:\{HomeDirectory}\.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.2
의
이 파일을 복사해서
{kafka-connect폴더}\share\java\kafka
에 붙여 넣는다.
🔨Kafka Source Connect 등록
포스트맨을 실행시켜서
http://127.0.0.1:8083/connectors url에 post로 다음과 같이 요청하면
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"db 비밀번호",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
결과를 얻을 수 있다. 이제 Maria DB에 Kafka JDBC로 연결된 상태가 되었다.
http://127.0.0.1:8083/connectors 요청시
내가 등록한 connect의 리스트를 볼수 있고
http://127.0.0.1:8083/connectors/my-source-connect/status 요청시
connect의 상태를 확인할 수 있다.
아직 아무런 데이터의 변경사항이 없을 때 topic list이다.
insert into users(user_id, pwd, name) values ('user1', 'test1234', 'user1');
명령어를 통해
db에 데이터가 추가되었고
kafka에도 connect로 등록해놨던 topic이 추가된것을 확인할 수 있다.
그리고 consumer를 사용해보자
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
명령어를 실행하여 consumer의 내용을 확인해보면
우리가 방금 추가한 user1의 추가된 정보를 확인할 수 있다.
정말로 이게 추가된게 맞는지 또 확인하고 싶다면 한번 더 db에 데이터를 추가해보자!
입력했고
consumer에도 정상적으로 추가되었다.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "user1",
"pwd": "test1234",
"name": "user1",
"created_at": 1647466531000
}
}
/////
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 2,
"user_id": "user2",
"pwd": "test1234",
"name": "user2",
"created_at": 1647466806000
}
}
이 데이터를 json 형태로 잘 정리하면 위 내용과 같다.
🔨Kafka Sink Connect 등록
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root 비밀번호",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root 비밀번호",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
Source Connect를 등록할때와 마찬가지로 topic을 먼저 등록한다.
정상적으로 등록된 것을 확인할 수 있다.
db에도 my_topic_users라는 이름으로 테이블이 생성되었고
안에 데이터도 동일하게 들어가있는 것도 확인할 수 있다.
insert into users(user_id, pwd, name) values ('user3', 'test123412', 'user3');
insert into users(user_id, pwd, name) values ('admin', 'admin1234', 'admin');
이제 2개의 insert query를 날려서 유저를 users 테이블에 데이터를 등록해봤다.
두 테이블 모두 동인하게 데이터가 들어가있는 것을 확인할 수 있다.
Author And Source
이 문제에 관하여(Kafka Connect (Source, Sink)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@ililil9482/Kafka-Connect저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)