Kafka Spring 활용 [1] Producer, Consumer
📘Kafka Spring에서 활용하기
Kafka의 동작 구조를 살펴봤으니 실제로 Spring Cloud에서는 어떻게 사용하는지 직접 만들어서 사용해보자.
우리가 지금까지 만들었던 catalog-service와 order-service를 사용하여 주문이 발생했을 때 catalog-service에 존재하는 재고 qty의 값을 줄여주는 로직을 kafka의 메세지를 주고 받음으로써 처리하는 로직을 만들어보려고 한다.
🔨Catalog Service (Consumer) 수정
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafka 의존성을 추가해주고
@EnableKafka //kafka 설정 추가
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); //Consumer들을 그룹핑 할수 있다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //KEY 값을 String de serializer로 지정
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //VALUE 값을 String de serializer로 지정
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); //위에서 설정한 consumerFactory를 설정해줌
return kafkaListenerContainerFactory;
}
}
src 아래 messagequeue 패키지를 생성하여 KafkaConsumerConfig
파일을 만들어 Kafka 설정값을 추가해주었다. Catalog는 Order에서 발생된 메세지를 읽어서 사용하는 쪽(Consumer)이므로 DESERIALIZER로 세팅해주었다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository repository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("kafka message = {}", kafkaMessage);
//kafka 메세지 역 직렬화
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = repository.findByProductId(map.get("productId").toString());
if(entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
//update
repository.save(entity);
}
}
}
그리고 @KafkaListener
어노테이션을 사용하여 example-catalog-topic 토픽에 대해 대기하도록 등혹간 뒤 ObjectMapper를 사용하여 json 데이터를 파싱한 후 로직을 실행하도록 코드를 작성한다.
🔨Order Service (Producer) 수정
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafka 의존성을 동일하게 추가해주고
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //KEY 값을 String serializer로 지정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //VALUE 값을 String serializer로 지정
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
위와는 반대로 Producer로써의 설정으로 추가해준다. 거의 동일하지만 모든 설정이 Consumer가 아닌 Producer인것을 확인하고 코드를 작성하자!
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto orderSend(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
//json format으로 변경
String json = "";
try {
json = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//kafka 메세지 전송
kafkaTemplate.send(topic, json);
log.info("Kafka Producer send data from the order service = {}",orderDto);
return orderDto;
}
}
그리고 topic과 OrderDto 객체를 매개변수로 전달받아 OrderDto는 json 형태로 변환해주고 KafkaTemplate
의 send()
를 통해서 메세지를 전달하도록 orderSend를 작성해준다.
@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer; //kafka producer 주입
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
//기존의 jpa 로직
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
//kafka 로직 추가
kafkaProducer.orderSend("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
Controller에서 기존의 내부 로직만 실행하던 부분에 kafka 메세지 전달 로직을 추가로 작성해준다.
👏Test 해보기
실행에 앞서 Config server, Eureka Server, Kafka Server, Kafka zookeaper Server, Gateway Server 모두 실행해준 뒤 서비스를 실행시키자
./bin/windows/kafka-server-start.bat ./config/server.properties
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
그 후에 Catalog service에서 DB를 확인하면 초기 데이터로 세팅되어 있는 것을 확인할 수 있다.
Order Service가 실행되었고 정상 return을 받았다.
user-service를 실행하여 정상적인 로그인 후 아이디 값을 받지 않았지만 주문만 테스트하고 주문시에 user를 체크하는 로직이 없기 때문에 정상적으로 실행된다.
order-service에서 정상적으로 메세지를 전송했다.
catalog-service에서도 메세지를 정상적으로 수신했고
주문 가능 수량도 90으로 줄어든 것을 확인할 수 있다.
Author And Source
이 문제에 관하여(Kafka Spring 활용 [1] Producer, Consumer), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@ililil9482/Kafka-활용해보기-1-Producer-Consumer저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)