Configurando o 프로듀서 Kafka para enviar headers com Go e Sarama
제목
O Kafkaétotalmente agnóstico em relaçao conteúdo da mensage que O Producer envia,ou seja,ele deixa a cargo do usuário a tarefa de enriquecer ou atribuir mais significando a uma mensage.Uma alternativa para contornar esse problemaéutilizar padrões estruturados como JSON ou AVRO onde o usuário livre para definir os campos necesários e pode incluir metadados facilmente.
Um headeréUm par(chave,valor)e umaúnica mensage pode conter diversos headers.Esseéum conceito encontrado em sistemas de mensagem como JMS e de transport como TCP e HTTP e eles podem ser utilizados para roteamento,filtros e anotaçes.O Kafka adicionou suporte a headers a partir da sua versãOv0.11.0.0.Podemos utilizar os headers para adicionar informarçes extras as mensagens quem podem interestsar a differentes consumer.
Vamos pro c ódigo 회사
Deposis dessa pequena introductionço vamos ao que interestsa!Antes de mais nada,precisamos de um ambiente com o Kafka configurado e executando e da biblioteca instalada no ambiente de desenvolvimento.precisa executar o seguinte comando:
go get -u github.com/Shopify/sarama
Após a instalaço jápodemos começar a escrever o código que vai Establecer a conexão com o Kafka e produzir as mensagens.O primeiro passoéinstanciar O objeto de configuraãO e criar O Producer.func initProducer() (sarama.SyncProducer, error) {
// setup sarama log to stdout
sarama.Logger = log.New(os.Stdout, "", log.Ltime)
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Version = sarama.V0_11_0_0
// create producer
prd, err := sarama.NewSyncProducer([]string{kafkaConn}, config)
return prd, err
}
이것은 매우 좋은 독서 체험이다.A documentaço da biblioteca não deixa claro que precisamos specificiar A versão do Kafka que seráutilizada e sem essa configuraço não iremos conseguir utilizar os headers.No meu primeiro contato com a biblioteca cheguei a perder um bom tempo sem conseguir produzir No tópico por causa dessa configuraço.Após gastar algum tempo pesquisando encontrei uma issuer No repositório da biblioteca que me ajudou a entender o problema.Para quem tiver curiosidade deixarei o link Para a issuer a seguir:소비자로부터 제목 정보를 받지 못했습니다.
#1074
honghzzhang
발표 날짜
버전
Sarama 버전: v1.16.0
카프카 버전: 카프카 2.11-1.0.0.회사 명
Go 버전: go1.10 darwin/amd64
구성
나는 작은 테스트 복사를 썼다.첨부 제작자 참조.소비하다.가다
로그
제목이 설정된 테스트 생성기의 출력을 실행하려면:
41973 생산 업체.go:24]producer 메시지 & {send test message for headers [{[]]} {[116 101 115 116 72 101 101 101 114 49] [116 101 116 86 108 117 101 49]} 0 0001-01 00:00 + 0000 UTC 0}
41973 생산 업체.go:27]producer message header key testHeader1,value testValue1
제로 헤드가 있는 테스트 소비자의 출력을 실행합니다.
41959 소비자.이동: 25] 메시지 테마: 보내기, 섹션: 0, 오프셋: 0, 키:, 값: 테스트 메시지 헤더
41959 소비자.이동:26] 소비자 메시지 헤더 크기 0
문제 설명
생산자 측에서 헤더가 있는 메시지를 보낼 때 소비자 측의 헤더 정보가 분실된다.
현재 진행 중인 프로젝트에 대한 정보:
내 프로젝트에서, 나는 kafka 헤드를 사용하여 zipkin 추적 정보를 전파한다.나는 사라마 라이브러리를 생산자 측에 사용하고 사라마 집단 라이브러리는 소비자 측에 사용한다.
내가 쓴 간단한 복제에서 나는 사라마 라이브러리를 사용하여 생산자와 소비자를 위해 사라마 집단 라이브러리의 잠재적인 문제를 배제했다.
문서에 대해kafka버전 0.11+는 소비자 메시지 헤더를 지원합니다.카프카 1.0.0 버전을 사용하고 있습니다. 지원이 있을 것입니다.
View on GitHub
Agora que tudo estáconfigurado podemos escrever o código que vai ser responseável por produzir as mensagens no tópico.Sarama possui uma estrutura para A mensagem do productor e do consumidor의 도서관.Essa estrutura iráreceber informaçes como o tópico para o qual a mensage seráenvida,a chave dessa mensage,os metadados,os headers e atémesmo dados que serão preenchidos somente quando a mensage for entrgue no barramento Kafka,como offset e a partiço.
func produce(message string, headers map[string]string, producer sarama.SyncProducer) {
// publish sync
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(message),
Headers: convertHeaders(headers),
}
p, o, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Error publish: ", err.Error())
}
fmt.Println("Partition: ", p)
fmt.Println("Offset: ", o)
}
func convertHeaders(headers map[string]string) []sarama.RecordHeader {
output := make([]sarama.RecordHeader, 0)
for key, value := range headers {
output = append(output, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(value),
})
}
return output
}
Deposis de conhecer는 특수한 도서관으로서 간단하고 완전하며 정확하고 간단하며 간단하며 간단하며 정확하며 간단하며 정확하며 정확하며 정확하다.Para enviarmos os headers precisamos converter Para o padrão da biblioteca e por isso temos a funão convertHeaders.Essa funçãoéresponsável por Preincher a estrutura sarama.RecordHeader queéa modelagem da biblioteca para lidar com essa functionalidade.Elaébem simples e como jádescripto anteriormente possui apenas uma chave e um valor queéassociado a essa chave.Com tudo isso configurado utilizaremos o Producer que foi criado e configurado anteriormente e enviaremos a mensage.
Saramaésem dúvidas A mais consolidada e utilizada maséum pouco mais verbosa e baixo nível,outras bibliotecas adicionam camadas de abstraão que facilitam A vida do programmador e tornam o utilizaão mais amigável l.Cada biblioteca tem sua specializade e as escolhas de design trazem consigo um trade off,por ser mais verbosa e com menos abstraçes a biblioteca tem uma maior curva de aprendizado mas possibilita maior controle e flexibilidade ao programador,mas issoéum assto para uma publicaço futura.Abaixo deixo o código completo disponível para quem quiser copiar,estudar e realizar seu próprio teste.
Issoétudo pessoal
func produce(message string, headers map[string]string, producer sarama.SyncProducer) {
// publish sync
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(message),
Headers: convertHeaders(headers),
}
p, o, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Error publish: ", err.Error())
}
fmt.Println("Partition: ", p)
fmt.Println("Offset: ", o)
}
func convertHeaders(headers map[string]string) []sarama.RecordHeader {
output := make([]sarama.RecordHeader, 0)
for key, value := range headers {
output = append(output, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(value),
})
}
return output
}
Ficou com a alguma dúvida?당신은 이것이 아르고마 코사의 작품이라고 생각합니까?Se sinta a vontade para deixar um comentário.Continuarei comparitilhando o que estou estudando,aprendendo e aplicando no meu dia a dia
Reference
이 문제에 관하여(Configurando o 프로듀서 Kafka para enviar headers com Go e Sarama), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/diogodantas/configurando-o-producer-kafka-para-enviar-headers-com-go-e-sarama-3bha텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)