kafka 0.8 간단 한 예 읽 기
시스템 의 flume 은 데 이 터 를 kafka 에 기록 합 니 다.데이터 의 정확성 을 검증 하기 위해 저 는 kafka 를 읽 는 애플 릿 을 썼 습 니 다.그래서 이 프로젝트 의 주요 의 미 는 데이터 검증 을 하기 위해 실제 생산 환경의 배치 가 이것 보다 복잡 하 다 는 것 이다.
저 는 Maven 관리 자바 프로젝트 를 사용 합 니 다.
우선 Maven 의 dependency 는 다음 과 같 습 니 다.
4.0.0
org.iker.example
kafka.consumer
0.0.1
jar
kafka.consumer
http://maven.apache.org
UTF-8
junit
junit
3.8.1
test
org.scala-lang
scala-library
2.10
log4j
log4j
1.2.15
com.sun.jmx
jmxri
com.sun.jdmk
jmxtools
javax.jms
jms
org.slf4j
slf4j-simple
1.6.4
org.apache.kafka
kafka_2.10
0.8.2.1
jmxri
com.sun.jmx
jms
javax.jms
jmxtools
com.sun.jdmk
com.101tec
zkclient
0.3
com.yammer.metrics
metrics-core
2.2.0
com.yammer.metrics
metrics-annotation
2.2.0
org.easymock
easymock
3.0
test
org.scalatest
scalatest
1.2
test
maven-assembly-plugin
jar-with-dependencies
config
package
single
두 개의 자바 클 라 스 가 있 습 니 다.하 나 는 사용자 의 상호작용 을 제공 하고 하 나 는 실제 적 으로 kafka 를 읽 는 데 사 용 됩 니 다.명령 행 을 통 해 kafka 에서 데 이 터 를 읽 고 이 프로그램 에 따라 원 하 는 프로그램 을 쓸 수 있 습 니 다.
ConsumerApp 은 사용자 와 상호작용 을 제공 합 니 다.
package org.iker.example.kafka.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
//import org.iker.example.kafka.producer.ProducerApp;
/**
* Simple Kafka message consumer
*/
public class ConsumerApp {
private final ConsumerConfig config;
private final ConsumerConnector consumer;
private ExecutorService executor;
private static String KAFKA_TOPIC = "dx_sem_server_logs";
public ConsumerApp(String zooKeeper, String groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
this.config = new ConsumerConfig(props);
this.consumer = Consumer.createJavaConsumerConnector(this.config);
}
public void shutdown() {
if (this.consumer != null) {
this.consumer.shutdown();
}
if (this.executor != null) {
executor.shutdown();
}
try {
if (!this.executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out
.println("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
}
} catch (InterruptedException e) {
System.out
.println("Interrupted during shutdown, exiting uncleanly.");
}
}
public void run(int numThreads) {
Map topicCountMap = new HashMap();
topicCountMap.put(ConsumerApp.KAFKA_TOPIC, new Integer(numThreads));
Map>> consumerMap = this.consumer
.createMessageStreams(topicCountMap);
List> streams = consumerMap
.get(ConsumerApp.KAFKA_TOPIC);
// now launch all the threads
//
this.executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
this.executor.submit(new ConsumerThread(stream, threadNumber));
threadNumber++;
}
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
//int threads = Integer.parseInt(args[2]);
int threads = 1;
ConsumerApp app = new ConsumerApp(zooKeeper, groupId);
app.run(threads);
try {
while (true) {
Thread.sleep(10000L);
}
} catch (Exception e) {
e.printStackTrace();
}
app.shutdown();
}
}
ConsumerThread 는 kafka 를 읽 는 데 사 용 됩 니 다.
package org.iker.example.kafka.consumer;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerThread implements Runnable {
private final int threadSerial;
private final KafkaStream stream;
public ConsumerThread(KafkaStream stream, int threadSerial) {
this.threadSerial = threadSerial;
this.stream = stream;
}
public void run() {
ConsumerIterator iter = this.stream.iterator();
String fileName = threadSerial + ".data";
System.out.println("Start Thread: " + this.threadSerial);
int cnt = 0;
try {
DataOutputStream out = new DataOutputStream(new FileOutputStream(fileName));
while (iter.hasNext()) {
byte[] bytes = iter.next().message();
out.writeInt(bytes.length);
out.write(bytes);
out.flush();
cnt++;
}
out.close();
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Total write: " + cnt);
System.out.println("Shutting down Thread: " + this.threadSerial);
}
}
컴 파일 은 매우 간단 하 다.
mvn cleanpackage
target 디 렉 터 리 에서 jar 가방 을 받 을 수 있 습 니 다.
실행 도 간단 합 니 다:
java -cp$your_jar_file org.iker.example.kafka.consumer.ConsumerApp
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.