Embedded Kafka를 통한 Kafka 테스트

개요 ⌨️

마이크로서비스 아키텍쳐 환경에서 서로 다른 도메인들간 분산 이벤트 처리를 위해 Kafka를 사용하는 경우가 많은데, 실제 운영하는 Kafka를 연동한 후 테스트를 진행하는 경우를 종종 목격한 적이 있습니다.

이는 외부 afka에 의존해 테스트를 진행하는 것이기 때문에 테스트의 안정성을 떨어뜨릴 수 있고,
외부 Kafka의 문제로 인해 테스트 구동이 제한될 수도 있다고 생각합니다.

Spring Boot에서 외부 Kafka 서버에 의존하지 않는 안정적이고 독립적인 통합 테스트하는 방법에 대해서 알려드리고자 합니다.




의존성 🐭

우선 Spring에서 Kafka를 사용하기 위해 Kafka 관련 의존성을 추가해줍니다.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

그 후 Kafka Test 관련 의존성을 추가해줍니다.

이때 테스트 환경에서만 의존성을 유지시킬 것이기 때문에 scope를 test로 세팅해줍니다.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>



적용 예시 🐰

예시 코드는 총 [6개]입니다.

  1. 먼저 Kafka를 사용하기 위해 properties를 세팅해줍니다.

  1. 그 후 Producer 설정을 세팅해줍니다.

  1. Consumer 또한 설정을 세팅해줍니다.

  1. Consumer를 작성해줍니다.

  1. Producer를 작성해줍니다.

위 코드를 기준으로 테스트 코드를 작성해보았습니다.
테스트 코드의 결과는 성공적으로 수행됩니다.

텍스트로 보기
application.properties
------------------------------------------------------

spring:
    kafka:
        producer:
            bootstrap-servers: localhost:9092
        consumer:
            auto-offset-reset: earliest
            bootstrap-servers: localhost:9092
            enable-auto-commit: false
        listener:
            ack-mode: manual



KafkaProducerConfig.class
------------------------------------------------------

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String BOOTSTRAP_SERVERS;

    @Bean
    public ProducerFactory<String, String> factory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(factory());
    }
}



KafkaConsumerConfig.class
------------------------------------------------------

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String BOOTSTRAP_ADDRESS;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean AUTO_COMMIT;

    @Bean
    ConsumerFactory<String,String> consumerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}



KafkaProducer.class
------------------------------------------------------

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
    }
}



KafkaConsumer.class
------------------------------------------------------

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
    private final ObjectMapper objectMapper;
    private List<RegisteredPostEvent> eventRepo = new ArrayList<>();

    @KafkaListener(topics = "testTopic", groupId = "testGroup")
    protected void consume(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
        log.info("recive event : {}", payload);
        RegisteredPostEvent event = objectMapper.readValue(payload, RegisteredPostEvent.class);
        eventRepo.add(event);

        // Process
        acknowledgment.acknowledge();
    }

    public List<RegisteredPostEvent> getEventRepo() {
        return eventRepo;
    }
}



EmbeddedKafkaTest.class
------------------------------------------------------

@SpringBootTest
@EmbeddedKafka(partitions = 3,
               brokerProperties = {
                    "listeners=PLAINTEXT://localhost:9092"
               },
               ports = { 9092 })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    KafkaProducer producer;

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Test
    void test() throws Exception {
        // given
        RegisteredPostEvent event = RegisteredPostEvent.idOf(1L);
        String payload = objectMapper.writeValueAsString(event);

        // when
        producer.send("testTopic", payload);
        Thread.sleep(2000);

        // then
        assertNotEquals(0, kafkaConsumer.getEventRepo().size());
    }
}



핵 심 🐭

위 코드중에 주제에 맞는 가장 핵심이 되는 부분은 EmbeddedKafkaIntegrationTest 클래스에 선언되어 있는 EmbeddedKafka 어노테이션입니다.

위 어노테이션은 spring-kafka-test에 존재하는 어노테이션으로 해당 어노테이션을 클래스 레벨에 선언함으로써 해당 테스트는 Embedded Kafka가 세팅된 환경에서 테스트하게 됩니다.

참고) https://www.baeldung.com/spring-boot-kafka-testing

좋은 웹페이지 즐겨찾기