Spring Boot 통합 Kafka 의 예제 코드

9540 단어 SpringBootKafka
본 고 는 Spring Boot 통합 Kafka 의 예제 코드 를 소개 하여 여러분 에 게 공유 하고 자신 에 게 도 필 기 를 남 겼 습 니 다.
시스템 환경
원 격 서버 에 설 치 된 kafka 서 비 스 를 사용 합 니 다.
  • Ubuntu 16.04 LTS
  • kafka_2.12-0.11.0.0.tgz
  • zookeeper-3.5.2-alpha.tar.gz
  • 집적 과정
    1.spring boot 프로젝트 를 만 들 고 의존 도 를 추가 합 니 다.
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.laravelshao.springboot</groupId>
      <artifactId>spring-boot-integration-kafka</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>spring-boot-integration-kafka</name>
      <description>Demo project for Spring Boot</description>
    
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
      </parent>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--kafka-->
        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
        </plugins>
      </build>
    </project>
    
    
    2.설정 정보 추가,yml 파일 사용
    
    spring:
     kafka:
      bootstrap-servers:X.X.X.X:9092
      producer:
       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      consumer:
       group-id: test
       auto-offset-reset: earliest
       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
       properties:
        spring:
         json:
          trusted:
           packages: com.laravelshao.springboot.kafka
    
    3.메시지 대상 만 들 기
    
    public class Message {
      private Integer id;
      private String msg;
    
      public Message() {
      }
    
      public Message(Integer id, String msg) {
        this.id = id;
        this.msg = msg;
      }
    
      public Integer getId() {
        return id;
      }
    
      public void setId(Integer id) {
        this.id = id;
      }
    
      public String getMsg() {
        return msg;
      }
    
      public void setMsg(String msg) {
        this.msg = msg;
      }
    
      @Override
      public String toString() {
        return "Message{" +
            "id=" + id +
            ", msg='" + msg + '\'' +
            '}';
      }
    }
    
    
    4.생산자 만 들 기
    
    package com.laravelshao.springboot.kafka;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by shaoqinghua on 2018/3/23.
     */
    @Component
    public class Producer {
      private static Logger log = LoggerFactory.getLogger(Producer.class);
    
      @Autowired
      private KafkaTemplate kafkaTemplate;
    
      public void send(String topic, Message message) {
        kafkaTemplate.send(topic, message);
        log.info("Producer->topic:{}, message:{}", topic, message);
      }
    
    }
    
    
    5.소비 자 를 만 들 고@KafkaListener 를 사용 하여 감청 테 마 를 설명 합 니 다.
    
    package com.laravelshao.springboot.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by shaoqinghua on 2018/3/23.
     */
    @Component
    public class Consumer {
      private static Logger log = LoggerFactory.getLogger(Consumer.class);
    
      @KafkaListener(topics = "test_topic")
      public void receive(ConsumerRecord<String, Message> consumerRecord) {
        log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
      }
    
    }
    
    
    6.소비 테스트 발송
    
    package com.laravelshao.springboot;
    
    import com.laravelshao.springboot.kafka.Message;
    import com.laravelshao.springboot.kafka.Producer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    
    @SpringBootApplication
    public class IntegrationKafkaApplication {
    
      public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
        Producer producer = context.getBean(Producer.class);
    
        for (int i = 1; i < 10; i++) {
          producer.send("test_topic", new Message(i, "test topic message " + i));
          Thread.sleep(2000);
        }
      }
    
    }
    
    
    발송 메시지,소비 메시지 순서대로 볼 수 있 습 니 다.

    이상 문제
    역 직렬 화 이상(사용자 정의 메시지 대상 은 kafka 가 신뢰 하 는 패키지 경로 에 있 지 않 습 니 다)?
    [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
     at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
     at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
     at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
     at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
     at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.lang.Thread.run(Thread.java:745)
    해결 방법:현재 가방 을 kafka 가 신뢰 하 는 가방 경로 에 추가 합 니 다.
    
    spring:
     kafka:
      consumer:
       properties:
        spring:
         json:
          trusted:
           packages: com.laravelshao.springboot.kafka
    
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기