Springboot 는 Kafka 를 통합 하여 producer 와 consumer 의 예제 코드 를 실현 합 니 다.
11478 단어 Springboot집성Kafka
Kafka 는 높 은 스루풋 의 분포 식 게시 구독 메시지 시스템 으로 다음 과 같은 특성 이 있다.O(1)의 디스크 데이터 구 조 를 통 해 정 보 를 제공 하 는 지속 화 는 TB 의 메시지 저장 에 도 장시간 안정 적 인 성능 을 유지 할 수 있다.높 은 스루풋:매우 일반적인 하드웨어 인 Kafka 도 초당 수백 만 의 소식 을 지원 할 수 있다.Kafka 서버 와 소비 기 클 러 스 터 를 통 해 메 시 지 를 구분 하 는 것 을 지원 합 니 다.Hadoop 병렬 데이터 로드 지원.
Kafka 설치
kafka 를 설치 하려 면 zookeeper 의 지원 이 필요 하기 때문에 Windows 를 설치 할 때 zookeeper 를 먼저 설치 한 다음 에 kafka 를 설치 하면 됩 니 다.다음은 Mac 설치 절차 와 주의해 야 할 점 을 알려 드 리 겠 습 니 다.windows 의 설정 은 위치 가 다른 것 을 제외 하고 거의 다 르 지 않 습 니 다.
brew install kafka
네,그렇게 간단 합 니 다.mac 의 이전 명령 으로 해결 할 수 있 습 니 다.이 설치 과정 은 잠시 기 다 려 야 할 수도 있 습 니 다.네트워크 상황 과 관계 가 있 을 것 입 니 다."Error:Could not link:/usr/local/share/doc/homebrew"와 같은 오류 메시지 가 설치 되 어 있 을 수 있 습 니 다.이 는 괜 찮 습 니 다.자동 으로 무시 되 었 습 니 다.결국 우 리 는 아래 의 모습 을 보고 성공 했다.==> Summary ðŸº/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB
설 치 된 프로필 위 치 는 다음 과 같 습 니 다.필요 에 따라 포트 번호 같은 것 을 수정 하면 됩 니 다.
설 치 된 zoopeper 와 kafka 의 위치/usr/local/cellar/
설정 파일/usr/local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties
zookeeper 시작
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
시작 kafka./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
kafka 에 Topic 을 만 들 고 topic 이름 은 test 입 니 다.원 하 는 이름 으로 설정 할 수 있 습 니 다.나중에 코드 에 올 바 르 게 설정 하면 됩 니 다.
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.의존 해결
spring boot 와 관련 된 의존 은 언급 하지 않 겠 습 니 다.kafka 와 관련 된 것 은 하나의 spring-kafka 통합 패키지 에 만 의존 합 니 다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
여기 프로필 을 먼저 보 여 드 리 겠 습 니 다.
#============== kafka ===================
kafka.consumer.zookeeper.connect=10.93.21.21:2181
kafka.consumer.servers=10.93.21.21:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=10.93.21.21:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
2、Configuration:Kafka producer 1)@Configuration,@Enablekafka 를 통 해 Config 를 설명 하고 KafkaTemplate 능력 을 엽 니 다.
2)@Value 를 통 해 application.properties 설정 파일 의 kafka 설정 을 주입 합 니 다.
3)bean 생 성,@Bean
package com.kangaroo.sentinel.collect.configuration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
우리 프로듀서 를 실험 해서 컨트롤 러 를 쓰 세 요.topic=test,key=key,메시지 보 내기
package com.kangaroo.sentinel.collect.controller;
import com.kangaroo.sentinel.common.response.Response;
import com.kangaroo.sentinel.common.response.ResultCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@RestController
@RequestMapping("/kafka")
public class CollectController {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping(value = "/send", method = RequestMethod.GET)
public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
try {
String message = request.getParameter("message");
logger.info("kafka ={}", message);
kafkaTemplate.send("test", "key", message);
logger.info(" kafka .");
return new Response(ResultCode.SUCCESS, " kafka ", null);
} catch (Exception e) {
logger.error(" kafka ", e);
return new Response(ResultCode.EXCEPTION, " kafka ", null);
}
}
}
3、configuration:kafka consumer1)@Configuration,@Enablekafka 를 통 해 Config 를 설명 하고 KafkaTemplate 능력 을 엽 니 다.
2)@Value 를 통 해 application.properties 설정 파일 의 kafka 설정 을 주입 합 니 다.
3)bean 생 성,@Bean
package com.kangaroo.sentinel.collect.configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
new Listener()는 kafka 에서 읽 은 데 이 터 를 처리 하기 위해 bean 을 생 성 합 니 다.Listener 의 간단 한 구현 demo 는 다음 과 같 습 니 다.key 와 message 값 을 간단하게 읽 고 인쇄 할 뿐 입 니 다.@KafkaListener 에서 topics 속성 은 kafka topic 이름 을 지정 하 는 데 사 용 됩 니 다.topic 이름 은 메시지 생산자 가 지정 합 니 다.즉,kafkaTemplate 에서 메 시 지 를 보 낼 때 지정 합 니 다.
package com.kangaroo.sentinel.collect.configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
public class Listener {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record) {
logger.info("kafka key: " + record.key());
logger.info("kafka value: " + record.value().toString());
}
}
tips:1)카 프 카 를 설치 하 는 방법 을 소개 하지 않 았 습 니 다.카 프 카 를 설정 할 때 localhost 나 127.0.0.1 이 아 닌 완전 bid 네트워크 ip 을 사용 하 는 것 이 좋 습 니 다.
2)kafka 가 자체 적 으로 가지 고 있 는 zookeeper 를 사용 하여 kafka 를 배치 하지 않 는 것 이 좋 습 니 다.접근 이 통 하지 않 을 수 있 습 니 다.
3)이론 적 으로 consumer 가 kafka 를 읽 는 것 은 zookeeper 를 통 해 이 루어 져 야 하지만 여기 서 우 리 는 kafkaserver 의 주 소 를 사용 하 는데 왜 깊이 연구 하지 않 았 습 니까?
4)감청 메시지 설정 을 정의 할 때 GROUPID_CONFIG 설정 항목 의 값 은 소비자 그룹의 이름 을 지정 하 는 데 사 용 됩 니 다.같은 그룹 에 여러 개의 모니터 대상 이 존재 하면 하나의 모니터 대상 만 메 시 지 를 받 을 수 있 습 니 다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[MeU] Hashtag 기능 개발➡️ 기존 Tag 테이블에 존재하지 않는 해시태그라면 Tag , tagPostMapping 테이블에 모두 추가 ➡️ 기존에 존재하는 해시태그라면, tagPostMapping 테이블에만 추가 이후에 개발할 태그 기반 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.