Kafka 소스 해독-생산자KafkaProducer

Kafka 소스 해독-생산자KafkaProducer(중단 지속 더...)


KafkaProducer 클래스에 대한 참고 사항은 다음과 같습니다.
  • producer는 라인이 안전하기 때문에 다중 라인은 하나의 단일 모드를 사용하는 대상이 여러 대상보다 빠르다.
  • producer는 서버에 아직 발송되지 않은 메시지 데이터를 저장하는 버퍼가 있고 백엔드 I/O 루틴은 이 메시지 데이터를 리퀘스트 요청으로kafka 그룹에 전송하는 것을 책임진다. 발송이 완료된 후에 반드시 프로세스를 close해서 떨어뜨려야 한다. 그렇지 않으면 데이터가 분실될 수 있다.
  • 메시지 데이터(record)를 보내는 리퀘스트가 실패하면 Producer는 자동으로 다시 시도하고 0으로 설정하면 자동으로 다시 시도하지 않습니다.주의: 사용하면 중복 발송 가능합니다.
  • producer는 각 구역에서 기록이 전송되지 않은 버퍼를 유지합니다. 이 버퍼의 크기는 정의됩니다. 값이 대량으로 처리될수록 단일 전송 데이터가 크지만 호스트 메모리의 제한을 받습니다(각 활동 중의partition에는 이런 버퍼가 있습니다)
  • 기본적으로 버퍼에 사용하지 않은 공간이 추가로 있어도 즉시sendbuffer는 버퍼의 데이터를 보냅니다.리퀘스트 수를 줄이려면 설정 요구 값이 0보다 크면, Producer는 리퀘스트 이전에 이 밀리초 값을 기다리고, TCP의 Nagle 알고리즘과 같은 횟수에 더 많은 기록을 원합니다.예를 들어 아래 코드에서 만약에 100개의 리코더가 하나의 요청에 발송된다면 우리는 1로 설정하고 버퍼가 가득 차지 않을 때 1ms를 기다리며 더 많은 리코더가 버퍼에 도착하기를 기다린다.주의: 0으로 설정하면 시간에 밀접하게 도착하는 Record도 보통 함께 일괄 처리되기 때문에 지연 설정이 어떻든지 간에 최종적으로 일괄 처리가 발생합니다.0보다 큰 값으로 설정하면 최대 부하의 영향을 받지 않고 적은 지연을 대가로 더 적고 효율적인 요청을 할 수 있습니다.
  • 캐시 영역의 크기를 설정합니다. 리코더의 전송 속도가 서버에 보내는 속도보다 크면 버퍼 공간이 다 사용되고 추가 콜이 막힙니다. 막힌 한도값은 설정에 의해 설정됩니다. 초과되면 TimeoutException
  • 을 던집니다.
    //  producer 
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    //  
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    Producer producer = new KafkaProducer<>(props);
    for(int i = 0; i < 100; i++)
        producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));
    
    producer.close();

    매개 변수

        private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
        private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
        private static final String JMX_PREFIX = "kafka.producer";
        // client.id
        private String clientId;
        private final Partitioner partitioner;
        private final int maxRequestSize;
        private final long totalMemorySize;
        //  Kafka 
        private final Metadata metadata;
        private final RecordAccumulator accumulator;
        private final Sender sender;
        private final Metrics metrics;
        // i/o , 
        private final Thread ioThread;
        private final CompressionType compressionType;
        private final Sensor errors;
        //  
        private final Time time;
        private final Serializer keySerializer;
        private final Serializer valueSerializer;
        //  
        private final ProducerConfig producerConfig;
        private final long maxBlockTimeMs;
        private final int requestTimeoutMs;
        private final ProducerInterceptors interceptors;
    

    메서드


    KafkaProducer(ProducerConfig, Serializer, Serializer)
  • 구조 방법은 주로 제품 설정 정보, K와 V의 서열화 클래스를 초기화하는 것이다.당분간 해독을 하지 않겠습니다. 관심 있으신 분들은 카프카 원본을 보시고 주소는 문말을 보십시오.
  • 주의: 제품 사용이 완료된 후에close()를 호출하여 자원 데이터가 유출되지 않도록 해야 한다.

  • send(ProducerRecord,Callback)
  • send 방법은 비동기적이다. 이 방법을 호출하면 기록 데이터(record)를 아직 발송하지 않은 기록 데이터(record) 버퍼에 보내고 리코더 메타데이터를 업데이트한다. 발송할 때 리코더 제공 방법(Callback)을 호출하는 것을 확인한다. 장점은producer가 단일 메시지 (record)을 대량으로 발송하여 효율을 높일 수 있다는 것이다.
  • 반환 결과 RecordMetadata는 Records가 보낸 지정한 구역, 편이도, 시간 스탬프를 기록합니다.Topic에서 CreateTime을 사용하는 경우 타임 스탬프는 사용자에게 제공되는 타임 스탬프이고 사용자가 레코드에 타임 스탬프를 지정하지 않으면 레코드에 시간을 보냅니다.Topic에 LogAppendTime을 사용하면 메시지를 추가할 때 시간 스탬프는 Kafka broker 호스트의 로컬 시간입니다.
  • send 호출은 비동기적이기 때문에 이 기록에 분배될 RecordMetadata의 미래를 되돌려줍니다.이 다음에 get () 을 호출하면 관련 요청이 완료될 때까지 막히고 기록된 메타데이터를 되돌려주거나 기록을 보낼 때 발생하는 이상을 던집니다.
  • 아날로그 차단 코드
  •  byte[] key = "key".getBytes();
     byte[] value = "value".getBytes();
     ProducerRecord record = new ProducerRecord("my-topic", key, value)
     producer.send(record).get();
  • 완전히 막히지 않습니다. 리셋 파라미터를 사용하여 리셋을 제공할 수 있습니다. 이 리셋은 요청이 완료될 때 호출됩니다.
  • ProducerRecord record = new ProducerRecord("the-topic", key, value);
     producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null) {
                              e.printStackTrace();
                           } else {
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                           }
                       }
                   });
  • 같은 구역에 보낸 기록에 대해 순서대로 리셋을 실행한다.즉, 아래의 예에서 콜백1은 콜백2 이전에 실행될 것을 보증한다.
  • producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
    producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
     
  • 업무의 일부분으로 사용할 때 리셋 함수를 정의하거나 미래의 결과를 검사해서 보내는 오류를 검사할 필요가 없습니다.만약 발송 호출이 실패하고 복구할 수 없는 오류가 발생하면, 마지막 commitTransaction () 호출이 실패하고, 지난번 실패한 발송에서 이상이 발생합니다.이 경우, 프로그램은 abortTransaction () 을 호출하여 상태를 초기화하고 데이터를 계속 보내야 합니다.

  • 참조: KafkaProducer 설명서

    좋은 웹페이지 즐겨찾기