Springboot 는 Kafka 를 통합 하여 producer 와 consumer 의 예제 코드 를 실현 합 니 다.

11478 단어 Springboot집성Kafka
본 고 는 springboot 프로젝트 에 kafka 송 수신 message 를 통합 하 는 방법 을 소개 한다.
Kafka 는 높 은 스루풋 의 분포 식 게시 구독 메시지 시스템 으로 다음 과 같은 특성 이 있다.O(1)의 디스크 데이터 구 조 를 통 해 정 보 를 제공 하 는 지속 화 는 TB 의 메시지 저장 에 도 장시간 안정 적 인 성능 을 유지 할 수 있다.높 은 스루풋:매우 일반적인 하드웨어 인 Kafka 도 초당 수백 만 의 소식 을 지원 할 수 있다.Kafka 서버 와 소비 기 클 러 스 터 를 통 해 메 시 지 를 구분 하 는 것 을 지원 합 니 다.Hadoop 병렬 데이터 로드 지원.
Kafka 설치
kafka 를 설치 하려 면 zookeeper 의 지원 이 필요 하기 때문에 Windows 를 설치 할 때 zookeeper 를 먼저 설치 한 다음 에 kafka 를 설치 하면 됩 니 다.다음은 Mac 설치 절차 와 주의해 야 할 점 을 알려 드 리 겠 습 니 다.windows 의 설정 은 위치 가 다른 것 을 제외 하고 거의 다 르 지 않 습 니 다.

brew install kafka
네,그렇게 간단 합 니 다.mac 의 이전 명령 으로 해결 할 수 있 습 니 다.이 설치 과정 은 잠시 기 다 려 야 할 수도 있 습 니 다.네트워크 상황 과 관계 가 있 을 것 입 니 다."Error:Could not link:/usr/local/share/doc/homebrew"와 같은 오류 메시지 가 설치 되 어 있 을 수 있 습 니 다.이 는 괜 찮 습 니 다.자동 으로 무시 되 었 습 니 다.결국 우 리 는 아래 의 모습 을 보고 성공 했다.
==> Summary 🍺/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB
설 치 된 프로필 위 치 는 다음 과 같 습 니 다.필요 에 따라 포트 번호 같은 것 을 수정 하면 됩 니 다.
설 치 된 zoopeper 와 kafka 의 위치/usr/local/cellar/
설정 파일/usr/local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties
zookeeper 시작
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
시작 kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
kafka 에 Topic 을 만 들 고 topic 이름 은 test 입 니 다.원 하 는 이름 으로 설정 할 수 있 습 니 다.나중에 코드 에 올 바 르 게 설정 하면 됩 니 다.
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
 
1.의존 해결
spring boot 와 관련 된 의존 은 언급 하지 않 겠 습 니 다.kafka 와 관련 된 것 은 하나의 spring-kafka 통합 패키지 에 만 의존 합 니 다.

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.1.RELEASE</version>
  </dependency>
 여기 프로필 을 먼저 보 여 드 리 겠 습 니 다.

#============== kafka ===================
kafka.consumer.zookeeper.connect=10.93.21.21:2181
kafka.consumer.servers=10.93.21.21:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=10.93.21.21:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
2、Configuration:Kafka producer
1)@Configuration,@Enablekafka 를 통 해 Config 를 설명 하고 KafkaTemplate 능력 을 엽 니 다.
2)@Value 를 통 해 application.properties 설정 파일 의 kafka 설정 을 주입 합 니 다.
3)bean 생 성,@Bean

package com.kangaroo.sentinel.collect.configuration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {
 @Value("${kafka.producer.servers}")
 private String servers;
 @Value("${kafka.producer.retries}")
 private int retries;
 @Value("${kafka.producer.batch.size}")
 private int batchSize;
 @Value("${kafka.producer.linger}")
 private int linger;
 @Value("${kafka.producer.buffer.memory}")
 private int bufferMemory;

 public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  props.put(ProducerConfig.RETRIES_CONFIG, retries);
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  return props;
 }

 public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
 }

 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<String, String>(producerFactory());
 }
}

우리 프로듀서 를 실험 해서 컨트롤 러 를 쓰 세 요.topic=test,key=key,메시지 보 내기

package com.kangaroo.sentinel.collect.controller;
import com.kangaroo.sentinel.common.response.Response;
import com.kangaroo.sentinel.common.response.ResultCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
@RequestMapping("/kafka")
public class CollectController {
 protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 @Autowired
 private KafkaTemplate kafkaTemplate;

 @RequestMapping(value = "/send", method = RequestMethod.GET)
 public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
  try {
   String message = request.getParameter("message");
   logger.info("kafka   ={}", message);
   kafkaTemplate.send("test", "key", message);
   logger.info("  kafka  .");
   return new Response(ResultCode.SUCCESS, "  kafka  ", null);
  } catch (Exception e) {
   logger.error("  kafka  ", e);
   return new Response(ResultCode.EXCEPTION, "  kafka  ", null);
  }
 }
}
3、configuration:kafka consumer
1)@Configuration,@Enablekafka 를 통 해 Config 를 설명 하고 KafkaTemplate 능력 을 엽 니 다.
2)@Value 를 통 해 application.properties 설정 파일 의 kafka 설정 을 주입 합 니 다.
3)bean 생 성,@Bean

package com.kangaroo.sentinel.collect.configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
 @Value("${kafka.consumer.servers}")
 private String servers;
 @Value("${kafka.consumer.enable.auto.commit}")
 private boolean enableAutoCommit;
 @Value("${kafka.consumer.session.timeout}")
 private String sessionTimeout;
 @Value("${kafka.consumer.auto.commit.interval}")
 private String autoCommitInterval;
 @Value("${kafka.consumer.group.id}")
 private String groupId;
 @Value("${kafka.consumer.auto.offset.reset}")
 private String autoOffsetReset;
 @Value("${kafka.consumer.concurrency}")
 private int concurrency;
 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(concurrency);
  factory.getContainerProperties().setPollTimeout(1500);
  return factory;
 }

 public ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 public Map<String, Object> consumerConfigs() {
  Map<String, Object> propsMap = new HashMap<>();
  propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
  propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
  propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  return propsMap;
 }

 @Bean
 public Listener listener() {
  return new Listener();
 }
}
new Listener()는 kafka 에서 읽 은 데 이 터 를 처리 하기 위해 bean 을 생 성 합 니 다.Listener 의 간단 한 구현 demo 는 다음 과 같 습 니 다.key 와 message 값 을 간단하게 읽 고 인쇄 할 뿐 입 니 다.
@KafkaListener 에서 topics 속성 은 kafka topic 이름 을 지정 하 는 데 사 용 됩 니 다.topic 이름 은 메시지 생산자 가 지정 합 니 다.즉,kafkaTemplate 에서 메 시 지 를 보 낼 때 지정 합 니 다.

package com.kangaroo.sentinel.collect.configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {
 protected final Logger logger = LoggerFactory.getLogger(this.getClass());


 @KafkaListener(topics = {"test"})
 public void listen(ConsumerRecord<?, ?> record) {
  logger.info("kafka key: " + record.key());
  logger.info("kafka value: " + record.value().toString());
 }
}
tips:
1)카 프 카 를 설치 하 는 방법 을 소개 하지 않 았 습 니 다.카 프 카 를 설정 할 때 localhost 나 127.0.0.1 이 아 닌 완전 bid 네트워크 ip 을 사용 하 는 것 이 좋 습 니 다.
2)kafka 가 자체 적 으로 가지 고 있 는 zookeeper 를 사용 하여 kafka 를 배치 하지 않 는 것 이 좋 습 니 다.접근 이 통 하지 않 을 수 있 습 니 다.
3)이론 적 으로 consumer 가 kafka 를 읽 는 것 은 zookeeper 를 통 해 이 루어 져 야 하지만 여기 서 우 리 는 kafkaserver 의 주 소 를 사용 하 는데 왜 깊이 연구 하지 않 았 습 니까?
4)감청 메시지 설정 을 정의 할 때 GROUPID_CONFIG 설정 항목 의 값 은 소비자 그룹의 이름 을 지정 하 는 데 사 용 됩 니 다.같은 그룹 에 여러 개의 모니터 대상 이 존재 하면 하나의 모니터 대상 만 메 시 지 를 받 을 수 있 습 니 다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기