SSL을 통해 Kafka를 연결하는 Golang의 문제

5269 단어 Go 개발
문제: 프로젝트가 SSL로kafka를 연결하려고 하는데 사용 과정에서 몇 가지 특별한 구덩이 현상이 발생했다. 프로그램이 소비와 생산을 할 때 오류가 발생했다.
2019/04/02 20:02:22 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

그리고 카프카에서 오류가 발생한 것은:
[2019-04-02 22:27:24,378] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException:      
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:175)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
	at org.apache.kafka.common.network.Selector.doClose(Selector.java:739)
	at org.apache.kafka.common.network.Selector.close(Selector.java:727)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:520)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
	at kafka.network.Processor.poll(SocketServer.scala:551)
	at kafka.network.Processor.run(SocketServer.scala:468)
	at java.lang.Thread.run(Thread.java:748)


이 문제가 발생한 이유는 제 인증서가 스스로 서명했기 때문에 검증할 때 통과할 수 없기 때문에 이 문제가 발생할 수 있습니다. 우리가 수정한 부분은 프로필이 서버를 검증하지 않는 인증서 파라미터 설명입니다. 인증 인증서 설정InsecureSkipVerify:false, 인증서 검증하지 않음InsecureSkipVerify:true 검증 내용: 1.인증서에 사용된 CN의 이름이 일치하는지 여부 2.인증서가 권위적인 서명인지 권위가 아니면certificate signed by unknown authority 자체 인증서가 인증되지 않았습니다. 설정 파일에 인증을 추가하지 않으면 통과할 수 있습니다!코드 구현 프로세스:
  • 생산과 소비
  • package util
    
    import (
      "log"
      "github.com/Shopify/sarama"
      "crypto/tls"
      "io/ioutil"
      "crypto/x509"
      "os"
      "os/signal"
      "sync"
      "fmt"
      "time"
    )
    
    func KafkaConsumer(addrs []string,topics string) {
          tlsConfig, err := NewTLSConfig("client.cer.pem",
        "client.key.pem","server.cer.pem")
      if err != nil {
        log.Fatal(err)
      }
    
      //          
      tlsConfig.InsecureSkipVerify = true
    
      //        
      consumerConfig := sarama.NewConfig()
      //  TLS  
      consumerConfig.Net.TLS.Enable = true
      consumerConfig.Net.TLS.Config = tlsConfig
    
      //         
      client, err := sarama.NewClient(addrs, consumerConfig)
      if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
      }
    
      consumer, err := sarama.NewConsumerFromClient(client)
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close()
    
      consumerLoop(consumer, topics)
    }
    
    func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
      tlsConfig := tls.Config{}
    
      //        
      cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
      if err != nil {
        return &tlsConfig, err
      }
      tlsConfig.Certificates = []tls.Certificate{cert}
    
      //   CA  
      caCert, err := ioutil.ReadFile(caCertFile)
      if err != nil {
        return &tlsConfig, err
      }
      caCertPool := x509.NewCertPool()
      caCertPool.AppendCertsFromPEM(caCert)
      tlsConfig.RootCAs = caCertPool
    
      tlsConfig.BuildNameToCertificate()
      return &tlsConfig, err
    }
    
    func consumerLoop(consumer sarama.Consumer, topic string) {
      partitions, err := consumer.Partitions(topic)
      if err != nil {
        log.Println("unable to fetch partition IDs for the topic", topic, err)
        return
      }
    
      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)
    
      var wg sync.WaitGroup
      for partition := range partitions {
        wg.Add(1)
        go func() {
          consumePartition(consumer, int32(partition), signals,topic)
          wg.Done()
        }()
      }
      wg.Wait()
    }
    
    func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal,topic string) {
      log.Println("Receving on partition", partition)
      //partitionConsumer, err := consumer.ConsumePartition("zhang", partition, sarama.OffsetNewest)
      partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
      if err != nil {
        log.Println(err)
        return
      }
      defer func() {
        if err := partitionConsumer.Close(); err != nil {
          log.Println(err)
        }
      }()
    
      consumed := 0
    ConsumerLoop:
      for {
        select {
        case msg := 
  • 테스트
  • package test
    
    import (
    	"testing"
    	"copCRoad/util"
    )
    var Address = []string{"IP:6666"}
    var topic ="zhang"
    var srcValue = "    SSL    kafka "
    func TestKafkaProducter(t *testing.T) {
    		util.KafkaProducer(Address,topic,srcValue)
    }
    func TestKafkaConsumer(t *testing.T)  {
    	util.KafkaConsumer(Address,topic)
    }
    
    

    총괄: 코드가 아직 간소하지 않아서 다음에 최적화를 진행합니다!이상의 코드는 이미 검증되었는데 문제가 있으면 당신이 작성한 인증서에 문제가 있으면 저의 다른 블로그를 참고할 수 있습니다.https://blog.csdn.net/weixin_36771703/article/details/88762831;남에게 장미를 선물하면, 손에 남은 향기를 남긴다!
    참조:https://blog.csdn.net/kdpujie/article/details/79093595 https://blog.csdn.net/wangshubo1989/article/details/77508738

    좋은 웹페이지 즐겨찾기