kafka 0.8 간단 한 예 읽 기

8572 단어 자바kafka
Kafka 0.8 과 kafka 0.7 의 변화 가 매우 커서 사용 방식 과 인터페이스 class 가 모두 변 했다.0.8 의 kafka 가 데 이 터 를 어떻게 읽 는 지 간단하게 소개 합 니 다.
시스템 의 flume 은 데 이 터 를 kafka 에 기록 합 니 다.데이터 의 정확성 을 검증 하기 위해 저 는 kafka 를 읽 는 애플 릿 을 썼 습 니 다.그래서 이 프로젝트 의 주요 의 미 는 데이터 검증 을 하기 위해 실제 생산 환경의 배치 가 이것 보다 복잡 하 다 는 것 이다.
저 는 Maven 관리 자바 프로젝트 를 사용 합 니 다.
우선 Maven 의 dependency 는 다음 과 같 습 니 다.
 

	4.0.0

	org.iker.example
	kafka.consumer
	0.0.1
	jar

	kafka.consumer
	http://maven.apache.org

	
		UTF-8
	

	
		
			junit
			junit
			3.8.1
			test
		
		
		
		
			org.scala-lang
			scala-library
			2.10
		
		
			log4j
			log4j
			1.2.15
			
				
					com.sun.jmx
					jmxri
				
				
					com.sun.jdmk
					jmxtools
				
				
					javax.jms
					jms
				
			
		
		
			org.slf4j
			slf4j-simple
			1.6.4
		
		
			org.apache.kafka
			kafka_2.10
			0.8.2.1
			
				
					jmxri
					com.sun.jmx
				
				
					jms
					javax.jms
				
				
					jmxtools
					com.sun.jdmk
				
			
		
		
			com.101tec
			zkclient
			0.3
		
		
			com.yammer.metrics
			metrics-core
			2.2.0
		
		
			com.yammer.metrics
			metrics-annotation
			2.2.0
		
		
			org.easymock
			easymock
			3.0
			test
		
		
			org.scalatest
			scalatest
			1.2
			test
		
		
	
	
		
		
				maven-assembly-plugin
				
					
						jar-with-dependencies
					
				
				
					
						config
						package
						
							single
						
					
				
			
			
		
	


두 개의 자바 클 라 스 가 있 습 니 다.하 나 는 사용자 의 상호작용 을 제공 하고 하 나 는 실제 적 으로 kafka 를 읽 는 데 사 용 됩 니 다.명령 행 을 통 해 kafka 에서 데 이 터 를 읽 고 이 프로그램 에 따라 원 하 는 프로그램 을 쓸 수 있 습 니 다.
 ConsumerApp 은 사용자 와 상호작용 을 제공 합 니 다.
package org.iker.example.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

//import org.iker.example.kafka.producer.ProducerApp;

/**
 * Simple Kafka message consumer
 */
public class ConsumerApp {

	private final ConsumerConfig config;
	private final ConsumerConnector consumer;
	private ExecutorService executor;
	private static String KAFKA_TOPIC = "dx_sem_server_logs";

	public ConsumerApp(String zooKeeper, String groupId) {

		Properties props = new Properties();
		props.put("zookeeper.connect", zooKeeper);
		props.put("group.id", groupId);
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		this.config = new ConsumerConfig(props);
		this.consumer = Consumer.createJavaConsumerConnector(this.config);
	}

	public void shutdown() {

		if (this.consumer != null) {
			this.consumer.shutdown();
		}
		if (this.executor != null) {
			executor.shutdown();
		}

		try {
			if (!this.executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
				System.out
						.println("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
			}
		} catch (InterruptedException e) {
			System.out
					.println("Interrupted during shutdown, exiting uncleanly.");
		}
	}

	public void run(int numThreads) {

		Map topicCountMap = new HashMap();
		topicCountMap.put(ConsumerApp.KAFKA_TOPIC, new Integer(numThreads));

		Map>> consumerMap = this.consumer
				.createMessageStreams(topicCountMap);
		List> streams = consumerMap
				.get(ConsumerApp.KAFKA_TOPIC);

		// now launch all the threads
		//
		this.executor = Executors.newFixedThreadPool(numThreads);

		// now create an object to consume the messages
		//
		int threadNumber = 0;
		for (final KafkaStream stream : streams) {
			this.executor.submit(new ConsumerThread(stream, threadNumber));
			threadNumber++;
		}
	}

	public static void main(String[] args) {

		String zooKeeper = args[0];
		String groupId = args[1];
		//int threads = Integer.parseInt(args[2]);
		int threads = 1;

		ConsumerApp app = new ConsumerApp(zooKeeper, groupId);
		app.run(threads);
		try {
			while (true) {
				Thread.sleep(10000L);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		app.shutdown();
	}
}

ConsumerThread 는 kafka 를 읽 는 데 사 용 됩 니 다.
 
package org.iker.example.kafka.consumer;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {

	private final int threadSerial;
	private final KafkaStream stream;

	public ConsumerThread(KafkaStream stream, int threadSerial) {
		this.threadSerial = threadSerial;
		this.stream = stream;
	}

	public void run() {
		ConsumerIterator iter = this.stream.iterator();
		String fileName = threadSerial + ".data";
		System.out.println("Start Thread: " + this.threadSerial);
		int cnt = 0;
		try {
			DataOutputStream out = new DataOutputStream(new FileOutputStream(fileName));
			while (iter.hasNext()) {
				byte[] bytes = iter.next().message();
				out.writeInt(bytes.length);
				out.write(bytes);
				out.flush();
				cnt++;
			}
			out.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}  catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}  
		System.out.println("Total write: " + cnt);
		System.out.println("Shutting down Thread: " + this.threadSerial);
	}
}

컴 파일 은 매우 간단 하 다.
 mvn cleanpackage
target 디 렉 터 리 에서 jar 가방 을 받 을 수 있 습 니 다.
실행 도 간단 합 니 다:
 java -cp$your_jar_file org.iker.example.kafka.consumer.ConsumerApp

좋은 웹페이지 즐겨찾기