kafka 의 0.8.2.1 버 전의 자바 코드 구현

7084 단어 메시지 큐
1. 설명
     이 코드 구현 은 kafka2.10 의 0.8.2.1 버 전의 자바 코드 가 실현 되 고 소비 자 는 여러 개의 Topic 소비 에 대한 다 중 스 레 드 실현 이다.
2. 설치
     참고: Kafka 간단 한 튜 토리 얼 구축
3. 가 져 오기 의존
    kafka 의 의존 만 가 져 옵 니 다. 스 레 드 풀 은 spring 의 Thread PoolTask Executor 스 레 드 풀 을 사용 합 니 다.

    org.apache.kafka
    kafka_2.10
    0.8.2.1

    
    org.apache.kafka
    kafka-clients
    0.8.2.1

4. 설정
4.1 스 레 드 탱크 의 간단 한 설정
@Configuration
public class SpringAsyncConfig {
    @Value(value = "${async.pool.max.size:80}")
    private int maxPoolSize;
    @Value(value = "${async.pool.queue.size:20}")
    private int queueSize;
    @Value(value= "${async.pool.core.size:5}")
    private int corePoolSize;
    @Value(value= "${async.pool.core.size:5}")
    private int knowsCorePoolSize;

    @Bean(name = "asyncTaskExecutor")
    public AsyncTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

4.2 Kafka 의 간단 한 설정
@Configuration
public class KafkaConfig {

    public Properties producerConfig(){
        Properties props = new Properties();
        //         ,    Linux     Kafka,    ,        
        props.put("bootstrap.servers", "localhost:9092");
        props.put("timeout.ms", 3000);
        props.put("metadata.fetch.timeout.ms", 3000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "0");
        return props;
    }

    @Bean
    public KafkaProducer kafkaProducer() {
        return new KafkaProducer(producerConfig());
    }

    public Properties consumerConfig(){
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest");
        //         ,    Linux     Kafka,    ,        
        props.put("zookeeper.connect", "172.31.52.83:2181");
        props.put("group.id", "defaultGroup");
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "range");
        return props;
    }

    @Bean
    public ConsumerConfig kafkaConsumer() {
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerConfig());
        return consumerConfig;
    }
}

5. 생산자
@Component
public class KafkaProducerClient {

    @Autowired
    private KafkaProducer kafkaProducer;
    
    private void produce(String topic,String message){
        ProducerRecord record = new ProducerRecord<>(topic,message);
        kafkaProducer.send(record,(metadata,e) -> {
            if(e != null){
                //       
                System.out.println("      ");
            }
        });
    }
}

6. 소비자
    몇 개의 Topic 이 있 을 수 있 습 니 다. 여기 서 보 여 주 는 것 은 두 개의 Topic 입 니 다.
6.1 소비자 시동 입구
/**
 *       
 *     :  InitializingBean              
 */
@Component
public class KafkaConsumerStart implements InitializingBean {

    /**
     *    
     */
    @Autowired
    private AsyncTaskExecutor taskExecutor;

    /**
     *      
     */
    @Autowired
    private ConsumerConfig consumerConfig;

    /**
     *           ,     ,Biz        ,       ,,      ,       
     */
    @Autowired
    private TopicsHandleBiz topicsHandleBiz;

    private ConsumerConnector consumer;

    @Override
    public void afterPropertiesSet() {
        try{
            consumer = Consumer.createJavaConsumerConnector(consumerConfig);
            Map topicCountMap = new HashMap();
            // topics
            String testTopic1 = "testTopic1";
            String testTopic2 = "testTopic2";

            topicCountMap.put(testTopic1, 1);
            topicCountMap.put(testTopic2, 1);

            Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

            //        testTopic1
            List> streams1 = consumerMap.get(testTopic1);
            if(streams1.size() > 0){
                taskExecutor.execute(new TestTopicHandle1(streams1.get(0),topicsHandleBiz));
            }

            //        testTopic2
            List> streams2 = consumerMap.get(testTopic2);
            if(streams2.size() > 0){
                taskExecutor.execute(new TestTopicHandle2(streams2.get(0),topicsHandleBiz));
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

6.2 스 레 드 처리 testTopic 1
public class TestTopicHandle1 implements Runnable {

    private TopicsHandleBiz topicsHandleBiz;

    private KafkaStream kafkaStream;

    public TestTopicHandle1(KafkaStream kafkaStream,TopicsHandleBiz topicsHandleBiz) throws ClassNotFoundException {
        this.kafkaStream = kafkaStream;
        this.topicsHandleBiz = topicsHandleBiz;
    }

    @Override
    public void run() {
        ConsumerIterator it = kafkaStream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            try {
                //   testTopic1   
                TopicsHandleBiz.save(message);
            }catch (Exception e){
                //   
                e.printStackTrace();
            }
        }
    }
}

6.3 스 레 드 처리 testTopic 2
public class TestTopicHandle2 implements Runnable {

    private TopicsHandleBiz topicsHandleBiz;

    private KafkaStream kafkaStream;

    public TestTopicHandle2(KafkaStream kafkaStream,TopicsHandleBiz topicsHandleBiz) throws ClassNotFoundException {
        this.kafkaStream = kafkaStream;
        this.topicsHandleBiz = topicsHandleBiz;
    }

    @Override
    public void run() {
        ConsumerIterator it = kafkaStream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            try {
                //   testTopic2   
                TopicsHandleBiz.save(message);
            }catch (Exception e){
                //   
                e.printStackTrace();
            }
        }
    }
}

좋은 웹페이지 즐겨찾기