스프링부트 Kafka 연습 2 - JSON 주고받기
저번에 이어서 kafka 예제를 만들어 보겠습니다.
저번 예제에서는 String형식의 메세지만 주고 받았습니다.
제가 협업프로젝트를 하면서 대부분 프론트엔드와 JSON형식으로 통신했었기에 kafka도 JSON으로 통신해야할 필요성을 느껴서 예제를 만들었습니다.
JSON을 주고 받으려면 Value의 Stringserializer부분을 JSON으로 바꿔주어야합니다.
기존 방식처럼 yml에 하려고 했는데 아무리 해도 안되서 Config Class를 만들어 주는 방식으로 했습니다.
전체 소스는 제 깃헙 링크에서 봐주세요!
https://github.com/namusik/TIL-SampleProject/tree/main/Kafka/Sample%20Project
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
KafkaProducerConfig
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public ProducerFactory<String, Chatmessage> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Chatmessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public ConsumerFactory<String, Chatmessage> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Chatmessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Chatmessage> kafkaListener() {
ConcurrentKafkaListenerContainerFactory<String, Chatmessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaProducerService
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private static final String TOPIC = "testTopic";
private final KafkaTemplate<String, Chatmessage> kafkaTemplate;
public void sendMessage(Chatmessage chatmessage) {
System.out.println("chatmessage = " + chatmessage.getContext());
kafkaTemplate.send(TOPIC, chatmessage);
}
}
KafkaConsumerService
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "testTopic", groupId = "testgroup", containerFactory = "kafkaListener")
public void consume(Chatmessage message){
System.out.println("name = " + message.getSender());
System.out.println("consume message = " + message.getContext());
}
}
!!!!@KafkaListener 어노테이션 안에 containerFactory 꼭 넣어줘야 함!!!
최종적으로 메시지를 받으면 객체의 필드값 출력해주기
KafkaController
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducerService producerService;
@PostMapping("/kafka")
public String sendMessage(@RequestBody Chatmessage chatmessage) {
System.out.println("chatmessage = " + chatmessage);
producerService.sendMessage(chatmessage);
return "success";
}
}
Json 형식으로 Controller에 보낼 것이기에 @RequestBody 사용.
Chatmessage
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Chatmessage {
private String sender;
private String context;
}
채팅메시지 객체를 만들어 줌
실행결과
zookeeper와 kafka 서버를 먼저 실행시켜주고
스프링부트 서버 run을 해줍니다
Postman을 사용해서 api를 실행시키면
성공시 "success"를 반환하고
인텔리제이에는 컨슈머가 받은 Chatmessage의 sender와 context를 출력해주면 성공!!
여기까지 springboot에서 kafka 사용해보기 기본이었습니다.
현재 궁극적인 목표는 STOMP + Kafka를 사용해서 채팅방을 만드는 미니프로젝트를 만드는 것입니다~! 제발~~!
참고
https://www.skyer9.pe.kr/wordpress/?p=1550
Author And Source
이 문제에 관하여(스프링부트 Kafka 연습 2 - JSON 주고받기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@rainbowweb/kafka-연습프로젝트-2-JSON-주고받기저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)