Kafka의 High Level Consumer 설계

5738 단어 메시지 대기열
텍스트:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
High Level Consumer를 사용하는 이유
  • 일부 응용 장면에서 우리는 다중 라인을 통해 정보를 읽기를 원하지만 우리는 카프카에서 정보를 소비하는 순서에 관심이 없다. 우리는 데이터가 소비될 수 있는 것만 관심을 가지면 된다.High Level은 이러한 소비 동작을 추상화하는 데 쓰인다.
  • 메시지 소비는 Consumer Group 단위이고 각각의 Consumer Group에는 여러 개의 consumer가 있을 수 있다. 각각의 consumer는 하나의 라인이고 topic의 각partition은 한 개의 consumer만 읽을 수 있다. Consumer Group에 대응하는 각partition은 최신 offset의 값을 가지고 zookeeper에 저장된다.그래서 중복 소비는 일어나지 않는다.
  • consumer의 offerset이 zookeeper에 실시간으로 전송되는 것은 아니기 때문에 Consumer가 갑자기 Crash가 되면 중복된 정보를 읽을 수 있다
  • 설계 High Level Consumer
    High Level Consumer는 다중 스레드 환경에 사용할 수 있고 사용할 수 있습니다. 스레드 모델의 스레드 수(group의 consumer 수)는 topic의partition 수와 관련이 있습니다. 다음은 몇 가지 규칙을 열거합니다.
  • 제공된 스레드 수량이partition의 수량보다 많으면 일부 스레드는 메시지를 받지 못합니다.
  • 제공된 스레드 수량이partition의 수량보다 적으면 일부 스레드는 여러 개의partition에서 메시지를 수신한다.
  • 어떤 라인이 여러 파티션에서 메시지를 받을 때 메시지를 받는 순서를 보장하지 않습니다.파티션 3에서 5개의 메시지를 받고, 파티션 4에서 6개의 메시지를 받고, 이어서 파티션 3에서 10개의 메시지를 받을 수 있습니다.
  • 더 많은 라인을 추가할 때kafka가re-balance를 하게 되고partition과 라인의 대응 관계를 바꿀 수 있습니다.
  • 갑작스럽게 컨소시엄과 브로커를 멈추면 메시지가 중복되는 경우를 피하기 위해 shutdown 이전에 Thread를 통과한다.sleep(10000)Consumer가 offset을 zookeeper
  • 에 동기화할 수 있는 시간
    예.
    Maven 의존성
          
            
                org.apache.kafka
                kafka_2.10
                0.8.2.0
            
            
                org.apache.kafka
                kafka-clients
                0.8.2.0
            

    Consumer 스레드
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;
    
    public class ConsumerThread implements Runnable {
     private KafkaStream kafkaStream;
     //    
     private int threadNumber;
     public ConsumerThread(KafkaStream kafkaStream, int threadNumber) {
      this.threadNumber = threadNumber;
      this.kafkaStream = kafkaStream;
     }
     public void run() {
      ConsumerIterator it = kafkaStream.iterator();
      StringBuffer sb = new StringBuffer();
    //       Kafka    ,            
      while (it.hasNext()) {
       MessageAndMetadata metaData = it.next();
       sb.append("Thread: " + threadNumber + " ");
       sb.append("Part: " + metaData.partition() + " ");
       sb.append("Key: " + metaData.key() + " ");
       sb.append("Message: " + metaData.message() + " ");
       sb.append("
    "); System.out.println(sb.toString()); } System.out.println("Shutting down Thread: " + threadNumber); } }

    나머지 절차
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
     
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class ConsumerGroupExample {
        private final ConsumerConnector consumer;
        private final String topic;
        private  ExecutorService executor;
     
        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }
     
        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
        }
     
        public void run(int a_numThreads) {
            Map topicCountMap = new HashMap();
            topicCountMap.put(topic, new Integer(a_numThreads));
            //   Map     Topic     KafkaStream
            Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List> streams = consumerMap.get(topic);
     
            //  Java   
            executor = Executors.newFixedThreadPool(a_numThreads);
     
            //    consume     messages
            int threadNumber = 0;
            for (final KafkaStream stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }
     
        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            //     Zookeeper  ,             Partition Consumer Offerset
            props.put("zookeeper.connect", a_zookeeper);
           //consumer group  ID
            props.put("group.id", a_groupId);
            //Kafka  Zookeeper     (  )
            props.put("zookeeper.session.timeout.ms", "400");
           //ZooKeeper  ‘follower’    Master    
            props.put("zookeeper.sync.time.ms", "200");
          //consumer  offerset Zookeeper   
            props.put("auto.commit.interval.ms", "1000");
     
            return new ConsumerConfig(props);
        }
     
        public static void main(String[] args) {
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);
     
            ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
            example.run(threads);
             //  consumer offerset         zookeeper(           ),  shutdown Consumer   ,           
            //  sleep  , consumer offset   zookeeper
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
     
            }
            example.shutdown();
        }
    }

    좋은 웹페이지 즐겨찾기