Python:mysql mom 모듈 을 사용 하여 실시 간 으로 MySQL 데 이 터 를 Elastic Search 에 동기 화

13040 단어 python
mysqlsmam 문서:https://mysqlsmom.readthedocs.io/en/latest/hello.html github: https://github.com/m358807551/mysqlsmom
설치 하 다.
 pip install mysqlsmom

전량 동기 화
#           

$ mom new test_mom/init_config.py -t init --force

#       
$ vim ./test_mom/init_config.py  #          

#     
$ mom run -c ./test_mom/init_config.py

증분 동기 화
세 개의 파일 설정
test_mom
├── binlog_config.py   #     
├── my_filters.py         #         watched 
└── my_handlers.py    #         pipeline

새 설정
mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py
# coding=utf-8

STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__

#      BULK_SIZE     elasticsearch,          1
BULK_SIZE = 1

BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}

# redis           
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    # "password": "password",  #              
}

NODES = [{"host": "127.0.0.1", "port": 9200}]

TASKS = [
    {
        "stream": {
            "database": "demo",
            "table": "student"
        },
        "jobs": [{
            "actions": ["insert", "update"],
            "watched": {
                "filter_display": {}
            },
            "pipeline": [
                {"only_fields": {"fields": ["id", "name", "age"]}},
                {"change_name": {"key": "name", "prefix": "hot-"}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "upsert",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        },
            {
                "actions": ["delete"],
                "pipeline": [
                    # {"only_fields": {"fields": ["id", "name", "age"]}},
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "delete",
                        "index": "demo",
                        "type": "student",
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"


사용자 정의 프로세서 myhandlers.py
# -*- coding: utf-8 -*-


import copy

def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    #       ,        
    return new_row


사용자 정의 필터 myfilters.py
# -*- coding: utf-8 -*-

def filter_display(event):
	#   True   False,     
    return event["values"]["display"] == 1


시동 을 걸다
mom run -c test_mom/binlog_config.py 

좋은 웹페이지 즐겨찾기