kafka 의 0.8.2.1 버 전의 자바 코드 구현
7084 단어 메시지 큐
이 코드 구현 은 kafka2.10 의 0.8.2.1 버 전의 자바 코드 가 실현 되 고 소비 자 는 여러 개의 Topic 소비 에 대한 다 중 스 레 드 실현 이다.
2. 설치
참고: Kafka 간단 한 튜 토리 얼 구축
3. 가 져 오기 의존
kafka 의 의존 만 가 져 옵 니 다. 스 레 드 풀 은 spring 의 Thread PoolTask Executor 스 레 드 풀 을 사용 합 니 다.
org.apache.kafka
kafka_2.10
0.8.2.1
org.apache.kafka
kafka-clients
0.8.2.1
4. 설정
4.1 스 레 드 탱크 의 간단 한 설정
@Configuration
public class SpringAsyncConfig {
@Value(value = "${async.pool.max.size:80}")
private int maxPoolSize;
@Value(value = "${async.pool.queue.size:20}")
private int queueSize;
@Value(value= "${async.pool.core.size:5}")
private int corePoolSize;
@Value(value= "${async.pool.core.size:5}")
private int knowsCorePoolSize;
@Bean(name = "asyncTaskExecutor")
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueSize);
executor.setCorePoolSize(corePoolSize);
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
4.2 Kafka 의 간단 한 설정
@Configuration
public class KafkaConfig {
public Properties producerConfig(){
Properties props = new Properties();
// , Linux Kafka, ,
props.put("bootstrap.servers", "localhost:9092");
props.put("timeout.ms", 3000);
props.put("metadata.fetch.timeout.ms", 3000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "0");
return props;
}
@Bean
public KafkaProducer kafkaProducer() {
return new KafkaProducer(producerConfig());
}
public Properties consumerConfig(){
Properties props = new Properties();
props.put("auto.offset.reset", "smallest");
// , Linux Kafka, ,
props.put("zookeeper.connect", "172.31.52.83:2181");
props.put("group.id", "defaultGroup");
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("partition.assignment.strategy", "range");
return props;
}
@Bean
public ConsumerConfig kafkaConsumer() {
ConsumerConfig consumerConfig = new ConsumerConfig(consumerConfig());
return consumerConfig;
}
}
5. 생산자
@Component
public class KafkaProducerClient {
@Autowired
private KafkaProducer kafkaProducer;
private void produce(String topic,String message){
ProducerRecord record = new ProducerRecord<>(topic,message);
kafkaProducer.send(record,(metadata,e) -> {
if(e != null){
//
System.out.println(" ");
}
});
}
}
6. 소비자
몇 개의 Topic 이 있 을 수 있 습 니 다. 여기 서 보 여 주 는 것 은 두 개의 Topic 입 니 다.
6.1 소비자 시동 입구
/**
*
* : InitializingBean
*/
@Component
public class KafkaConsumerStart implements InitializingBean {
/**
*
*/
@Autowired
private AsyncTaskExecutor taskExecutor;
/**
*
*/
@Autowired
private ConsumerConfig consumerConfig;
/**
* , ,Biz , ,, ,
*/
@Autowired
private TopicsHandleBiz topicsHandleBiz;
private ConsumerConnector consumer;
@Override
public void afterPropertiesSet() {
try{
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map topicCountMap = new HashMap();
// topics
String testTopic1 = "testTopic1";
String testTopic2 = "testTopic2";
topicCountMap.put(testTopic1, 1);
topicCountMap.put(testTopic2, 1);
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
// testTopic1
List> streams1 = consumerMap.get(testTopic1);
if(streams1.size() > 0){
taskExecutor.execute(new TestTopicHandle1(streams1.get(0),topicsHandleBiz));
}
// testTopic2
List> streams2 = consumerMap.get(testTopic2);
if(streams2.size() > 0){
taskExecutor.execute(new TestTopicHandle2(streams2.get(0),topicsHandleBiz));
}
}catch (Exception e){
e.printStackTrace();
}
}
}
6.2 스 레 드 처리 testTopic 1
public class TestTopicHandle1 implements Runnable {
private TopicsHandleBiz topicsHandleBiz;
private KafkaStream kafkaStream;
public TestTopicHandle1(KafkaStream kafkaStream,TopicsHandleBiz topicsHandleBiz) throws ClassNotFoundException {
this.kafkaStream = kafkaStream;
this.topicsHandleBiz = topicsHandleBiz;
}
@Override
public void run() {
ConsumerIterator it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
try {
// testTopic1
TopicsHandleBiz.save(message);
}catch (Exception e){
//
e.printStackTrace();
}
}
}
}
6.3 스 레 드 처리 testTopic 2
public class TestTopicHandle2 implements Runnable {
private TopicsHandleBiz topicsHandleBiz;
private KafkaStream kafkaStream;
public TestTopicHandle2(KafkaStream kafkaStream,TopicsHandleBiz topicsHandleBiz) throws ClassNotFoundException {
this.kafkaStream = kafkaStream;
this.topicsHandleBiz = topicsHandleBiz;
}
@Override
public void run() {
ConsumerIterator it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
try {
// testTopic2
TopicsHandleBiz.save(message);
}catch (Exception e){
//
e.printStackTrace();
}
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
kafka 의 0.8.2.1 버 전의 자바 코드 구현1. 설명 이 코드 구현 은 kafka2.10 의 0.8.2.1 버 전의 자바 코드 가 실현 되 고 소비 자 는 여러 개의 Topic 소비 에 대한 다 중 스 레 드 실현 이다. 2. 설치 참고: Kafka 간단 한...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.