Spring WebSocket with Kafka

구상한 프로젝트 아키텍처는,
기본적으로 MVC 구조랑 비슷하게 DTO로 클라이언트와 통신을 한다.
채팅 방 정보와 같은 것은 디비에 저장하고, 채팅 내역은 kafka에 먼저 저장을 하는 방식..?으로 구현했다.

MVC 패턴 관련한것은 제쳐두고, 카프카 세팅에만 집중해서 이해해보도록 하려고 한다.

build.gradle

// build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")

// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")

build.gradle 설정에 이것을 추가해준다.

Configurations

// Kafka 에서 통신할 내용
public class KafkaConstants {
    public static final String KAFKA_TOPIC = "test";
    public static final String GROUP_ID = "tt";
    public static final String KAFKA_BROKER = "localhost:9092";
}

토픽이나 그룹아이디, 브로커 주소는 수정하기 편하게 따로 클래스로 빼두었다.

Producer Config

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, MessageDto> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
    }

    @Bean
    public Map<String, Object> kafkaProducerConfiguration() {
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                .put("group.id", KafkaConstants.GROUP_ID)
                .build();
    }

    @Bean
    public KafkaTemplate<String, MessageDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

중요한 것은 중앙에 kafkaProducerConfiguration 인 것 같다. 여기서 관련된 설정을 몇개해준다. HashMap 을 사용해도 상관없는데, 이 설정은 변경되지 않는 설정이므로 google guava를 import해서 ImmutableMap을 사용해 주었다.
내용을 보면,

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG : 브로커 주소를 설정해준다.
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG : 키를 어떤 Serializer를 사용해서 설정할 것인가
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG : Value는?
  • "group.id" : group id 지정

나는 Key는 문자열 값인 uuid이고, 값은 채팅 내역이 json이므로 json으로 설정해 주었다. 그래서 이것을 바탕으로 ProducerFactory를 생성하고, 이것을 Kafka에서 KafkaTemplate으로 활용하여 사용하는 듯 하다.

이 설정은 컨슈머도 마찬가지이다.

ConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MessageDto> consumerFactory() {
        JsonDeserializer<MessageDto> deserializer = new JsonDeserializer<>(MessageDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                .put("group.id", KafkaConstants.GROUP_ID)
                .build();

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

}

Producer랑 거의 비슷하다.
여기도 Factory만들어서 사용하는 건데, 다른점은 나는 MessageDto라는 객체로 Json을 받아들일 건데, 이게 내가 생성한 임의의 객체라서 카프카에서 해독을 할 수 있게 도와줘야 한다.

그래서 consumerFactory안에 위에 네줄을 추가해줘야 카프카에서 오류가 나질 않는다.

WebSocketconfig

@Configuration
// @EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // 메시지 발행 요청 : /topic (Application Destination Prefix)
    // 메시지 구독 요청 : /kafka (enable Simple Broker)
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/kafka");
    }

    // Stomp WebSocket Endpoint : /ws-chat
    // Unity 에서 접속하려 하니 SockJS 를 빼야 했다.
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-chat")
                .setAllowedOrigins("*");
    }

이것은 WebSocket 기본 설정이다. 신기했던건 SockJS를 사용하니 유니티에서 접속이 불가능했다.

Service

KafkaProducer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, MessageDto> kafkaTemplate;

    public void send(String topic, MessageDto messageDto) {
        log.info("topic : " + topic);
        log.info("send Message : " + messageDto.getMessage());
        kafkaTemplate.send(topic, messageDto);
    }
}

위에 producerConfig에서 등록한 카프카템플릿을 이용해서, 지정해놓은 토픽에 메세지를 보내는 방식이다.

KafkaConsumuer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {

    private final SimpMessagingTemplate template;

    @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
    public void consume(MessageDto message) throws IOException {
        log.info("Consumed Message : " + message.getMessage());
        HashMap<String, String> msg = new HashMap<>();
        msg.put("roomId", message.getRoomId());
        msg.put("message", message.getMessage());
        msg.put("writer", message.getWriter());

        ObjectMapper mapper = new ObjectMapper();
        template.convertAndSend("/topic/tt", mapper.writeValueAsString(msg));
    }
}

KafkaListener 어노테이션을 이용하여, 구독할 토픽과 그룹아이디를 설정해준다.
거기서 메세지를 읽어서, SimpMessagingTemplate을 이용하여 STOMP WebSocket으로 메세지를 날려주는 것이다.

Controller

ChatController

@RestController
@RequiredArgsConstructor
@Slf4j
public class ChatController {

    private final ChatServiceImpl chatService;
    private final KafkaTemplate<String, MessageDto> kafkaTemplate;
    private final KafkaProducer kafkaProducer;
    private final ChatMessageHistoryRepository chatMessageHistoryRepository;

    @PostMapping("/publish")
    public void sendMessage(@RequestBody MessageDto messageDto) {
        log.info("ChatController -> sendMessage : " + messageDto.getMessage());
        try {
            kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @PostMapping("/message")
    @MessageMapping("/message")
    public void message(@RequestBody MessageDto message) {
        log.info(message.getMessage());
        chatMessageHistoryRepository.save(message);
        kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
    }

    @GetMapping("/history")
    public List<MessageDto> getMessageHistory() {
        log.info("history 호출");
        return chatMessageHistoryRepository.get();
    }
}

클라이언트 창이 따로없어, Stomp Websocket을 연결하는 것을 확인할 수 없어 PostMapping을 추가해주었다.
kafka/message 로 메세지를 날리면, 프로듀서 서비스를 호출하여 날려버린다.
/publish 는 위작업을 컨트롤러에서 바로 처리하는 작업인 것이다.
따라 치면서 이해를 하려고 했는데, 함수 정리가 조금 필요할 것 같다.

그냥 간단하게 코드 적어놓고 설명만해서, 나는 좀 이해가 되지만 처음 보는 사람은 이해가 힘들듯 하다.
좀 더 만져보고 완전히 정리를 한번 더 하던가 해야겠다.

좋은 웹페이지 즐겨찾기