Namespaced Topic Kafka Producer로 Maxwell 시작(Kafka에서 유휴 리스너 찾기)
kafka:9092
Namespaced Topic Kafka Producer로 Maxwell 시작
이는 전제 조건인 AWS Aurora에서 Maxwell Kafka Producer로의 약간의 변형입니다.
전제 조건에서 Maxwell 주제에 대한 메시지를 생성하는 기본 Kafka Producer 구성으로 Maxwell을 실행했습니다.
Maxwell 주제의 메시지 수를 얻으려면 다음을 실행할 수 있습니다.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
이 예에서 우리는
MAXWELL_OPTIONS
환경 변수를 재정의하고 동적 주제 이름을 지정하여 Maxwell이 데이터베이스 이름으로 네임스페이스가 지정된 동일한 이름으로 각 테이블의 메시지를 주제로 라우팅합니다.docker run -it --rm \
--env MYSQL_USERNAME=AURORA_USERNAME \
--env MYSQL_PASSWORD=AURORA_PASSWORD \
--env MYSQL_HOST=AURORA_HOST \
--link kafka \
--env KAFKA_HOST=kafka \
--env KAFKA_PORT=9092 \
--env MAXWELL_OPTIONS="--kafka_topic=maxwell_%{database}_%{table}
--name maxwell \
osheroff/maxwell
다음은
Consumers
와 관련하여 사물을 보는 그래픽 방식입니다. 이제 Maxwell로 돌아가 보겠습니다.다음은 Maxwell의 출력입니다.
17:44:34,901 INFO ProducerConfig - ProducerConfig values:
request.timeout.ms = 30000
retry.backoff.ms = 100
buffer.memory = 33554432
ssl.truststore.password = null
batch.size = 16384
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
max.in.flight.requests.per.connection = 5
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka:9092]
client.id =
max.request.size = 1048576
acks = 1
linger.ms = 0
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
metadata.fetch.timeout.ms = 60000
ssl.endpoint.identification.algorithm = null
ssl.keystore.location = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = false
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
timeout.ms = 30000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
metric.reporters = []
compression.type = none
ssl.truststore.type = JKS
max.block.ms = 60000
retries = 0
send.buffer.bytes = 131072
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
17:44:34,952 INFO AppInfoParser - Kafka version : 0.9.0.1
17:44:34,952 INFO AppInfoParser - Kafka commitId : 23c69d62a0cabf06
17:44:35,012 INFO Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:84337]
17:44:35,680 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
17:44:38,991 INFO OpenReplicator - starting replication at mysql-bin-changelog.000002:84337
프로세스는 이제 새 데이터 이벤트를 기다리고 유휴 Kafka 수신기를 찾고 있습니다.
consumer
를 시작합니다(다른 터미널 창에서). 이 명령은 Kafka 서비스에 연결된 이름 없는 Spotify/Kafka 인스턴스를 시작하고, 소비자를 시작하고, Maxwell 주제의 기존 메시지를 표시하고, 종료할 때까지 새 메시지를 기다립니다(컨테이너를 파괴함).docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell_{AURORA_DATABASE}_{AURORA_TABLE} --from-beginning
AWS Aurora 인스턴스에 연결하고 일부 레코드를 삽입하고 일부 레코드를 업데이트합니다. Maxwell의 데이터 이벤트는 소비자 터미널 창에 인쇄됩니다.
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Mendy","last_name":"Montana"},"old":{"first_name":"Montana"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Montana","last_name":"Mendy"},"old":{"first_name":"Tim"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Mendy"},"old":{"first_name":"Montana"}}
Reference
이 문제에 관하여(Namespaced Topic Kafka Producer로 Maxwell 시작(Kafka에서 유휴 리스너 찾기)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/montana/start-maxwell-with-namespaced-topic-kafka-producer-look-for-idle-listeners-in-kafka-3nbg텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)