콘솔 소비자 - 승리를 위한 BytesDeserializer
소개
Confluent Avro Serializer 및 Deserializer는 스키마의 고유 ID를 메시지에 저장하는 데 활용합니다. 예기치 않은 문자가 문자열에 나타나면 유형 불일치가 더 분명해집니다. 그러나 인쇄할 수 없는 문자는 어떻습니까? 그들은 어떻게 나타 납니까? 그렇다면 문제가 명백할까요?
데모
Datagen 소스 커넥터를 사용하여 간단한 데모를 수행할 수 있습니다. Avro를 키로 사용하여 커넥터를 만듭니다. Datagen의 빠른 시작
users
의 데이터 유형은 문자열입니다. Avro 직렬 변환기는 이것을 Avro 프리미티브로 작성합니다. 일반적으로 Avro를 사용하는 경우 최상위 개체는 레코드이지만 직렬 변환기에는 프리미티브를 지원하기 위한 사용자 지정 코드가 있습니다.구성
Datagen 커넥터는 Avro에서 표시되는 키로 구성됩니다.
{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"tasks.max": "1",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081",
"key.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"max.interval": 100,
"iterations": 10000000
}
대본
키를 애플리케이션에 사용한 기본값인
Serdes.String()
로 읽는 Kafka Streams 애플리케이션을 작성합니다. 읽기용 serdeusers
를 기본 serde에서 Avro Serde로 변경하는 것을 잊었습니다. 이제 사용자와 함께 주문 스트림에 참여하고 조인이 성공하지 않습니다.조사...
저라면 가장 먼저
kafka-avro-console-consumer
를 사용하여 진행 상황을 확인합니다.kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--topic users
결과는 매우 정상적이고 예상되는 콘텐츠를 포함합니다.
User_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
User_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
이제 인쇄할 수 없는 바이트가 트리거하는 경우 추가 빈 줄이 나타날 수 있습니다. 그러나 그것이 항상 눈에 띄고 명백한 문제는 아닙니다(적어도 나에게는 분명하지 않습니다).
키 디시리얼라이저가
BytesDeserializer
인 경우 어떻게 됩니까?kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--topic users
직렬 변환기의 매직 바이트(0x00)와 schema-id의 바이트는 인쇄 가능한 16진수 문자로 표시됩니다.
\x00\x00\x00\x00\x03\x0CUser_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
\x00\x00\x00\x00\x03\x0CUser_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
이제 문제를 쉽게 볼 수 있습니다. 키는 Avro(직렬 변환기에서 정의한 기본 Avro 문자열)입니다. 솔루션: 문자열을 사용하도록 커넥터를 업데이트하거나 스트림 애플리케이션을 업데이트하여 데이터 키를 다시 지정하십시오.
노트
데모를 위해 컨테이너를 실행하는 것은 좋지만 URL이 일치하지 않으면 혼란스러울 수 있습니다.
localhost:port
는 포트 매핑을 통해 호스트 시스템(노트북)에서 서비스에 연결하는 데 사용됩니다. 실제 호스트 이름은 다른 컨테이너에서 서비스에 액세스할 때 사용됩니다. 따라서 연결 구성 내에 http://schema-registry:8081
가 표시되고 호스트 시스템에서 실행되는 명령에 대해 http://localhost:8081
가 표시됩니다. 이 스크립트는 데모 코드와 일치하므로 여기에서는 번역하지 않습니다.유용한 쉘 별칭
나는 이것을 내
.zshrc
에 정의했습니다.alias kcc='kafka-console-consumer \
--bootstrap-server localhost:19092 \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--topic'
alias kacc='kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--topic'
테이크아웃
key-mismatch
데모에서 사용할 수 있습니다. 손을 내밀다
귀하가 사용하는 개발 개선 사항에 대해 더 많이 듣고 싶습니다.
contact us으로 문의하십시오.
Reference
이 문제에 관하여(콘솔 소비자 - 승리를 위한 BytesDeserializer), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/nbuesing/console-consumer-bytesdeserializer-for-the-win-4i7b텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)