Setting up canal to extract the MySQL binlog and write it to Kafka

1. MySQL configuration
1. canal works by reading the MySQL binlog, so binlog writing must be enabled in MySQL and the binlog format must be set to ROW.
# Start, check, and stop the MySQL service
service mysqld start
service mysqld status
service mysqld stop

2. Enable the MySQL binlog in my.cnf:
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# binlog requires server_id; it must be unique per server and must not equal canal's slaveId
server-id = 1
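# Enable the MySQL binlog; a relative base name is resolved against datadir
# (option files do not expand $datadir, so it is not used in the path)
log-bin = mysql-bin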
# Use the ROW binlog format
binlog_format = ROW
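
The three binlog-related settings above are easy to miss or mistype. As a rough sanity check, the following sketch parses the text of a my.cnf file and reports which of them are absent or wrong. The function name check_binlog_settings is hypothetical, and the parsing relies on Python's configparser, which only approximates MySQL's option-file syntax.

```python
import configparser

# Settings canal depends on; None means "any value is acceptable".
REQUIRED = {"server_id": None, "log_bin": None, "binlog_format": "ROW"}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True,   # my.cnf allows bare flags without "= value"
        strict=False,          # tolerate repeated sections or options
        interpolation=None,    # do not treat "%" in values specially
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL accepts "-" and "_" interchangeably in option names.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}

    problems = []
    for name, expected in REQUIRED.items():
        if name not in opts:
            problems.append(f"{name} is not set")
        elif expected is not None and (opts[name] or "").upper() != expected:
            problems.append(f"{name} should be {expected}, found {opts[name]}")
    return problems


if __name__ == "__main__":
    sample = "[mysqld]\nserver-id = 1\nbinlog_format = STATEMENT\n"
    for problem in check_binlog_settings(sample):
        print(problem)
```

This only inspects the file; the SHOW VARIABLES queries in the next step remain the authoritative check of what the running server actually uses.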

3. Restart MySQL, then verify that the my.cnf settings have taken effect:
# Log in to MySQL
mysql -uroot -proot
use mysql;
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

4. canal works by emulating a MySQL slave, so the account it connects with needs the replication-related privileges:
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
# Check the grants
mysql> show grants for 'canal';
+----------------------------------------------------------------------------------------------------------------------------------------------+
| Grants for canal@%                                                                                                                           |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458' |
+----------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
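
When the canal account is created by someone else, it can help to check a SHOW GRANTS line mechanically. The sketch below is a simple illustration, not part of canal: missing_privileges is a hypothetical helper that looks only at the privilege list of a single global GRANT line and ignores the scope after ON as well as column-level privileges.

```python
import re

# Privileges granted to the canal account in the step above.
REQUIRED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}


def missing_privileges(grant_line):
    """Return the required privileges that a SHOW GRANTS line does not list."""
    match = re.match(r"GRANT\s+(.*?)\s+ON\s", grant_line.strip(), flags=re.IGNORECASE)
    if not match:
        # Not a GRANT statement at all: treat everything as missing.
        return set(REQUIRED)
    granted = {p.strip().upper() for p in match.group(1).split(",")}
    if "ALL PRIVILEGES" in granted or "ALL" in granted:
        return set()
    return REQUIRED - granted


if __name__ == "__main__":
    line = "GRANT SELECT ON *.* TO 'canal'@'%'"
    print(sorted(missing_privileges(line)))
```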

2. Setting up canal
1. Download the package. To get a different version, change the version number in the URL.
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2. Extract the package
# -C sets the directory to extract into (it must already exist)
tar -zxf canal.deployer-1.1.4.tar.gz -C /usr/canal/

3. Configure the canal server
# Contents of the canal/conf directory
-rwxrwxrwx 1 root root  291 Sep  2  2019 canal_local.properties
-rwxrwxrwx 1 root root 5140 Apr 15 05:56 canal.properties
drwxrwxrwx 2 root root 4096 Apr 15 05:29 example
-rwxrwxrwx 1 root root 3119 Sep  2  2019 logback.xml
drwxrwxrwx 2 root root 4096 Apr 15 04:41 metrics
drwxrwxrwx 3 root root 4096 Apr 15 04:41 spring

a. Edit canal.properties
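# Server mode; the default is tcp
canal.serverMode = kafka
# Number of parallel threads used by the binlog parser
canal.instance.parser.parallelThreadSize = 16
# Kafka bootstrap.servers
canal.mq.servers = 192.168.198.158:9092
# Number of retries when sending a message fails (0 by default; 3 here)
canal.mq.retries = 3
# Kafka batch.size: upper bound on the size of a producer batch, default 16K
canal.mq.batchSize = 16384
# Kafka max.request.size: maximum size of a single request, default 1M
canal.mq.maxRequestSize = 1048576
# Kafka linger.ms: how long the sender waits before sending a batch, default 0ms
# A batch is sent as soon as either batch.size or linger.ms is reached
canal.mq.lingerMs = 200
# Kafka buffer.memory: size of the producer buffer, default 32M
canal.mq.bufferMemory = 33554432
# Batch size used when fetching binlog data, default 50
canal.mq.canalBatchSize = 50
# Timeout when fetching binlog data, 200ms
canal.mq.canalGetTimeout = 200
# Whether to deliver binlog messages as JSON. If false, messages are serialized with Protobuf
canal.mq.flatMessage = true
# Compression type
canal.mq.compressionType = none
# Kafka acks: all means the leader replies to the producer only after all in-sync followers have the record;
# 0 means no ack is awaited; 1 means the leader acks as soon as it has written the record itself
canal.mq.acks = all
# Whether to use Kafka transactions when delivering messages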
canal.mq.transaction = true
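
canal.mq.batchSize and canal.mq.lingerMs are passed to Kafka's batch.size and linger.ms, and a batch goes out when either limit is hit first. The toy model below illustrates that rule for a single partition. It is a deliberately simplified sketch, not Kafka's actual producer logic (for example, a real producer would not let a record overflow the current batch), and simulate_batches is a hypothetical name.

```python
def simulate_batches(records, batch_size, linger_ms):
    """Simulate when batches are flushed.

    records: list of (arrival_ms, size_bytes) tuples sorted by arrival time.
    Returns a list of (flush_ms, reason, total_bytes) tuples.
    """
    flushes = []
    opened_at = None  # arrival time of the first record in the open batch
    pending = 0       # bytes accumulated in the open batch

    for arrival_ms, size in records:
        # The linger deadline expired before this record arrived.
        if opened_at is not None and arrival_ms >= opened_at + linger_ms:
            flushes.append((opened_at + linger_ms, "linger", pending))
            opened_at, pending = None, 0

        if opened_at is None:
            opened_at = arrival_ms
        pending += size

        # The batch is full: send immediately without waiting for linger.
        if pending >= batch_size:
            flushes.append((arrival_ms, "size", pending))
            opened_at, pending = None, 0

    # Whatever is left is sent once its linger deadline passes.
    if opened_at is not None:
        flushes.append((opened_at + linger_ms, "linger", pending))
    return flushes


if __name__ == "__main__":
    events = [(0, 6000), (50, 6000), (100, 6000), (500, 1000)]
    for flush in simulate_batches(events, batch_size=16384, linger_ms=200):
        print(flush)
```

With the values from this configuration (16384 bytes, 200 ms), a burst of changes is flushed by size, while an isolated change waits up to 200 ms before it is sent. That delay is the latency cost of the larger lingerMs.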

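b. Edit example/instance.properties
# Set the Kafka topic that this instance's messages are sent to
# mq config
# MQ/Kafka topic name (keep comments on their own line: a trailing "# ..." would become part of the value)
canal.mq.topic=canal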
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*

4. Commands to start and stop canal
bin/start.sh
bin/stop.sh

5. View the data in Kafka
{"data":[{"id":"1","name":"a"}],"database":"test","es":1586900538000,"id":8,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"Student","ts":1586900539360,"type":"INSERT"}
{"data":[{"id":"1","name":"b"}],"database":"test","es":1586900671000,"id":10,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(10)"},"old":[{"name":"a"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"Student","ts":1586900671586,"type":"UPDATE"}
{"data":[{"id":"1","name":"b"}],"database":"test","es":1586900790000,"id":12,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"Student","ts":1586900798554,"type":"DELETE"}
{"data":[{"id":"2","name":"a"}],"database":"test","es":1586900817000,"id":13,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(10)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"Student","ts":1586900817797,"type":"INSERT"}

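Each message carries the change type, the affected rows in "data", and the primary-key columns in "pkNames", which is enough for a consumer to rebuild the table state. The sketch below replays shortened copies of the four messages above into an in-memory dict. The function name apply_flat_message is hypothetical, reading from Kafka is left out, and an UPDATE that changes the primary key itself is not handled.

```python
import json

# Shortened copies of the four messages shown above (only the fields used here).
SAMPLE_MESSAGES = [
    '{"data":[{"id":"1","name":"a"}],"isDdl":false,"old":null,"pkNames":["id"],"table":"Student","type":"INSERT"}',
    '{"data":[{"id":"1","name":"b"}],"isDdl":false,"old":[{"name":"a"}],"pkNames":["id"],"table":"Student","type":"UPDATE"}',
    '{"data":[{"id":"1","name":"b"}],"isDdl":false,"old":null,"pkNames":["id"],"table":"Student","type":"DELETE"}',
    '{"data":[{"id":"2","name":"a"}],"isDdl":false,"old":null,"pkNames":["id"],"table":"Student","type":"INSERT"}',
]


def apply_flat_message(table, message):
    """Apply one flat-message JSON string to table (a dict keyed by primary key)."""
    event = json.loads(message)
    if event.get("isDdl"):
        return table  # schema changes carry no row data to replay

    pk_names = event["pkNames"]
    for row in event["data"] or []:
        key = tuple(row[name] for name in pk_names)
        if event["type"] in ("INSERT", "UPDATE"):
            table[key] = row      # "data" holds the full row after the change
        elif event["type"] == "DELETE":
            table.pop(key, None)  # "data" holds the row that was removed
    return table


if __name__ == "__main__":
    state = {}
    for msg in SAMPLE_MESSAGES:
        apply_flat_message(state, msg)
    print(state)
```

Note that all column values arrive as strings in the flat JSON; the "mysqlType" and "sqlType" fields of the full message can be used to convert them back.
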
To configure another instance, copy the example folder to a new directory and edit its instance.properties; the service does not need to be restarted.
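
Copying the folder and editing the topic can be scripted. The following is a minimal sketch under a few assumptions: clone_instance is a hypothetical helper, the conf directory layout is the one listed earlier, and only canal.mq.topic is rewritten. The MySQL address and credentials in the copied instance.properties still have to be adjusted by hand.

```python
import re
import shutil
from pathlib import Path


def clone_instance(conf_dir, new_name, topic):
    """Copy conf_dir/example to conf_dir/new_name and point it at another topic."""
    conf_dir = Path(conf_dir)
    target = conf_dir / new_name
    shutil.copytree(conf_dir / "example", target)  # fails if target already exists

    props = target / "instance.properties"
    text = props.read_text(encoding="utf-8")

    # Replace the active (non-commented) topic line, or append one if absent.
    pattern = r"(?m)^[ \t]*canal\.mq\.topic[ \t]*=.*$"
    text, count = re.subn(pattern, lambda _m: f"canal.mq.topic={topic}", text)
    if count == 0:
        text += f"\ncanal.mq.topic={topic}\n"

    props.write_text(text, encoding="utf-8")
    return target


if __name__ == "__main__":
    # Path and names are illustrative; adjust them to the actual install.
    print(clone_instance("/usr/canal/conf", "orders", "orders_topic"))
```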
