Kafka 자바 api-소비자 코드 와 소비 분석,생산자 소비자 프로필 상세 설명

32358 단어 kafka
1.소비자 코드 는 소비자 에 게 사용 되 므 로 앞에서 쓴 생산자 코드 도 먼저 붙 여야 합 니 다.생산자 코드 와 사용자 정의 partition 은 maven 가이드 백 을 사용 합 니 다.
<dependencies>
      <dependency>
           <groupId>com.alibaba.jstormgroupId>
           <artifactId>jstorm-coreartifactId>
           <version>2.1.1version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4jgroupId>
                   <artifactId>slf4j-nopartifactId>
               exclusion>
               <exclusion>
                   <groupId>org.slf4jgroupId>
                   <artifactId>slf4j-jdk14artifactId>
               exclusion>
           exclusions>
       dependency>
       <dependency>
           <groupId>org.apache.kafkagroupId>
           <artifactId>kafka_2.8.2artifactId>
           <version>0.8.1version>
           <exclusions>
               <exclusion>
                   <artifactId>jmxtoolsartifactId>
                   <groupId>com.sun.jdmkgroupId>
               exclusion>
               <exclusion>
                   <artifactId>jmxriartifactId>
                   <groupId>com.sun.jmxgroupId>
               exclusion>
               <exclusion>
                   <artifactId>jmsartifactId>
                   <groupId>javax.jmsgroupId>
               exclusion>
               <exclusion>
                   <groupId>org.apache.zookeepergroupId>
                   <artifactId>zookeeperartifactId>
               exclusion>
               <exclusion>
                   <groupId>org.slf4jgroupId>
                   <artifactId>slf4j-log4j12artifactId>
               exclusion>
               <exclusion>
                   <groupId>org.slf4jgroupId>
                   <artifactId>slf4j-apiartifactId>
               exclusion>
           exclusions>
       dependency>
   dependencies>
/**
 *        Kafka producer  
 *       :
 * 1、    
 * 2、        partition      
 *
 *
 * KafkaSpout  
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、    kafka producer         
         *    topic        , kafka           。
         *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
         */

        String TOPIC = "orderMq";
        /**
         * 2、      
         */
        Properties props = new Properties();
        /*
         * key.serializer.class   serializer.class  key         
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker     ,   host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
        /*
         * request.required.acks,                ,    0,1,-1
         * 0,   producer          broker ack,   0.7     。
         *             ,            , server            。
         * 1,    leader replica        ,producer     ack。
         *              ,   server         ,client    。
         *      leader ,       leader   ,          。
         * -1,       ISR       ,producer     ack。
         *              ,      replica  ,         
         */
        props.put("request.required.acks", "1");
        /*
         *     ,     ,      partitioner partitioner.class
         *    :kafka.producer.DefaultPartitioner
         *          partition ,      key  hash。
         */
        props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、      ,     
         */
        Producer producer = new Producer(new ProducerConfig(props));
        /**
         * 4、  for      
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            /**
             * 5、  producer send      
             *   :       partitionKey,        MyLogPartitioner      
             */
            producer.send(new KeyedMessage(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
        }
    }
}
public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    /**
     *
     * @param obj    key      hash  partition
     * @param numPartitions   partition          topic,  partition       ,     2
     * @return      partition
     */
    public int partition(Object obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
    }

}

주:orderMq 라 는 topic 는 일찍부터 명령 행 을 통 해 만 들 었 습 니 다.partition 은 3 개 로 지정 되 었 습 니 다.다음은 소비자 코드.
public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }
    public void run() {
        System.out.println("     " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         *     stream        ,        ,hasNext()   
         *      `ConsumerConnector#shutdown`,  `hasNext`   false
         * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put("group.id", "dashujujiagoushi");//      ,   
        props.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");//zookeeper  
        props.put("auto.offset.reset", "largest");//        
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");//      
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "orderMq";
        String topic2 = "paymentMq";
        //  ConsumerConnector    ,consumer        ,      
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //    map
        Map topicCountMap = new HashMap();
        topicCountMap.put(topic1, 3);
        //Map>  String topic, List     
        Mapbyte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //   `kafkaTest`     streams
        Listbyte[], byte[]>> streams = topicStreamsMap.get(topic1);
        //       4    
        ExecutorService executor = Executors.newFixedThreadPool(3);
        //  20 consumer threads
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("   " + (i + 1), streams.get(i)));
    }
}

테스트:먼저 소비자 프로그램 을 실행 합 니 다.파 티 션 디 렉 터 리 에 있 는 segment 파일 은 이전에 생 성 된 데이터 가 있 지만 인쇄 되 지 않 고 계속 알림 을 줍 니 다.(이미 소비 상태 로 표 시 된 것 은 더 이상 소비 하지 않 습 니 다.기본 적 인 상황 은 이 렇 습 니 다.0 부터 소 비 를 시작 할 수 있 습 니 다)
15:10:38.228 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 1ms
15:10:40.230 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 4ms

생산 이 필요 하기 때문에 생산자 프로그램 을 실행 하고 콘 솔 은 다음 과 같이 인쇄 합 니 다.
...
Consumer: [   1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [   2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [   1],  Topic: [orderMq],  PartitionId: [1], Offset: [17858], msg: [appidb145da08-bb61-42e7-b140-9fed576c2faeitcast]
Consumer: [   1],  Topic: [orderMq],  PartitionId: [1], Offset: [17859], msg: [appid909a90ae-c0fb-42ac-97de-6d7438895e07itcast]
Consumer: [   3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
Consumer: [   3],  Topic: [orderMq],  PartitionId: [2], Offset: [17714], msg: [appidb93b9355-4713-4e22-823a-756b4fe75bdfitcast]
Consumer: [   3],  Topic: [orderMq],  PartitionId: [2], Offset: [17715], msg: [appidf82ca658-528a-4f40-a023-8a155c15eaa1itcast]
...

요약 하면 다음 과 같다.
Consumer: [   1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [   2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [   3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]

세 소비자 가 소비 에 대응 하 는 파 티 션 을 볼 수 있다.그러면 다음 문 제 를 고려 하여 orderMq 를 만 들 때 partition 을 3 개 로 지정 합 니 다.이때 제 가 KafkaStream 5 개 를 만 들 기로 지정 하면 어떻게 소비 할 까요?소비자 코드 는 다음 과 같이 두 번 수정 합 니 다.
topicCountMap.put(topic1, 5);
ExecutorService executor = Executors.newFixedThreadPool(5);

다시 한 번 위 와 같이 실행 하면 수출 결 과 는 3 명의 소비자 만 볼 수 있 기 때문에 KafkaStream 을 partition 보다 많이 지정 하 는 것 은 소 용이 없고 해당 수량의 소비자 만 해당 하 는 partition 의 데 이 터 를 소비 할 수 있다.
Consumer: [   2],  Topic: [orderMq],  PartitionId: [2], Offset: [26420], msg: [appid4b778b51-33c7-42de-83c2-5b85f8f2428aitcast]
Consumer: [   3],  Topic: [orderMq],  PartitionId: [0], Offset: [26423], msg: [appid86045c25-7b3f-4c82-ad2a-3e8e11958b28itcast]
Consumer: [   4],  Topic: [orderMq],  PartitionId: [1], Offset: [26562], msg: [appid213b5a91-a7bf-4a39-b585-456d95748566itcast]

지 정 된 KafkaStream 이 2 밖 에 없다 면?테스트 를 하지 않 은 결과 한 소비 자 는 파 티 션 2 개 를 소비 하고 다른 소비 자 는 파 티 션 1 개 중 데 이 터 를 소비 하 는 것 으로 나 타 났 다.
생산자,소비자 프로필 설명 은 자바 api 로 생산자 코드 를 쓰 든 소비자 코드 를 쓰 든 프로필 을 사용 합 니 다.다음은 생산자 와 소비자 프로필 소개 입 니 다.
생산자 프로필 설명
#  kafka    ,    metadata,      
metadata.broker.list=kafka01:9092,kafka02:9092

#        。  kafka.producer.DefaultPartitioner,   key       
#partitioner.class=kafka.producer.DefaultPartitioner

#     ,  0     ,1   gzip  ,2   snappy  。                  ,                  。
compression.codec=none

#         
serializer.class=kafka.serializer.DefaultEncoder

#        ,      topic     ,  empty,     。
#compressed.topics=

#                 ,    0,1,-1
# 0: producer    broker  ack
# 1:  leader         ack
# -1:     follower          ack.
request.required.acks=0

#   producer  ack  ,broker          ,    ,broker   producer    error ACK.                  (  follower      )
request.timeout.ms=10000

#           ,  “sync”   ,"async"   。           ,
           buffer ,       ,                 
producer.type=sync

#  async   , message           ,       broker,   5000ms
#    batch.num.messages    .
queue.buffering.max.ms = 5000

#  async   ,producer   buffer      
#     ,producer            broker,       producer     
#   ,           ,    producer          ,   10000
queue.buffering.max.messages=20000

#      ,           ,   200
batch.num.messages=500

#     producer        "queue.buffering.max.meesages" 
#        ,      enqueue(producer           )
#   producer             , timeout     "  "   
# -1:        ,       
# 0:      ,     
queue.enqueue.timeout.ms=-1


#  producer   error ACK,       ACK ,         
#   broker               ,        (  ACK  )
#      broker        ,    3.
message.send.max.retries=3

# producer  topic metada     ,producer    partition leader   ,    topic   
#   producer            metadata, producer       ,      
# (  topic  ,partition  ,leader   ),                    ,   600000
topic.metadata.refresh.interval.ms=60000

소비자 프로필 설명
# zookeeper       
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

# zookeeper session    ,  5000ms,           
zookeeper.session.timeout.ms=5000

#      ,                           
zookeeper.connection.timeout.ms=10000

#          offset zookeeper 。  offset     time          。     zookeeper       ,            
zookeeper.sync.time.ms=2000

#     
group.id=xxx

#  consumer          ,     zookeeper  offset  
#   offset              zk    ,        (  ),     ,   true
auto.commit.enable=true

#       。  60 * 1000
auto.commit.interval.ms=1000

#   consumer   ,    ,        ,            ,    
conusmer.id=xxx

#         ,         ,           
client.id=xxxx

#             (  10)
queued.max.message.chunks=50

#     consumer   group ,  reblance,     partitions          consumer ,    consumer     partition     ,      zk   "Partition Owner registry"    ,         consumer        ,       ,         .
rebalance.max.retries=5

#          ,broker   consumer         chunk   feth       ,      ,    ,       consumer   
fetch.min.bytes=6553600

#          ,server     ,    ,        consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360

#   zookeeper  offset  offset     。        offset。 smallest、largest、anything  ,          offset、     offset、   。  largest
auto.offset.reset=smallest

#         
derializer.class=kafka.serializer.DefaultDecoder

좋은 웹페이지 즐겨찾기