SpringBoot 통합 Kafka 절차

5205 단어 SpringBootKafka
SpringBoot 통합 Kafka
이 편 은 SpringBoot 가 Kafka 를 어떻게 통합 하 는 지 설명 하고 발송 과 소비 기능 을 테스트 하기 위해 Demo 를 간단하게 작 성 했 습 니 다.
머리말
선택 한 버 전 은 다음 과 같 습 니 다.
springboot : 2.3.4.RELEASE
spring-kafka : 2.5.6.RELEASE
kafka : 2.5.1
zookeeper : 3.4.14
이 Demo 는 SpringBoot 가 비교적 높 은 버 전인 SpringBoot 2.3.4.RELEASE 를 사용 합 니 다.spring-kafka 2.5.6 RELEASE 를 도입 하여 버 전 관계 에 대응 합 니 다.
Spring Boot 2.3 users should use 2.5.x (Boot dependency management will use the correct version).
spring 과 kafka 의 버 전 관계
https://spring.io/projects/sp ...
1.Kafka 와 Zookeeper 환경 구축
kafka 와 zookeeper 환경 을 구축 하고 시작 합 니 다.
2.데모 프로젝트 를 만 들 고 spring-kafka 도입
2.1 pom 파일

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
</dependency>
2.2 application.yml 설정

spring:
 kafka:
  bootstrap-servers: 192.168.25.6:9092 #bootstrap-servers:  kafka   ,         
  consumer:
   group-id: myGroup
   enable-auto-commit: true
   auto-commit-interval: 100ms
   properties:
    session.timeout.ms: 15000
   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   auto-offset-reset: earliest
  producer:
   retries: 0 #     0  ,                
   batch-size: 16384 #                , Producer                。                 。               (      )。16384      
   buffer-memory: 33554432 #Producer                      ,33554432     
   key-serializer: org.apache.kafka.common.serialization.StringSerializer #        
   value-serializer: org.apache.kafka.common.serialization.StringSerializer #      
2.3 메시지 체 메시지 정의

/**
 * @author johnny
 * @create 2020-09-23   9:21
 **/
@Data
public class Message {


  private Long id;

  private String msg;

  private Date sendTime;
}
2.4 정의 KafkaSender
주로 KafkaTemplate 를 이용 하여 메 시 지 를 보 내 고 메 시 지 를 Message 로 봉 하여 JSon 꼬치 로 전환 하여 Kafka 에 보 냅 니 다.

@Component
@Slf4j
public class KafkaSender {

  private final KafkaTemplate<String, String> kafkaTemplate;

  //        kafkaTemplate
  public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  private Gson gson = new GsonBuilder().create();

  public void send(String msg) {
    Message message = new Message();

    message.setId(System.currentTimeMillis());
    message.setMsg(msg);
    message.setSendTime(new Date());
    log.info("【++++++++++++++++++ message :{}】", gson.toJson(message));
    //  topic = hello2      
    kafkaTemplate.send("hello2",gson.toJson(message));
  }

}
2.5 정의 KafkaConsumer
감청 방법 에 있어 서 주 해 를 통 해 감청 기 를 설정 하면 됩 니 다.또한 감청 이 필요 한 topic 를 지정 합 니 다.
kafka 의 메시지 재 수신 단 은 Consumer Record 대상 으로 봉 인 됩 니 다.내부 의 value 속성 은 실제 메시지 입 니 다.

@Component
@Slf4j
public class KafkaConsumer {


  @KafkaListener(topics = {"hello2"})
  public void listen(ConsumerRecord<?, ?> record) {

    Optional.ofNullable(record.value())
        .ifPresent(message -> {
          log.info("【+++++++++++++++++ record = {} 】", record);
          log.info("【+++++++++++++++++ message = {}】", message);
        });
  }

}
3.테스트 효과
Http 인 터 페 이 스 를 제공 하여 KafkaSender 로 메 시 지 를 보 냅 니 다.
3.1 Http 테스트 인터페이스 제공

@RestController
@Slf4j
public class TestController {


  @Autowired
  private KafkaSender kafkaSender;


  @GetMapping("sendMessage/{msg}")
  public void sendMessage(@PathVariable("msg") String msg){
    kafkaSender.send(msg);
  }
}
3.2 시작 항목
8080 포트 감청
KafkaMessage Listener Container 에 consumer group=my Group 에 hello 2-0 topic 을 감청 하 는 소비자 가 있 습 니 다.

3.3 Http 인터페이스 호출
http://localhost:8080/sendMessage/KafkaTestMsg

이로써 스프링 부 트 통합 카 프 카 가 마무리 됐다.
이상 은 SpringBoot 통합 Kafka 의 절차 에 대한 상세 한 내용 입 니 다.SpringBoot 통합 Kafka 에 관 한 자 료 는 저희 의 다른 관련 글 을 주목 해 주 십시오!

좋은 웹페이지 즐겨찾기