Kafka 배치 및 코드 인 스 턴 스

15611 단어 kafka
kafka 는 분포 식 로그 수집 이나 시스템 모니터링 서비스 로 서 적절 한 장소 에서 사용 할 필요 가 있 습 니 다.kafka 의 배 치 는 zookeeper 환경/kafka 환경 을 포함 하고 설정 작업 도 해 야 합 니 다.다음은 kafka 를 어떻게 사용 하 는 지 소개 합 니 다.
    우 리 는 3 개의 zookeeper 인 스 턴 스 를 사용 하여 zk 클 러 스 터 를 구축 하고 2 개의 kafka broker 를 사용 하여 kafka 클 러 스 터 를 구축 합 니 다.
    이 가운데 카 프 카 는 0.8V,조 키 퍼 는 3.4.5V 였 다.
 
1.Zookeeper 클 러 스 터 구축
    우 리 는 3 개의 zk 실례 가 있 는데 각각 zk-0,zk-1,zk-2 이다.테스트 에 만 사용 된다 면 zk 인 스 턴 스 를 1 개 사용 할 수 있 습 니 다.
    1) zk-0
    설정 파일 조정:
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
##         ,         

    zookeeper 시작
./zkServer.sh start

    2) zk-1
    설정 파일 조정(기타 설정 과 zk-0 한 마리):
clientPort=2182
##         ,         

    zookeeper 시작
 
./zkServer.sh start

    3) zk-2
    설정 파일 조정(기타 설정 과 zk-0 한 마리):
clientPort=2183
##         ,         

    zookeeper 시작
 
./zkServer.sh start

  
2.Kafka 군집 구축
    Broker 설정 파일 은 zookeeper 와 관련 된 약속 이 있 기 때문에 broker 설정 파일 을 먼저 보 여 줍 니 다.저 희 는 kafka broker 2 개 를 사용 하여 이 클 러 스 터 환경 을 구축 합 니 다.각각 kafka-0,kafka-1 입 니 다.
    1) kafka-0
    config 디 렉 터 리 에서 설정 파일 을 수정 합 니 다:
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication  ,   topic partitions kafka-cluster   2 
##    cluster     ..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000

    kafka 는 scala 언어 로 작성 되 었 기 때문에 kafka 를 실행 하려 면 먼저 scala 관련 환경 을 준비 해 야 합 니 다.
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency 

    그 중 마지막 명령 집행 에 이상 이 생 길 수 있 으 므 로 잠시 상관 하지 않 는 다.kafka broker 시작:
> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

    zookeeper 환경 이 정상적으로 작 동 되 었 기 때문에,우 리 는 kafka 를 통 해 zookeeper 를 시작 하 는 것 을 마 운 트 할 필요 가 없습니다.만약 당신 의 기계 에 여러 개의 kafka broker 가 배치 되 어 있다 면,당신 은 JMS 를 성명 해 야 합 니 다.PORT.
    2) kafka-1
broker.id=1
port=9093
##     kafka-0    

    그리고 kafka-0 과 같이 포장 명령 을 실행 하고 이 broker 를 시작 합 니 다.
> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

    다음 명령 을 통 해 topic 의"partition"/"replicas"의 분포 와 생존 상황 을 볼 수 있 습 니 다.
> bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic	partition: 0	leader: 2	replicas: 1,2,0	isr: 2
topic: test	partition: 0	leader: 0	replicas: 0	isr: 0 

    지금까지 환경 이 OK 되 었 으 니 프로 그래 밍 인 스 턴 스 를 보 여 드 리 겠 습 니 다설정 매개 변수 상세 설명 ]
 
프로젝트 준비
    프로젝트 는 maven 기반 구축 으로 kafka 자바 클 라 이언 트 가 너무 엉망 이 라 고 할 수 밖 에 없습니다.환경 을 구축 하 는 데 는 많은 번 거 로 움 을 만 날 수 있다.다음 pom.xml 참조;그 중에서 각 의존 패 키 지 는 반드시 버 전이 조 화 롭 고 일치 해 야 한다.만약 에 kafka client 의 버 전과 kafka server 의 버 전이 일치 하지 않 으 면 많은 이상 이 있 을 것 입 니 다.예 를 들 어'broker id not exists'등 입 니 다.kafka 가 0.7 에서 0.8 로 업그레이드 한 후(본명 은 2.8.0)클 라 이언 트 와 server 가 통신 하 는 protocol 이 바 뀌 었 기 때문이다.
<dependencies>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.14</version>
	</dependency>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.8.2</artifactId>
		<version>0.8.0</version>
		<exclusions>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-library</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>com.yammer.metrics</groupId>
		<artifactId>metrics-core</artifactId>
		<version>2.2.0</version>
	</dependency>
	<dependency>
		<groupId>com.101tec</groupId>
		<artifactId>zkclient</artifactId>
		<version>0.3</version>
	</dependency>
</dependencies>

 
4.프로듀서 엔 드 코드
    1)producer.properties 파일:이 파일 은/resources 디 렉 터 리 에 놓 여 있 습 니 다.
#partitioner.class=
##broker     kafka server   ,  producer   broker   metadata
##    broker     metadata,      ,   broker     
##  ,     spring     
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
##  ,   async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## producer.type=async   
#batch.num.messages=100

    2)KafkaProducerClient.java 코드 샘플
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * User: guanqing-liu
 */
public class KafkaProducerClient {

	private Producer<String, String> inner;
	
	private String brokerList;//for metadata discovery,spring setter
	private String location = "kafka-producer.properties";//spring setter
	
	private String defaultTopic;//spring setter

	public void setBrokerList(String brokerList) {
		this.brokerList = brokerList;
	}

	public void setLocation(String location) {
		this.location = location;
	}

	public void setDefaultTopic(String defaultTopic) {
		this.defaultTopic = defaultTopic;
	}

	public KafkaProducerClient(){}
	
	public void init() throws Exception {
		Properties properties = new Properties();
		properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
		
		
		if(brokerList != null) {
			properties.put("metadata.broker.list", brokerList);
		}

		ProducerConfig config = new ProducerConfig(properties);
		inner = new Producer<String, String>(config);
	}

	public void send(String message){
		send(defaultTopic,message);
	}
	
	public void send(Collection<String> messages){
		send(defaultTopic,messages);
	}
	
	public void send(String topicName, String message) {
		if (topicName == null || message == null) {
			return;
		}
		KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
		inner.send(km);
	}

	public void send(String topicName, Collection<String> messages) {
		if (topicName == null || messages == null) {
			return;
		}
		if (messages.isEmpty()) {
			return;
		}
		List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
		int i= 0;
		for (String entry : messages) {
			KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
			kms.add(km);
			i++;
			if(i % 20 == 0){
				inner.send(kms);
				kms.clear();
			}
		}
		
		if(!kms.isEmpty()){
			inner.send(kms);
		}
	}

	public void close() {
		inner.close();
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KafkaProducerClient producer = null;
		try {
			producer = new KafkaProducerClient();
			//producer.setBrokerList("");
			int i = 0;
			while (true) {
				producer.send("test-topic", "this is a sample" + i);
				i++;
				Thread.sleep(2000);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (producer != null) {
				producer.close();
			}
		}

	}

}

    3)spring 설정
    <bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">
        <property name="zkConnect" value="${zookeeper_cluster}"></property>
        <property name="defaultTopic" value="${kafka_topic}"></property>
    </bean>

 
5.소비자 단
     1)consumer.properties:파일 은/resources 디 렉 터 리 아래 에 있 습 니 다.
##       ,     spring  
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000

    2)Kafka ConsumerClient.java 코드 샘플
package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * User: guanqing-liu 
 */
public class KafkaConsumerClient {

	private String groupid; //can be setting by spring
	private String zkConnect;//can be setting by spring
	private String location = "kafka-consumer.properties";//      
	private String topic;
	private int partitionsNum = 1;
	private MessageExecutor executor; //message listener
	private ExecutorService threadPool;
	
	private ConsumerConnector connector;
	
	private Charset charset = Charset.forName("utf8");

	public void setGroupid(String groupid) {
		this.groupid = groupid;
	}

	public void setZkConnect(String zkConnect) {
		this.zkConnect = zkConnect;
	}

	public void setLocation(String location) {
		this.location = location;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	public void setPartitionsNum(int partitionsNum) {
		this.partitionsNum = partitionsNum;
	}

	public void setExecutor(MessageExecutor executor) {
		this.executor = executor;
	}

	public KafkaConsumerClient() {}

	//init consumer,and start connection and listener
	public void init() throws Exception {
		if(executor == null){
			throw new RuntimeException("KafkaConsumer,exectuor cant be null!");
		}
		Properties properties = new Properties();
		properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
		
		if(groupid != null){
			properties.put("groupid", groupid);
		}
		if(zkConnect != null){
			properties.put("zookeeper.connect", zkConnect);
		}
		ConsumerConfig config = new ConsumerConfig(properties);

		connector = Consumer.createJavaConsumerConnector(config);
		Map<String, Integer> topics = new HashMap<String, Integer>();
		topics.put(topic, partitionsNum);
		Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
		List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
		threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
		
		//start
		for (KafkaStream<byte[], byte[]> partition : partitions) {
			threadPool.execute(new MessageRunner(partition));
		}
	}

	public void close() {
		try {
			threadPool.shutdownNow();
		} catch (Exception e) {
			//
		} finally {
			connector.shutdown();
		}

	}

	class MessageRunner implements Runnable {
		private KafkaStream<byte[], byte[]> partition;

		MessageRunner(KafkaStream<byte[], byte[]> partition) {
			this.partition = partition;
		}

		public void run() {
			ConsumerIterator<byte[], byte[]> it = partition.iterator();
			while (it.hasNext()) {
				// connector.commitOffsets();    offset, autocommit.enable=false   
				MessageAndMetadata<byte[], byte[]> item = it.next();
				try{
					executor.execute(new String(item.message(),charset));// UTF-8,    
				}catch(Exception e){
					//
				}
			}
		}
		
		public String getContent(Message message){
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
		}
	}

	public static interface MessageExecutor {

		public void execute(String message);
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		KafkaConsumerClient consumer = null;
		try {
			MessageExecutor executor = new MessageExecutor() {

				public void execute(String message) {
					System.out.println(message);
				}
			};
			consumer = new KafkaConsumerClient();
			
			consumer.setTopic("test-topic");
			consumer.setPartitionsNum(2);
			consumer.setExecutor(executor);
			consumer.init();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			 if(consumer != null){
				 consumer.close();
			 }
		}

	}

}

    3)spring 설정(약)
 
    상기 LogConsumer 클래스 에 서 는 이상 한 상황 에 관심 이 많 지 않 으 므 로 Message Executor.execute()방법 에서 이상 한 상황 을 던 져 야 합 니 다.
    테스트 할 때 consumer 를 먼저 시작 한 다음 producer 를 시작 하면 최신 정 보 를 실시 간 으로 관찰 할 수 있 습 니 다.

좋은 웹페이지 즐겨찾기