Kafka 소스 해독-생산자KafkaProducer
Kafka 소스 해독-생산자KafkaProducer(중단 지속 더...)
KafkaProducer 클래스에 대한 참고 사항은 다음과 같습니다.
// producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
매개 변수
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
// client.id
private String clientId;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
// Kafka
private final Metadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Metrics metrics;
// i/o ,
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
//
private final Time time;
private final Serializer keySerializer;
private final Serializer valueSerializer;
//
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final int requestTimeoutMs;
private final ProducerInterceptors interceptors;
메서드
KafkaProducer(ProducerConfig, Serializer, Serializer)
send(ProducerRecord,Callback)
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord record = new ProducerRecord("my-topic", key, value)
producer.send(record).get();
ProducerRecord record = new ProducerRecord("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
참조: KafkaProducer 설명서
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
마이크로서비스 프레임워크적인 것을 목표로(Apach Kafka편)최종적으로는 SpirngBootLayer + BusinessLogicLayer + RepositoryLayer 라고 하는 구성의 서비스를 만들어, BusinessLogicLayer 이외는 application.yml...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.