canal 추출 mysql의binlog 로그를kafka 환경 구축에 기록하기
1.mysql 관련 구성
1.canal의 원리는 mysql binlog 기술을 바탕으로 mysql의 binlog 쓰기 기능을 켜고 binlog 모드를 row로 설정해야 한다는 것이다.
#mysql
service mysqld start
service mysqld status
service mysqld stop
2. mysql의 binlog 기능을 활성화합니다.
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#binlog server_id, server , canal slaveId
server-id = 1
#mysql
log-bin = $datadir/mysql-bin
#
binlog_format = ROW
3. 구성이 적용되는지 테스트
my.cnf
:#
mysql -uroot -proot
use mysql;
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.01 sec)
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
4.canal의 원리는 mysql slave를 모의하는 것으로 mysql slave와 관련된 권한을 설정해야 한다
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
#
mysql> show grants for 'canal';
+----------------------------------------------------------------------------------------------------------------------------------------------+
| Grants for canal@% |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458' |
+----------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
2.canal 구축
1. 패키지 다운로드를 설치합니다. 다른 버전을 다운로드하려면 필요한 버전 번호를 직접 수정하여 다운로드할 수 있습니다.
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2. 패키지 압축 풀기 설치
# -C
tar -zxf canal.deployer-1.1.4.tar.gz -C /usr/canal/
3. canal 서버 구성
#canal/conf
-rwxrwxrwx 1 root root 291 Sep 2 2019 canal_local.properties
-rwxrwxrwx 1 root root 5140 Apr 15 05:56 canal.properties
drwxrwxrwx 2 root root 4096 Apr 15 05:29 example
-rwxrwxrwx 1 root root 3119 Sep 2 2019 logback.xml
drwxrwxrwx 2 root root 4096 Apr 15 04:41 metrics
drwxrwxrwx 3 root root 4096 Apr 15 04:41 spring
a. canal 수정properties
# tcp
canal.serverMode = kafka
# ,
canal.instance.parser.parallelThreadSize = 16
# Kafka bootstrap.servers
canal.mq.servers = 192.168.198.158:9092
# , 0, 3
canal.mq.retries = 3
# Kafka batch.size, producer , 16K
canal.mq.batchSize = 16384
# Kafka max.request.size, , 1M
canal.mq.maxRequestSize = 1048576
# Kafka linger.ms, sender , 0ms
# batch.size linger.ms ,
canal.mq.lingerMs = 200
# Kafka buffer.memory, , 32M
canal.mq.bufferMemory = 33554432
# binlog , 50
canal.mq.canalBatchSize = 50
# binlog , 200ms
canal.mq.canalGetTimeout = 200
# binlog JSON 。 false, Protobuf
canal.mq.flatMessage = true
#
canal.mq.compressionType = none
# Kafka acks, all, leader follower producer ack,0 ack,1 leader ack
canal.mq.acks = all
# Kafka
canal.mq.transaction = true
b. example/instance를 수정합니다.properties
# topic
# mq config
# MQ/KAFka TOPIC
canal.mq.topic=canal # kafka topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
4. canal 시작 명령
bin/start.sh
bin/stop.sh
5.kafka에서 데이터 보기
{
"data":[{
"id":"1","name":"a"}],"database":"test","es":1586900538000,"id":8,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{
"id":4,"name":12},"table":"Student","ts":1586900539360,"type":"INSERT"}
{
"data":[{
"id":"1","name":"b"}],"database":"test","es":1586900671000,"id":10,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(10)"},"old":[{
"name":"a"}],"pkNames":["id"],"sql":"","sqlType":{
"id":4,"name":12},"table":"Student","ts":1586900671586,"type":"UPDATE"}
{
"data":[{
"id":"1","name":"b"}],"database":"test","es":1586900790000,"id":12,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{
"id":4,"name":12},"table":"Student","ts":1586900798554,"type":"DELETE"}
{
"data":[{
"id":"2","name":"a"}],"database":"test","es":1586900817000,"id":13,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{
"id":4,"name":12},"table":"Student","ts":1586900817797,"type":"INSERT"}
다른 실례를 설정할 때 example 폴더를 새 디렉터리로 복사하고 instance를 수정해야 합니다.properties, 서비스 재시작 필요 없음
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
대인기의 Github 기계 학습 아이템을 소개 5선 - ScrapeStorm이 기사에서는 다섯 가지 인기있는 GitHub 기계 학습 항목을 소개합니다. 이러한 항목에는 자연 언어 처리(NLP), 컴퓨터 비전, 빅 데이터 등 다양한 기계 학습 분야가 포함되어 있습니다. NLP는 텍스트의 처리...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.