Embedded Kafka를 통한 Kafka 테스트
개요 ⌨️
마이크로서비스 아키텍쳐 환경에서 서로 다른 도메인들간 분산 이벤트 처리를 위해 Kafka를 사용하는 경우가 많은데, 실제 운영하는 Kafka를 연동한 후 테스트를 진행하는 경우를 종종 목격한 적이 있습니다.
이는 외부 afka에 의존해 테스트를 진행하는 것이기 때문에 테스트의 안정성을 떨어뜨릴 수 있고,
외부 Kafka의 문제로 인해 테스트 구동이 제한될 수도 있다고 생각합니다.
Spring Boot에서 외부 Kafka 서버에 의존하지 않는 안정적이고 독립적인 통합 테스트하는 방법에 대해서 알려드리고자 합니다.
의존성 🐭
우선 Spring에서 Kafka를 사용하기 위해 Kafka 관련 의존성을 추가해줍니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
그 후 Kafka Test 관련 의존성을 추가해줍니다.
이때 테스트 환경에서만 의존성을 유지시킬 것이기 때문에 scope를 test로 세팅해줍니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
적용 예시 🐰
예시 코드는 총 [6개]입니다.
- 먼저 Kafka를 사용하기 위해 properties를 세팅해줍니다.
- 그 후 Producer 설정을 세팅해줍니다.
- Consumer 또한 설정을 세팅해줍니다.
- Consumer를 작성해줍니다.
- Producer를 작성해줍니다.
위 코드를 기준으로 테스트 코드를 작성해보았습니다.
테스트 코드의 결과는 성공적으로 수행됩니다.
application.properties
------------------------------------------------------
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
enable-auto-commit: false
listener:
ack-mode: manual
KafkaProducerConfig.class
------------------------------------------------------
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String BOOTSTRAP_SERVERS;
@Bean
public ProducerFactory<String, String> factory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(factory());
}
}
KafkaConsumerConfig.class
------------------------------------------------------
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String BOOTSTRAP_ADDRESS;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String AUTO_OFFSET_RESET;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean AUTO_COMMIT;
@Bean
ConsumerFactory<String,String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaProducer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
}
}
KafkaConsumer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private final ObjectMapper objectMapper;
private List<RegisteredPostEvent> eventRepo = new ArrayList<>();
@KafkaListener(topics = "testTopic", groupId = "testGroup")
protected void consume(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
log.info("recive event : {}", payload);
RegisteredPostEvent event = objectMapper.readValue(payload, RegisteredPostEvent.class);
eventRepo.add(event);
// Process
acknowledgment.acknowledge();
}
public List<RegisteredPostEvent> getEventRepo() {
return eventRepo;
}
}
EmbeddedKafkaTest.class
------------------------------------------------------
@SpringBootTest
@EmbeddedKafka(partitions = 3,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092"
},
ports = { 9092 })
class EmbeddedKafkaIntegrationTest {
@Autowired
KafkaProducer producer;
@Autowired
ObjectMapper objectMapper;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void test() throws Exception {
// given
RegisteredPostEvent event = RegisteredPostEvent.idOf(1L);
String payload = objectMapper.writeValueAsString(event);
// when
producer.send("testTopic", payload);
Thread.sleep(2000);
// then
assertNotEquals(0, kafkaConsumer.getEventRepo().size());
}
}
핵 심 🐭
위 코드중에 주제에 맞는 가장 핵심이 되는 부분은 EmbeddedKafkaIntegrationTest 클래스에 선언되어 있는 EmbeddedKafka 어노테이션입니다.
위 어노테이션은 spring-kafka-test에 존재하는 어노테이션으로 해당 어노테이션을 클래스 레벨에 선언함으로써 해당 테스트는 Embedded Kafka가 세팅된 환경에서 테스트하게 됩니다.
참고) https://www.baeldung.com/spring-boot-kafka-testing
Author And Source
이 문제에 관하여(Embedded Kafka를 통한 Kafka 테스트), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@wodyd202/Embedded-Kafka를-통한-Kafka-테스트저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)