Spring boot 통합 kafka 및 storm
31720 단어 spring-boot
머리말
업무 수요 로 인해 Strom 과 kafka 를 spring boot 프로젝트 에 통합 시 켜 다른 서비스 출력 로 그 를 kafka 구독 화제 로 실현 해 야 합 니 다. storm 은 이 화 제 를 실시 간 으로 처리 하여 데이터 모니터링 과 다른 데이터 통 계 를 완성 해 야 합 니 다. 그러나 인터넷 튜 토리 얼 이 비교적 적 습 니 다. 오늘 쓰 고 싶 은 것 은 storm + kafka 를 spring boot 까지 어떻게 정리 하 는 지 하 는 것 입 니 다. 그리고 제 가 만난 구 덩이 를 말씀 드 리 겠 습 니 다.
사용 도구 및 환경 설정
1. 자바 버 전 jdk - 1.8
2. 컴 파일 도구 사용 IDEA - 2017
3. maven 프로젝트 관리
4.spring boot-1.5.8.RELEASE
수요 구현
1. 왜 spring boot 에 통합 해 야 합 니까?
spring boot 를 사용 하여 각종 마이크로 서 비 스 를 통일 적 으로 관리 하고 여러 개의 분산 설정 을 피 할 수 있 습 니 다.
2. 구체 적 인 사고 와 통합 원인
spring boot 를 사용 하여 kafka, storm, redis 등 필요 한 bean 을 통일 적 으로 관리 하고 다른 서비스 로 그 를 통 해 Kafka 에 수집 하 며 KafKa 는 실시 간 으로 로 그 를 storm 에 보 내 고 strom bolt 시 해당 하 는 처리 작업 을 합 니 다.
닥 친 문제
1. spring boot 를 사용 하면 통합 storm 이 없습니다.
2. spring boot 시작 방식 으로 제출 Topolgy 를 어떻게 터치 하 는 지 모 르 겠 습 니 다.
3. 토폴로지 제출 시 numbis not client localhost 문제 발생
4. Storm bolt 에 서 는 주 해 를 통 해 실례 화 된 bean 을 얻 을 수 없습니다.
해결 방향
통합 하기 전에 우 리 는 해당 하 는 spring boot 의 시작 방식 과 설정 을 알 아야 합 니 다. (본문 을 읽 을 때 기본적으로 storm, kafka 및 spring boot 에 대해 알 고 사용 합 니 다)
org.apache.kafka
kafka-clients
0.10.1.1
org.springframework.cloud
spring-cloud-starter-stream-kafka
zookeeper
org.apache.zookeeper
spring-boot-actuator
org.springframework.boot
kafka-clients
org.apache.kafka
org.springframework.kafka
spring-kafka
kafka-clients
org.apache.kafka
org.springframework.data
spring-data-hadoop
2.5.0.RELEASE
org.slf4j
slf4j-log4j12
commons-logging
commons-logging
netty
io.netty
jackson-core-asl
org.codehaus.jackson
curator-client
org.apache.curator
jettison
org.codehaus.jettison
jackson-mapper-asl
org.codehaus.jackson
jackson-jaxrs
org.codehaus.jackson
snappy-java
org.xerial.snappy
jackson-xc
org.codehaus.jackson
guava
com.google.guava
hadoop-mapreduce-client-core
org.apache.hadoop
zookeeper
org.apache.zookeeper
servlet-api
javax.servlet
org.apache.zookeeper
zookeeper
3.4.10
slf4j-log4j12
org.slf4j
org.apache.hbase
hbase-client
1.2.4
log4j
log4j
zookeeper
org.apache.zookeeper
netty
io.netty
hadoop-common
org.apache.hadoop
guava
com.google.guava
hadoop-annotations
org.apache.hadoop
hadoop-yarn-common
org.apache.hadoop
slf4j-log4j12
org.slf4j
org.apache.hadoop
hadoop-common
2.7.3
commons-logging
commons-logging
curator-client
org.apache.curator
jackson-mapper-asl
org.codehaus.jackson
jackson-core-asl
org.codehaus.jackson
log4j
log4j
snappy-java
org.xerial.snappy
zookeeper
org.apache.zookeeper
guava
com.google.guava
hadoop-auth
org.apache.hadoop
commons-lang
commons-lang
slf4j-log4j12
org.slf4j
servlet-api
javax.servlet
org.apache.hadoop
hadoop-mapreduce-examples
2.7.3
commons-logging
commons-logging
netty
io.netty
guava
com.google.guava
log4j
log4j
servlet-api
javax.servlet
org.apache.storm
storm-core
${storm.version}
${provided.scope}
org.apache.logging.log4j
log4j-slf4j-impl
servlet-api
javax.servlet
org.apache.storm
storm-kafka
1.1.1
kafka-clients
org.apache.kafka
그 중에서 jar 가방 을 제거 하 는 것 은 프로젝트 구축 의존 이 얼마나 되 는 지 에 대한 질문 입 니 다. storm 버 전 은 1.1.0 spring boot 관련 의존
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-logging
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-aop
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-log4j2
org.mybatis.spring.boot
mybatis-spring-boot-starter
${mybatis-spring.version}
org.springframework.boot
spring-boot-configuration-processor
true
입 니 다.ps: maven 의 jar 패 키 지 는 프로젝트 사용 수요 때문에 가장 간단 하지 않 습 니 다. 참고 하 시기 바 랍 니 다. 프로젝트 구조: config - 서로 다른 환경 설정 파일 저장 spring boot 를 시작 할 때 저희 가 발견 할 수 있 습 니 다.
/**
* @author Leezer
* @date 2017/12/28
* spring Topology
**/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {
private static String BROKERZKSTR;
private static String TOPIC;
private static String HOST;
private static String PORT;
public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
@Value("${zookeeper.host}") String host,
@Value("${zookeeper.port}") String port,
@Value("${kafka.default-topic}") String topic
){
BROKERZKSTR = brokerZkstr;
HOST= host;
TOPIC= topic;
PORT= port;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
try {
// topologyBuilder 。
TopologyBuilder topologyBuilder = new TopologyBuilder();
// , 。
BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
// Kafka Topic, zookeeper
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Collections.singletonList(HOST);
spoutConfig.zkPort = Integer.parseInt(PORT);
// Kafka
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
KafkaSpout receiver = new KafkaSpout(spoutConfig);
topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
Config config = new Config();
config.setDebug(false);
/* topology storm slot , slot supervisor worker , spot worker , , topology 2 worker , 4 topology , topology 。 kill topology slot topology 。
*/
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
이 는 가 져 온 jar 패키지 가 servlet - api 버 전 을 내장 버 전보 다 낮 게 도 입 했 기 때 문 입 니 다. 우리 가 해 야 할 일 은 Maven 의존 을 열 어 제거 하 는 것 입 니 다.
servlet-api
javax.servlet
그리고 다시 시작 하면 돼.
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90
이 문 제 를 오래 생각 했 는데 인터넷 의 해석 이 모두 storm 설정 문제 로 인해 잘못 되 었 다 는 것 을 알 게 되 었 습 니 다. 그러나 제 storm 은 서버 에 배 치 된 것 입 니 다. 관련 설정 이 없고 이치 에 따라 서비스 기 에서 관련 설정 을 읽 어야 합 니 다. 그러나 결 과 는 그렇지 않 습 니 다.마지막 으로 몇 가지 방법 을 시 도 했 는데 모두 틀 렸 다 는 것 을 알 게 되 었 습 니 다. 클 러 스 터 를 구축 할 때 storm 은 해당 하 는 로 컬 클 러 스 터 LocalCluster cluster = new LocalCluster();
를 제공 하여 로 컬 테스트 를 했 습 니 다. 로 컬 테스트 에서 배치 테스트 를 하면 서버 에 배치 하려 면 다음 과 같은 작업 을 제출 해 야 합 니 다. cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
// :
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
위 에서 언급 한 문제 1 - 3 문 제 를 해결 하 였 습 니 다. 4: bolt 에서 관련 bean 인 스 턴 스 를 사용 하 는 것 입 니 다. 저 는 @ Component 를 사용 하여 spring 에 가입 해도 인 스 턴 스 를 얻 을 수 없다 는 것 을 알 게 되 었 습 니 다. 제 추측 은 우리 가 Topolgy 를 구축 할 때 topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");
bolt 를 실행 하 는 것 입 니 다. @Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
StormLauncher stormLauncher = StormLauncher.getStormLauncher();
dataRepositorys =(AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
}
bolt 를 예화 하지 않 고 스 레 드 가 다 르 기 때문에 spring 을 얻 지 못 할 것 입 니 다.(여기 서 저도 잘 모 르 겠 습 니 다. 만약 에 큰 사람 이 한 파 를 공유 할 수 있다 는 것 을 알 게 된다 면) 우리 가 spring boot 를 사용 하 는 의 미 는 바로 이런 복잡 한 대상 을 얻 는 데 있 습 니 다. 이 문 제 는 저 를 오랫동안 괴 롭 혔 습 니 다. 마지막 으로 우 리 는 문맥 getbean 을 통 해 인 스 턴 스 를 얻 을 수 있 을 지 모 르 겠 습 니 다. 그리고 저 는 정 의 를 내 렸 습 니 다. 예 를 들 어 저 는 bolt 에서 서 비 스 를 사용 해 야 합 니 다.여기 서 제 가 이 bean 의 이름 을 지정 하면 bolt 에서 prepare 를 실행 할 때 getbean 방법 으로 관련 bean 을 얻 으 면 해당 하 는 작업 을 완성 할 수 있 습 니 다. /**
* @author Leezer
* @date 2017/12/27
*
**/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
private static final String ERRO = "erro";
/**
* @param type
* @param key key
* @return
**/
@Override
public String getErrNumFromRedis(String type,String key) {
if(type==null || key == null){
return null;
}else {
ValueOperations valueOper = primaryStringRedisTemplate.opsForValue();
return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));
}
}
/**
* @param type
* @param key key
* @param value
**/
@Override
public void setErrNumToRedis(String type, String key,String value) {
try {
ValueOperations valueOper = primaryStringRedisTemplate.opsForValue();
valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
}catch (Exception e){
logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key %s redis ",key));
}
}
이 통합 storm 과 kafka 에서 spring boot 까지 끝 났 습 니 다. kafka 및 기타 설정 은 github 에 넣 겠 습 니 다.
참, 여기 카 프 카 클 라 이언 트 구덩이 가 하나 더 있어 요.
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service","storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
@ImportResource(locations = {
"classpath:/configs/spring-hadoop.xml",
"classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {
// launcher
private volatile static StormLauncher stormLauncher;
//
private ApplicationContext context;
public static void main(String[] args) {
SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
// application.web(false).run(args); spring boot web
application.run(args);
StormLauncher s = new StormLauncher();
s.setApplicationContext(application.context());
setStormLauncher(s);
}
private static void setStormLauncher(StormLauncher stormLauncher) {
StormLauncher.stormLauncher = stormLauncher;
}
public static StormLauncher getStormLauncher() {
return stormLauncher;
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(StormLauncher.class);
}
/**
*
*
* @return the application context
*/
public ApplicationContext getApplicationContext() {
return context;
}
/**
* .
*
* @param appContext
*/
private void setApplicationContext(ApplicationContext appContext) {
this.context = appContext;
}
/**
* name Bean.
*
* @param name the name
* @return the bean
*/
public Object getBean(String name) {
return context.getBean(name);
}
/**
* class Bean.
*
* @param the type parameter
* @param clazz the clazz
* @return the bean
*/
public T getBean(Class clazz) {
return context.getBean(clazz);
}
/**
* name, Clazz Bean
*
* @param the type parameter
* @param name the name
* @param clazz the clazz
* @return the bean
*/
public T getBean(String name, Class clazz) {
return context.getBean(name, clazz);
}
프로젝트 는 kafka client 문 제 를 보고 할 것 입 니 다. 이것 은 storm - kafka 에서 kafka 는 0.8 버 전 을 사 용 했 고 NetworkSend 는 0.9 이상 의 버 전 입 니 다. 여기 서 집성 은 당신 이 집성 한 kafka 관련 버 전과 일치 해 야 합 니 다.
집적 은 비교적 간단 하지만 참고 가 비교적 적다. 게다가 처음에 storm 을 접 했 기 때문에 생각 이 비교적 많 고 여기에 기록 해 보 자.
프로젝트 주소 - github
참고 문헌: springboot - storm - integration
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Keycloak이 Active Directory에 등록된 사용자로 인증할 수 있도록 합니다.사내 시스템을 출시함에 있어서, 전회사에서는 Web시스템마다 로그인하고 있어 혐오가 있었으므로, 꼭 싱글 사인온으로 하고 싶다고 생각했다. 그 실현에, 옛날 조금만 평가한 OpenAM라든지의 정보를 구구어 낚시하기 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.