Springboot + kafka 집단 소비자 지정 서버 감청

4060 단어 kafka기술 학습
1.kafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {
    
    
    @Bean
    @Primary// 
    KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map consumerConfigs() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    /**
     *  
     */

    @Bean
    KafkaListenerContainerFactory> kafkaListenerContainerFactoryTwoSchedule() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryTwoSchedule());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactoryTwoSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsTwoSchedule());
    }


    /**
     *
     * 
     */
    @Bean
    public Map consumerConfigsTwoSchedule() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
}

2. 소비자
@Component
public class KafkaConsumer {
    // 
    @KafkaListener(topics = "${one-topic}")
    public void odConsumer(ConsumerRecord msg) {
        // 
         String payload = (String) record.value();
         log.info(" {}", payload);
        //  
         Book book = JsonUtils.parseJsonToObj(payload, Book.class);
    }
    

    // 
    @KafkaListener(topics = "${twoTopic}", containerFactory = "kafkaListenerContainerFactoryTwoSchedule")
    public void driverSheduleConsumer(ConsumerRecord msg) {
         // 
       String payload = (String) record.value();
         log.info(" {}", payload);
        //  
         Book book = JsonUtils.parseJsonToObj(payload, Book.class);
    }
}

3. 기본적으로 실현되었다...

좋은 웹페이지 즐겨찾기