SSL을 통해 Kafka를 연결하는 Golang의 문제
5269 단어 Go 개발
2019/04/02 20:02:22 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
그리고 카프카에서 오류가 발생한 것은:
[2019-04-02 22:27:24,378] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException:
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:175)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:739)
at org.apache.kafka.common.network.Selector.close(Selector.java:727)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:520)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:551)
at kafka.network.Processor.run(SocketServer.scala:468)
at java.lang.Thread.run(Thread.java:748)
이 문제가 발생한 이유는 제 인증서가 스스로 서명했기 때문에 검증할 때 통과할 수 없기 때문에 이 문제가 발생할 수 있습니다. 우리가 수정한 부분은 프로필이 서버를 검증하지 않는 인증서 파라미터 설명입니다. 인증 인증서 설정InsecureSkipVerify:false, 인증서 검증하지 않음InsecureSkipVerify:true 검증 내용: 1.인증서에 사용된 CN의 이름이 일치하는지 여부 2.인증서가 권위적인 서명인지 권위가 아니면certificate signed by unknown authority 자체 인증서가 인증되지 않았습니다. 설정 파일에 인증을 추가하지 않으면 통과할 수 있습니다!코드 구현 프로세스:
package util
import (
"log"
"github.com/Shopify/sarama"
"crypto/tls"
"io/ioutil"
"crypto/x509"
"os"
"os/signal"
"sync"
"fmt"
"time"
)
func KafkaConsumer(addrs []string,topics string) {
tlsConfig, err := NewTLSConfig("client.cer.pem",
"client.key.pem","server.cer.pem")
if err != nil {
log.Fatal(err)
}
//
tlsConfig.InsecureSkipVerify = true
//
consumerConfig := sarama.NewConfig()
// TLS
consumerConfig.Net.TLS.Enable = true
consumerConfig.Net.TLS.Config = tlsConfig
//
client, err := sarama.NewClient(addrs, consumerConfig)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumerLoop(consumer, topics)
}
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}
//
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
// CA
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}
func consumerLoop(consumer sarama.Consumer, topic string) {
partitions, err := consumer.Partitions(topic)
if err != nil {
log.Println("unable to fetch partition IDs for the topic", topic, err)
return
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var wg sync.WaitGroup
for partition := range partitions {
wg.Add(1)
go func() {
consumePartition(consumer, int32(partition), signals,topic)
wg.Done()
}()
}
wg.Wait()
}
func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal,topic string) {
log.Println("Receving on partition", partition)
//partitionConsumer, err := consumer.ConsumePartition("zhang", partition, sarama.OffsetNewest)
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
if err != nil {
log.Println(err)
return
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Println(err)
}
}()
consumed := 0
ConsumerLoop:
for {
select {
case msg :=
package test
import (
"testing"
"copCRoad/util"
)
var Address = []string{"IP:6666"}
var topic ="zhang"
var srcValue = " SSL kafka "
func TestKafkaProducter(t *testing.T) {
util.KafkaProducer(Address,topic,srcValue)
}
func TestKafkaConsumer(t *testing.T) {
util.KafkaConsumer(Address,topic)
}
총괄: 코드가 아직 간소하지 않아서 다음에 최적화를 진행합니다!이상의 코드는 이미 검증되었는데 문제가 있으면 당신이 작성한 인증서에 문제가 있으면 저의 다른 블로그를 참고할 수 있습니다.https://blog.csdn.net/weixin_36771703/article/details/88762831;남에게 장미를 선물하면, 손에 남은 향기를 남긴다!
참조:https://blog.csdn.net/kdpujie/article/details/79093595 https://blog.csdn.net/wangshubo1989/article/details/77508738
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
SSL을 통해 Kafka를 연결하는 Golang의 문제문제: 프로젝트가 SSL로kafka를 연결하려고 하는데 사용 과정에서 몇 가지 특별한 구덩이 현상이 발생했다. 프로그램이 소비와 생산을 할 때 오류가 발생했다. 그리고 카프카에서 오류가 발생한 것은: 이 문제가 발생...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.