스프링부트 Kafka 연습 2 - JSON 주고받기

저번에 이어서 kafka 예제를 만들어 보겠습니다.

저번 예제에서는 String형식의 메세지만 주고 받았습니다.

제가 협업프로젝트를 하면서 대부분 프론트엔드와 JSON형식으로 통신했었기에 kafka도 JSON으로 통신해야할 필요성을 느껴서 예제를 만들었습니다.

JSON을 주고 받으려면 Value의 Stringserializer부분을 JSON으로 바꿔주어야합니다.

기존 방식처럼 yml에 하려고 했는데 아무리 해도 안되서 Config Class를 만들어 주는 방식으로 했습니다.


전체 소스는 제 깃헙 링크에서 봐주세요!

https://github.com/namusik/TIL-SampleProject/tree/main/Kafka/Sample%20Project

application.yml

  spring:
  	kafka:
      bootstrap-servers: localhost:9092

KafkaProducerConfig

@EnableKafka
@Configuration
public class KafkaProducerConfig {

  @Value("${spring.kafka.bootstrap-servers}")
  private String servers;

  @Bean
  public ProducerFactory<String, Chatmessage> producerFactory() {
      Map<String, Object> config = new HashMap<>();
      config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
      config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
      return new DefaultKafkaProducerFactory<>(config);
  }

  @Bean
  public KafkaTemplate<String, Chatmessage> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
  }
}

KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public ConsumerFactory<String, Chatmessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Chatmessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Chatmessage> kafkaListener() {
        ConcurrentKafkaListenerContainerFactory<String, Chatmessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

KafkaProducerService

@Service
@RequiredArgsConstructor
public class KafkaProducerService {
    private static final String TOPIC = "testTopic";
    private final KafkaTemplate<String, Chatmessage> kafkaTemplate;

    public void sendMessage(Chatmessage chatmessage) {
        System.out.println("chatmessage = " + chatmessage.getContext());

        kafkaTemplate.send(TOPIC, chatmessage);
    }
}

KafkaConsumerService

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "testTopic", groupId = "testgroup", containerFactory = "kafkaListener")
    public void consume(Chatmessage message){
        System.out.println("name = " + message.getSender());
        System.out.println("consume message = " + message.getContext());
    }

}

!!!!@KafkaListener 어노테이션 안에 containerFactory 꼭 넣어줘야 함!!!

최종적으로 메시지를 받으면 객체의 필드값 출력해주기

KafkaController

@RestController
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducerService producerService;

    @PostMapping("/kafka")
    public String sendMessage(@RequestBody Chatmessage chatmessage) {
        System.out.println("chatmessage = " + chatmessage);
        producerService.sendMessage(chatmessage);

        return "success";
    }
}

Json 형식으로 Controller에 보낼 것이기에 @RequestBody 사용.

Chatmessage

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Chatmessage {

    private String sender;
    private String context;
}

채팅메시지 객체를 만들어 줌

실행결과

zookeeper와 kafka 서버를 먼저 실행시켜주고

스프링부트 서버 run을 해줍니다

Postman을 사용해서 api를 실행시키면

성공시 "success"를 반환하고

인텔리제이에는 컨슈머가 받은 Chatmessage의 sender와 context를 출력해주면 성공!!

여기까지 springboot에서 kafka 사용해보기 기본이었습니다.

현재 궁극적인 목표는 STOMP + Kafka를 사용해서 채팅방을 만드는 미니프로젝트를 만드는 것입니다~! 제발~~!

참고

https://www.skyer9.pe.kr/wordpress/?p=1550

좋은 웹페이지 즐겨찾기