SpringBoot 통합 Kafka 절차
5205 단어 SpringBootKafka
이 편 은 SpringBoot 가 Kafka 를 어떻게 통합 하 는 지 설명 하고 발송 과 소비 기능 을 테스트 하기 위해 Demo 를 간단하게 작 성 했 습 니 다.
머리말
선택 한 버 전 은 다음 과 같 습 니 다.
springboot : 2.3.4.RELEASE
spring-kafka : 2.5.6.RELEASE
kafka : 2.5.1
zookeeper : 3.4.14
이 Demo 는 SpringBoot 가 비교적 높 은 버 전인 SpringBoot 2.3.4.RELEASE 를 사용 합 니 다.spring-kafka 2.5.6 RELEASE 를 도입 하여 버 전 관계 에 대응 합 니 다.
Spring Boot 2.3 users should use 2.5.x (Boot dependency management will use the correct version).
spring 과 kafka 의 버 전 관계
https://spring.io/projects/sp ...
1.Kafka 와 Zookeeper 환경 구축
kafka 와 zookeeper 환경 을 구축 하고 시작 합 니 다.
2.데모 프로젝트 를 만 들 고 spring-kafka 도입
2.1 pom 파일
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
2.2 application.yml 설정
spring:
kafka:
bootstrap-servers: 192.168.25.6:9092 #bootstrap-servers: kafka ,
consumer:
group-id: myGroup
enable-auto-commit: true
auto-commit-interval: 100ms
properties:
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
producer:
retries: 0 # 0 ,
batch-size: 16384 # , Producer 。 。 ( )。16384
buffer-memory: 33554432 #Producer ,33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer #
value-serializer: org.apache.kafka.common.serialization.StringSerializer #
2.3 메시지 체 메시지 정의
/**
* @author johnny
* @create 2020-09-23 9:21
**/
@Data
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
2.4 정의 KafkaSender주로 KafkaTemplate 를 이용 하여 메 시 지 를 보 내 고 메 시 지 를 Message 로 봉 하여 JSon 꼬치 로 전환 하여 Kafka 에 보 냅 니 다.
@Component
@Slf4j
public class KafkaSender {
private final KafkaTemplate<String, String> kafkaTemplate;
// kafkaTemplate
public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
private Gson gson = new GsonBuilder().create();
public void send(String msg) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(msg);
message.setSendTime(new Date());
log.info("【++++++++++++++++++ message :{}】", gson.toJson(message));
// topic = hello2
kafkaTemplate.send("hello2",gson.toJson(message));
}
}
2.5 정의 KafkaConsumer감청 방법 에 있어 서 주 해 를 통 해 감청 기 를 설정 하면 됩 니 다.또한 감청 이 필요 한 topic 를 지정 합 니 다.
kafka 의 메시지 재 수신 단 은 Consumer Record 대상 으로 봉 인 됩 니 다.내부 의 value 속성 은 실제 메시지 입 니 다.
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {"hello2"})
public void listen(ConsumerRecord<?, ?> record) {
Optional.ofNullable(record.value())
.ifPresent(message -> {
log.info("【+++++++++++++++++ record = {} 】", record);
log.info("【+++++++++++++++++ message = {}】", message);
});
}
}
3.테스트 효과Http 인 터 페 이 스 를 제공 하여 KafkaSender 로 메 시 지 를 보 냅 니 다.
3.1 Http 테스트 인터페이스 제공
@RestController
@Slf4j
public class TestController {
@Autowired
private KafkaSender kafkaSender;
@GetMapping("sendMessage/{msg}")
public void sendMessage(@PathVariable("msg") String msg){
kafkaSender.send(msg);
}
}
3.2 시작 항목8080 포트 감청
KafkaMessage Listener Container 에 consumer group=my Group 에 hello 2-0 topic 을 감청 하 는 소비자 가 있 습 니 다.
3.3 Http 인터페이스 호출
http://localhost:8080/sendMessage/KafkaTestMsg
이로써 스프링 부 트 통합 카 프 카 가 마무리 됐다.
이상 은 SpringBoot 통합 Kafka 의 절차 에 대한 상세 한 내용 입 니 다.SpringBoot 통합 Kafka 에 관 한 자 료 는 저희 의 다른 관련 글 을 주목 해 주 십시오!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
【Java・SpringBoot・Thymeleaf】 에러 메세지를 구현(SpringBoot 어플리케이션 실천편 3)로그인하여 사용자 목록을 표시하는 응용 프로그램을 만들고, Spring에서의 개발에 대해 공부하겠습니다 🌟 마지막 데이터 바인딩에 계속 바인딩 실패 시 오류 메시지를 구현합니다. 마지막 기사🌟 src/main/res...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.