(3) Kafka의 생산자 소비자 Api 사용 및 파라미터 설정에 대한 상세한 설명

8190 단어 kafka
  • 생산자 코드

  • public class HelloKafkaProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties properties = new Properties();
            /*    */
            properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer producer = new KafkaProducer<>(properties);
            CountDownLatch countDownLatch = new CountDownLatch(10000);
            for (int i = 0; i < 10000; i++) {
                ProducerRecord record = new ProducerRecord<>("test01", "test_key" + i, "test_value" + i);
    //            Future future = producer.send(record);
    //            RecordMetadata recordMetadata = future.get();
    //            if (null != recordMetadata) {
    //                System.out.println(recordMetadata.offset() + "=>" + recordMetadata.partition());
    //            }
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                        if (null != exception) {
                            exception.printStackTrace();
                        }
                        if (recordMetadata != null) {
                            System.out.println(recordMetadata.offset() + "=>" + recordMetadata.partition());
                        }
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            System.out.println("      !");
            producer.close();
        }
    }
  • 소비자 코드

  • public class HelloKafkaConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            /*    */
            properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", "test01_group");
            KafkaConsumer consumer = new KafkaConsumer(properties);
            String topic = "test01";
            consumer.subscribe(Collections.singleton(topic));
    
            while (true) {
                ConsumerRecords records = consumer.poll(500);//500        
                records.forEach(record -> {
                    System.out.println("  :" + record.partition());
                    System.out.println("        :" + record.offset());
                    System.out.println("key:" + record.key());
                    System.out.println("value:" + record.value());
                    System.out.println("===========================================");
                });
            }
    
        }
    }
  • 다중 스레드 모드에서 생산자 코드

  • import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 〈    〉
    * 〈〉 * * @author * @create 2019/5/8 * @since 1.0.0 */ public class KafkaConcurrentProducer { private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static CountDownLatch countDownLatch = new CountDownLatch(1000); private static DemoUser makeUser(int id) { DemoUser demoUser = new DemoUser(id); demoUser.setName("xiaowen" + id); return demoUser; } public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ProducerConfig.ACKS_CONFIG //thread safe class KafkaProducer producer = new KafkaProducer<>(properties); for (int i = 0; i < 1000; i++) { DemoUser demoUser = makeUser(i); ProducerRecord record = new ProducerRecord<>( "test01", null, System.currentTimeMillis(), demoUser.getId() + "", demoUser.toString()); ProducerWorker worker = new ProducerWorker(record, producer); executorService.submit(worker); } countDownLatch.await(); executorService.shutdown(); } private static class ProducerWorker implements Runnable { private ProducerRecord record; private KafkaProducer producer; public ProducerWorker(ProducerRecord record, KafkaProducer producer) { this.record = record; this.producer = producer; } @Override public void run() { String id = Thread.currentThread().getId() + "-" + System.identityHashCode(producer); producer.send(record, (recordMetadata, exception) -> { if (null != exception) { exception.printStackTrace(); } if (recordMetadata != null) { System.out.println(" :" + recordMetadata.offset() + "=> :" + recordMetadata.partition()); } System.out.println(id + ":[ -" + record + "-] "); countDownLatch.countDown(); }); } } public static class DemoUser { private int id; private String name; @Override public String toString() { return "DemoUser{" + "id=" + id + ", name='" + name + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public DemoUser(int id) { this.id = id; } public int getId() { return id; } public void setId(int id) { this.id = id; } } }

     
  • Kafka 생산자 매개 변수


  • 1.acks에 필요한 파티션 복제본 수
    0 1 (기본값) all 안정성이 가장 높고 지연 시간이 높음
     2.buffer.memory 생산자 버퍼 크기 (생산이 너무 빠르면 막히거나 투매 이상을 초래할 수 있음)
    32M(기본값)
    3.max.block.ms에서 kafka 메타데이터 (구역,offset) 의 정보를 가져오는 데 최대 시간 대기
    기본 60000ms 60초
    4.retries 생산자 재시도 횟수 메시지 발송 실패 최대 시도 횟수
    0은(는) 다시 시도하지 않습니다.retry.backoff.ms 및 4 조합
    마지막 재발급 및 다음 재발급 간격 100ms(기본값)
    6.batch.크기 메시지 차지 메모리 크기
    16k(기본값)
    7.linger.ms는 메시지batchSize를 보내는 시간과 6의 조합이 작용하는 것을 가리킨다
    기본값은 0입니다.
    6과 7의 조합으로 먼저 도착하는 대로 보내주세요.
    8.compression.type 
    none(기본값), gzip, snappy
    9.client.id
    표지부호는 임의의 문자열을 설정하여 메시지 추적broker에서 어떤 생산자가 보낸 메시지인지 확인하는 데 사용합니다
    10.max.in.flight.requests.per.connection
    기본값은 1입니다.
    생산자는 브로커가 호응을 줘야 생산을 계속할 수 있어요.
    11.max.request.size
    생산자 생산 소식의 최대 크기를 조절하다
    1M (기본값) 1 요청 일괄 메시지의 크기와 서버.properties의 메시지.max.bytes는 똑같아요.

    좋은 웹페이지 즐겨찾기