Kafka 배치 및 코드 인 스 턴 스
15611 단어 kafka
우 리 는 3 개의 zookeeper 인 스 턴 스 를 사용 하여 zk 클 러 스 터 를 구축 하고 2 개의 kafka broker 를 사용 하여 kafka 클 러 스 터 를 구축 합 니 다.
이 가운데 카 프 카 는 0.8V,조 키 퍼 는 3.4.5V 였 다.
1.Zookeeper 클 러 스 터 구축
우 리 는 3 개의 zk 실례 가 있 는데 각각 zk-0,zk-1,zk-2 이다.테스트 에 만 사용 된다 면 zk 인 스 턴 스 를 1 개 사용 할 수 있 습 니 다.
1) zk-0
설정 파일 조정:
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## ,
zookeeper 시작
./zkServer.sh start
2) zk-1
설정 파일 조정(기타 설정 과 zk-0 한 마리):
clientPort=2182
## ,
zookeeper 시작
./zkServer.sh start
3) zk-2
설정 파일 조정(기타 설정 과 zk-0 한 마리):
clientPort=2183
## ,
zookeeper 시작
./zkServer.sh start
2.Kafka 군집 구축
Broker 설정 파일 은 zookeeper 와 관련 된 약속 이 있 기 때문에 broker 설정 파일 을 먼저 보 여 줍 니 다.저 희 는 kafka broker 2 개 를 사용 하여 이 클 러 스 터 환경 을 구축 합 니 다.각각 kafka-0,kafka-1 입 니 다.
1) kafka-0
config 디 렉 터 리 에서 설정 파일 을 수정 합 니 다:
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication , topic partitions kafka-cluster 2
## cluster ..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka 는 scala 언어 로 작성 되 었 기 때문에 kafka 를 실행 하려 면 먼저 scala 관련 환경 을 준비 해 야 합 니 다.
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
그 중 마지막 명령 집행 에 이상 이 생 길 수 있 으 므 로 잠시 상관 하지 않 는 다.kafka broker 시작:
> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
zookeeper 환경 이 정상적으로 작 동 되 었 기 때문에,우 리 는 kafka 를 통 해 zookeeper 를 시작 하 는 것 을 마 운 트 할 필요 가 없습니다.만약 당신 의 기계 에 여러 개의 kafka broker 가 배치 되 어 있다 면,당신 은 JMS 를 성명 해 야 합 니 다.PORT.
2) kafka-1
broker.id=1
port=9093
## kafka-0
그리고 kafka-0 과 같이 포장 명령 을 실행 하고 이 broker 를 시작 합 니 다.
> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
다음 명령 을 통 해 topic 의"partition"/"replicas"의 분포 와 생존 상황 을 볼 수 있 습 니 다.
> bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
지금까지 환경 이 OK 되 었 으 니 프로 그래 밍 인 스 턴 스 를 보 여 드 리 겠 습 니 다설정 매개 변수 상세 설명 ]
프로젝트 준비
프로젝트 는 maven 기반 구축 으로 kafka 자바 클 라 이언 트 가 너무 엉망 이 라 고 할 수 밖 에 없습니다.환경 을 구축 하 는 데 는 많은 번 거 로 움 을 만 날 수 있다.다음 pom.xml 참조;그 중에서 각 의존 패 키 지 는 반드시 버 전이 조 화 롭 고 일치 해 야 한다.만약 에 kafka client 의 버 전과 kafka server 의 버 전이 일치 하지 않 으 면 많은 이상 이 있 을 것 입 니 다.예 를 들 어'broker id not exists'등 입 니 다.kafka 가 0.7 에서 0.8 로 업그레이드 한 후(본명 은 2.8.0)클 라 이언 트 와 server 가 통신 하 는 protocol 이 바 뀌 었 기 때문이다.
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
4.프로듀서 엔 드 코드
1)producer.properties 파일:이 파일 은/resources 디 렉 터 리 에 놓 여 있 습 니 다.
#partitioner.class=
##broker kafka server , producer broker metadata
## broker metadata, , broker
## , spring
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
## , async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## producer.type=async
#batch.num.messages=100
2)KafkaProducerClient.java 코드 샘플
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* User: guanqing-liu
*/
public class KafkaProducerClient {
private Producer<String, String> inner;
private String brokerList;//for metadata discovery,spring setter
private String location = "kafka-producer.properties";//spring setter
private String defaultTopic;//spring setter
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public void setLocation(String location) {
this.location = location;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public KafkaProducerClient(){}
public void init() throws Exception {
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(brokerList != null) {
properties.put("metadata.broker.list", brokerList);
}
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer<String, String>(config);
}
public void send(String message){
send(defaultTopic,message);
}
public void send(Collection<String> messages){
send(defaultTopic,messages);
}
public void send(String topicName, String message) {
if (topicName == null || message == null) {
return;
}
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
inner.send(km);
}
public void send(String topicName, Collection<String> messages) {
if (topicName == null || messages == null) {
return;
}
if (messages.isEmpty()) {
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
int i= 0;
for (String entry : messages) {
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
i++;
if(i % 20 == 0){
inner.send(kms);
kms.clear();
}
}
if(!kms.isEmpty()){
inner.send(kms);
}
}
public void close() {
inner.close();
}
/**
* @param args
*/
public static void main(String[] args) {
KafkaProducerClient producer = null;
try {
producer = new KafkaProducerClient();
//producer.setBrokerList("");
int i = 0;
while (true) {
producer.send("test-topic", "this is a sample" + i);
i++;
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.close();
}
}
}
}
3)spring 설정
<bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">
<property name="zkConnect" value="${zookeeper_cluster}"></property>
<property name="defaultTopic" value="${kafka_topic}"></property>
</bean>
5.소비자 단
1)consumer.properties:파일 은/resources 디 렉 터 리 아래 에 있 습 니 다.
## , spring
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000
2)Kafka ConsumerClient.java 코드 샘플
package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
/**
* User: guanqing-liu
*/
public class KafkaConsumerClient {
private String groupid; //can be setting by spring
private String zkConnect;//can be setting by spring
private String location = "kafka-consumer.properties";//
private String topic;
private int partitionsNum = 1;
private MessageExecutor executor; //message listener
private ExecutorService threadPool;
private ConsumerConnector connector;
private Charset charset = Charset.forName("utf8");
public void setGroupid(String groupid) {
this.groupid = groupid;
}
public void setZkConnect(String zkConnect) {
this.zkConnect = zkConnect;
}
public void setLocation(String location) {
this.location = location;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setPartitionsNum(int partitionsNum) {
this.partitionsNum = partitionsNum;
}
public void setExecutor(MessageExecutor executor) {
this.executor = executor;
}
public KafkaConsumerClient() {}
//init consumer,and start connection and listener
public void init() throws Exception {
if(executor == null){
throw new RuntimeException("KafkaConsumer,exectuor cant be null!");
}
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(groupid != null){
properties.put("groupid", groupid);
}
if(zkConnect != null){
properties.put("zookeeper.connect", zkConnect);
}
ConsumerConfig config = new ConsumerConfig(properties);
connector = Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put(topic, partitionsNum);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
//start
for (KafkaStream<byte[], byte[]> partition : partitions) {
threadPool.execute(new MessageRunner(partition));
}
}
public void close() {
try {
threadPool.shutdownNow();
} catch (Exception e) {
//
} finally {
connector.shutdown();
}
}
class MessageRunner implements Runnable {
private KafkaStream<byte[], byte[]> partition;
MessageRunner(KafkaStream<byte[], byte[]> partition) {
this.partition = partition;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = partition.iterator();
while (it.hasNext()) {
// connector.commitOffsets(); offset, autocommit.enable=false
MessageAndMetadata<byte[], byte[]> item = it.next();
try{
executor.execute(new String(item.message(),charset));// UTF-8,
}catch(Exception e){
//
}
}
}
public String getContent(Message message){
ByteBuffer buffer = message.payload();
if (buffer.remaining() == 0) {
return null;
}
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toString();
}
}
public static interface MessageExecutor {
public void execute(String message);
}
/**
* @param args
*/
public static void main(String[] args) {
KafkaConsumerClient consumer = null;
try {
MessageExecutor executor = new MessageExecutor() {
public void execute(String message) {
System.out.println(message);
}
};
consumer = new KafkaConsumerClient();
consumer.setTopic("test-topic");
consumer.setPartitionsNum(2);
consumer.setExecutor(executor);
consumer.init();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(consumer != null){
consumer.close();
}
}
}
}
3)spring 설정(약)
상기 LogConsumer 클래스 에 서 는 이상 한 상황 에 관심 이 많 지 않 으 므 로 Message Executor.execute()방법 에서 이상 한 상황 을 던 져 야 합 니 다.
테스트 할 때 consumer 를 먼저 시작 한 다음 producer 를 시작 하면 최신 정 보 를 실시 간 으로 관찰 할 수 있 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.