Kafka 소스 해독-생산자KafkaProducer
Kafka 소스 해독-생산자KafkaProducer(중단 지속 더...)
KafkaProducer 클래스에 대한 참고 사항은 다음과 같습니다.
//  producer 
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//  
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();  매개 변수
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.producer";
    // client.id
    private String clientId;
    private final Partitioner partitioner;
    private final int maxRequestSize;
    private final long totalMemorySize;
    //  Kafka 
    private final Metadata metadata;
    private final RecordAccumulator accumulator;
    private final Sender sender;
    private final Metrics metrics;
    // i/o , 
    private final Thread ioThread;
    private final CompressionType compressionType;
    private final Sensor errors;
    //  
    private final Time time;
    private final Serializer keySerializer;
    private final Serializer valueSerializer;
    //  
    private final ProducerConfig producerConfig;
    private final long maxBlockTimeMs;
    private final int requestTimeoutMs;
    private final ProducerInterceptors interceptors;
   메서드
KafkaProducer(ProducerConfig, Serializer, Serializer)
send(ProducerRecord,Callback)
 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord record = new ProducerRecord("my-topic", key, value)
 producer.send(record).get();  ProducerRecord record = new ProducerRecord("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });  producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
   참조: KafkaProducer 설명서
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
마이크로서비스 프레임워크적인 것을 목표로(Apach Kafka편)최종적으로는 SpirngBootLayer + BusinessLogicLayer + RepositoryLayer 라고 하는 구성의 서비스를 만들어, BusinessLogicLayer 이외는 application.yml...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.