Namespaced Topic Kafka Producer로 Maxwell 시작(Kafka에서 유휴 리스너 찾기)

전제 조건을 실행한 후 다음을 갖게 됩니다.
  • AWS Aurora 인스턴스
  • osheroff/maxwell이라는 Maxwell 이미지
  • kafka라는 AKafka 서비스, 수신 대기 중 kafka:9092

  • Namespaced Topic Kafka Producer로 Maxwell 시작
    이는 전제 조건인 AWS Aurora에서 Maxwell Kafka Producer로의 약간의 변형입니다.

    전제 조건에서 Maxwell 주제에 대한 메시지를 생성하는 기본 Kafka Producer 구성으로 Maxwell을 실행했습니다.

    Maxwell 주제의 메시지 수를 얻으려면 다음을 실행할 수 있습니다. bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
    이 예에서 우리는 MAXWELL_OPTIONS 환경 변수를 재정의하고 동적 주제 이름을 지정하여 Maxwell이 데이터베이스 이름으로 네임스페이스가 지정된 동일한 이름으로 각 테이블의 메시지를 주제로 라우팅합니다.

    docker run -it --rm \
        --env MYSQL_USERNAME=AURORA_USERNAME \
        --env MYSQL_PASSWORD=AURORA_PASSWORD \
        --env MYSQL_HOST=AURORA_HOST \
        --link kafka \
        --env KAFKA_HOST=kafka \
        --env KAFKA_PORT=9092 \
        --env MAXWELL_OPTIONS="--kafka_topic=maxwell_%{database}_%{table}
        --name maxwell \
        osheroff/maxwell
    




    다음은 Consumers 와 관련하여 사물을 보는 그래픽 방식입니다. 이제 Maxwell로 돌아가 보겠습니다.

    다음은 Maxwell의 출력입니다.

    17:44:34,901 INFO  ProducerConfig - ProducerConfig values: 
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        buffer.memory = 33554432
        ssl.truststore.password = null
        batch.size = 16384
        ssl.keymanager.algorithm = SunX509
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.key.password = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.provider = null
        sasl.kerberos.service.name = null
        max.in.flight.requests.per.connection = 5
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [kafka:9092]
        client.id = 
        max.request.size = 1048576
        acks = 1
        linger.ms = 0
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        metadata.fetch.timeout.ms = 60000
        ssl.endpoint.identification.algorithm = null
        ssl.keystore.location = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
        ssl.truststore.location = null
        ssl.keystore.password = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        block.on.buffer.full = false
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        security.protocol = PLAINTEXT
        ssl.protocol = TLS
        sasl.kerberos.min.time.before.relogin = 60000
        timeout.ms = 30000
        connections.max.idle.ms = 540000
        ssl.trustmanager.algorithm = PKIX
        metric.reporters = []
        compression.type = none
        ssl.truststore.type = JKS
        max.block.ms = 60000
        retries = 0
        send.buffer.bytes = 131072
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        reconnect.backoff.ms = 50
        metrics.num.samples = 2
        ssl.keystore.type = JKS
    
    17:44:34,952 INFO  AppInfoParser - Kafka version : 0.9.0.1
    17:44:34,952 INFO  AppInfoParser - Kafka commitId : 23c69d62a0cabf06
    17:44:35,012 INFO  Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:84337]
    17:44:35,680 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
    17:44:38,991 INFO  OpenReplicator - starting replication at mysql-bin-changelog.000002:84337
    




    프로세스는 이제 새 데이터 이벤트를 기다리고 유휴 Kafka 수신기를 찾고 있습니다.
    consumer를 시작합니다(다른 터미널 창에서). 이 명령은 Kafka 서비스에 연결된 이름 없는 Spotify/Kafka 인스턴스를 시작하고, 소비자를 시작하고, Maxwell 주제의 기존 메시지를 표시하고, 종료할 때까지 새 메시지를 기다립니다(컨테이너를 파괴함).

    docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell_{AURORA_DATABASE}_{AURORA_TABLE} --from-beginning
    


    AWS Aurora 인스턴스에 연결하고 일부 레코드를 삽입하고 일부 레코드를 업데이트합니다. Maxwell의 데이터 이벤트는 소비자 터미널 창에 인쇄됩니다.

    {"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Mendy","last_name":"Montana"},"old":{"first_name":"Montana"}}
    {"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Montana","last_name":"Mendy"},"old":{"first_name":"Tim"}}
    {"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Mendy"},"old":{"first_name":"Montana"}}
    

    좋은 웹페이지 즐겨찾기