Spring WebSocket with Kafka
구상한 프로젝트 아키텍처는,
기본적으로 MVC 구조랑 비슷하게 DTO로 클라이언트와 통신을 한다.
채팅 방 정보와 같은 것은 디비에 저장하고, 채팅 내역은 kafka에 먼저 저장을 하는 방식..?으로 구현했다.
MVC 패턴 관련한것은 제쳐두고, 카프카 세팅에만 집중해서 이해해보도록 하려고 한다.
build.gradle
// build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")
// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")
// build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")
// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")
build.gradle 설정에 이것을 추가해준다.
Configurations
// Kafka 에서 통신할 내용
public class KafkaConstants {
public static final String KAFKA_TOPIC = "test";
public static final String GROUP_ID = "tt";
public static final String KAFKA_BROKER = "localhost:9092";
}
// Kafka 에서 통신할 내용
public class KafkaConstants {
public static final String KAFKA_TOPIC = "test";
public static final String GROUP_ID = "tt";
public static final String KAFKA_BROKER = "localhost:9092";
}
토픽이나 그룹아이디, 브로커 주소는 수정하기 편하게 따로 클래스로 빼두었다.
Producer Config
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, MessageDto> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
}
@Bean
public Map<String, Object> kafkaProducerConfiguration() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.put("group.id", KafkaConstants.GROUP_ID)
.build();
}
@Bean
public KafkaTemplate<String, MessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
중요한 것은 중앙에 kafkaProducerConfiguration
인 것 같다. 여기서 관련된 설정을 몇개해준다. HashMap
을 사용해도 상관없는데, 이 설정은 변경되지 않는 설정이므로 google guava를 import해서 ImmutableMap
을 사용해 주었다.
내용을 보면,
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
: 브로커 주소를 설정해준다.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
: 키를 어떤 Serializer를 사용해서 설정할 것인가ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
: Value는?"group.id"
: group id 지정
나는 Key는 문자열 값인 uuid이고, 값은 채팅 내역이 json이므로 json으로 설정해 주었다. 그래서 이것을 바탕으로 ProducerFactory를 생성하고, 이것을 Kafka에서 KafkaTemplate
으로 활용하여 사용하는 듯 하다.
이 설정은 컨슈머도 마찬가지이다.
ConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, MessageDto> consumerFactory() {
JsonDeserializer<MessageDto> deserializer = new JsonDeserializer<>(MessageDto.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.put("group.id", KafkaConstants.GROUP_ID)
.build();
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
}
Producer랑 거의 비슷하다.
여기도 Factory만들어서 사용하는 건데, 다른점은 나는 MessageDto
라는 객체로 Json을 받아들일 건데, 이게 내가 생성한 임의의 객체라서 카프카에서 해독을 할 수 있게 도와줘야 한다.
그래서 consumerFactory
안에 위에 네줄을 추가해줘야 카프카에서 오류가 나질 않는다.
WebSocketconfig
@Configuration
// @EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// 메시지 발행 요청 : /topic (Application Destination Prefix)
// 메시지 구독 요청 : /kafka (enable Simple Broker)
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/kafka");
}
// Stomp WebSocket Endpoint : /ws-chat
// Unity 에서 접속하려 하니 SockJS 를 빼야 했다.
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-chat")
.setAllowedOrigins("*");
}
이것은 WebSocket 기본 설정이다. 신기했던건 SockJS
를 사용하니 유니티에서 접속이 불가능했다.
Service
KafkaProducer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
public void send(String topic, MessageDto messageDto) {
log.info("topic : " + topic);
log.info("send Message : " + messageDto.getMessage());
kafkaTemplate.send(topic, messageDto);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
public void send(String topic, MessageDto messageDto) {
log.info("topic : " + topic);
log.info("send Message : " + messageDto.getMessage());
kafkaTemplate.send(topic, messageDto);
}
}
위에 producerConfig
에서 등록한 카프카템플릿을 이용해서, 지정해놓은 토픽에 메세지를 보내는 방식이다.
KafkaConsumuer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
private final SimpMessagingTemplate template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void consume(MessageDto message) throws IOException {
log.info("Consumed Message : " + message.getMessage());
HashMap<String, String> msg = new HashMap<>();
msg.put("roomId", message.getRoomId());
msg.put("message", message.getMessage());
msg.put("writer", message.getWriter());
ObjectMapper mapper = new ObjectMapper();
template.convertAndSend("/topic/tt", mapper.writeValueAsString(msg));
}
}
KafkaListener
어노테이션을 이용하여, 구독할 토픽과 그룹아이디를 설정해준다.
거기서 메세지를 읽어서, SimpMessagingTemplate
을 이용하여 STOMP WebSocket으로 메세지를 날려주는 것이다.
Controller
ChatController
@RestController
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
private final KafkaProducer kafkaProducer;
private final ChatMessageHistoryRepository chatMessageHistoryRepository;
@PostMapping("/publish")
public void sendMessage(@RequestBody MessageDto messageDto) {
log.info("ChatController -> sendMessage : " + messageDto.getMessage());
try {
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@PostMapping("/message")
@MessageMapping("/message")
public void message(@RequestBody MessageDto message) {
log.info(message.getMessage());
chatMessageHistoryRepository.save(message);
kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
}
@GetMapping("/history")
public List<MessageDto> getMessageHistory() {
log.info("history 호출");
return chatMessageHistoryRepository.get();
}
}
@RestController
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
private final KafkaProducer kafkaProducer;
private final ChatMessageHistoryRepository chatMessageHistoryRepository;
@PostMapping("/publish")
public void sendMessage(@RequestBody MessageDto messageDto) {
log.info("ChatController -> sendMessage : " + messageDto.getMessage());
try {
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@PostMapping("/message")
@MessageMapping("/message")
public void message(@RequestBody MessageDto message) {
log.info(message.getMessage());
chatMessageHistoryRepository.save(message);
kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
}
@GetMapping("/history")
public List<MessageDto> getMessageHistory() {
log.info("history 호출");
return chatMessageHistoryRepository.get();
}
}
클라이언트 창이 따로없어, Stomp Websocket을 연결하는 것을 확인할 수 없어 PostMapping
을 추가해주었다.
kafka/message
로 메세지를 날리면, 프로듀서 서비스를 호출하여 날려버린다.
/publish
는 위작업을 컨트롤러에서 바로 처리하는 작업인 것이다.
따라 치면서 이해를 하려고 했는데, 함수 정리가 조금 필요할 것 같다.
그냥 간단하게 코드 적어놓고 설명만해서, 나는 좀 이해가 되지만 처음 보는 사람은 이해가 힘들듯 하다.
좀 더 만져보고 완전히 정리를 한번 더 하던가 해야겠다.
Author And Source
이 문제에 관하여(Spring WebSocket with Kafka), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@sossont/Spring-WebSocket-with-Kafka저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)