storm 흐름 계산 프로 그래 밍 전화 호출 로그 처리
11927 단어 빅 데이터
storm VS hadoop ----------------------------------------------------------- 실시 간 흐름 처리 일괄 처리 무상 태 상태 가 있다 zk 협동 의 마스터 구 조 를 사용 합 니 다. zk 없 는 주종 구조.
초당 수만 소식 처리 HDFS MR 몇 분, 몇 시간
주동 적 으로 멈 추 지 않 는 다 결국 완성 할 때 가 있다.
storm 장점 1. 크로스 언어 2. 신축 가능 3. 저지 연, 초급 / 분 급 4. 잘못 사용 하 다.
핵심 개념 1.Tuple 주요 데이터 구조, 질서 있 는 요소 의 목록 입 니 다. 2.Stream Tuple 의 서열. 3.Spouts 데이터 스 트림 원본.kafka 대기 열 메 시 지 를 읽 을 수 있 습 니 다.사용자 정의 가능. 4.Bolts 머리 를 돌리다. 논리 처리 장치.spout 의 데 이 터 는 bolt, bolt 계산 을 전달 하고 완성 후 새로운 데 이 터 를 생 성 합 니 다.IBolt 는 인터페이스 입 니 다.
Topology : Spout + bolt 가 연결 되 어 top 을 형성 하고 방향 도 를 형성 합 니 다. 고정 점 은 계산 이 고 변 은 데이터 흐름 task 입 니 다. Bolt 에 서 는 모든 Spout 이나 bolt 가 하나의 task 입 니 다.
스 톰 구조 1. 님 버스 (영기) 마스터 노드. 핵심 구성 요소, top 실행. top 를 분석 하고 실행 task 를 수집 합 니 다.슈퍼 바 이 저 에 게 task 를 나 눠 줍 니 다. 감시 탑. 상태 없 이 zk 에 의 해 top 의 운행 상황 을 감시 합 니 다.
2. 감독자 (감찰) 모든 슈퍼 바 이 저 는 n 개의 워 커 프로 세 스 가 있 으 며, 워 커 에 게 대리 태 스 크 를 맡 깁 니 다. worker 는 부화 실행 라인 에서 최종 적 으로 task 를 실행 합 니 다. storm 은 내부 메시지 시스템 을 사용 하여 Nimbus 와 슈퍼 visor 사이 에서 통신 을 합 니 다.
nimbus 명령 을 받 아들 여 worker 프로 세 스 를 관리 하여 task 발송 을 완료 합 니 다.
3.worker 특정한 task 를 수행 합 니 다. worker 자 체 는 임 무 를 수행 하지 않 고 executors 를 부화 시 킵 니 다. executors 로 하여 금 task 를 실행 하 게 하 다. 4.Executor 본질 적 으로 워 커 프로 세 스 가 부화 한 스 레 드 일 뿐 입 니 다. executor 실행 task 는 모두 같은 spout 또는 bolt 에 속 합 니 다. 5.task 실제 임 무 를 수행 하 다.아니면 Spout 아니면 bolt.
storm 작업 절차 1. nimbus 제출 대기 top 2. top 제출 후, nimbus 수집 task, 3. nimbus 는 task 를 사용 가능 한 모든 슈퍼 바 이 저 에 게 나 누 어 줍 니 다. 4. 슈퍼 바 이 저 는 nimbus 에 게 주기 적 으로 심장 박동 을 보 내 자신 이 살아 있다 는 것 을 나타 낸다. 5. 슈퍼 바 이 저 를 끊 으 면 nimubs 에 게 심장 박동 을 보 내지 않 고, nimbus 는 task 를 다른 슈퍼 바 이 저 에 게 보 냅 니 다. 6. Nimubs 가 끊 으 면 슈퍼 는 자신의 task 를 계속 수행 합 니 다. 7. task 완료 후 슈퍼 바 이 저 는 새로운 task 를 기다 리 고 있 습 니 다. 8. 동시에 끊 긴 nimbus 는 모니터링 도구 소프트웨어 를 통 해 자동 으로 재 부팅 할 수 있 습 니 다.
storm 군집 설치 [datanode1 ~ datanode4] 1.jdk 2.tar 3. 환경 변수 4. 설치 검증 $>source /etc/profile $>./storm version 5. 설치 파일 을 다른 노드 에 나 누 어 줍 니 다. 6. 설정 [storm/conf/storm.yaml] storm.local.dir: "/soft/storm" storm.zookeeper.servers: - "datanode2" - "datanode3"
storm.zookeeper.port: 2181
### nimbus.* configs are for the master nimbus.seeds : ["datanode1"]
### ui.* configs are for the master ui.host: 0.0.0.0 ui.port: 8080
supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 7. datanode 2 ~ datanode 4 에 배포
8. 프로 세 스 시작 a) datanode 1 nimbus 프로 세 스 시작 $>storm nimbus &
b) datanode 2 시작 ~ datanode 4 관리자 프로 세 스 $>storm supervisor & c) datanode 1 ui 프로 세 스 시작 $>storm ui & 9. 웹 ui 로 보기 http://datanode1:8080/
프로 그래 밍 구현 CallLog 로그 통계:
pom.xml
4.0.0
com.jr
StormDemo
1.0-SNAPSHOT
org.apache.storm
storm-core
1.0.3
CallLogSpout :
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* Spout ,
*/
public class CallLogSpout implements IRichSpout{
//Spout
private SpoutOutputCollector collector;
//
private boolean completed = false;
//
private TopologyContext context;
//
private Random randomGenerator = new Random();
//
private Integer idx = 0;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
/**
*
*/
public void nextTuple() {
if (this.idx <= 1000) {
List mobileNumbers = new ArrayList();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while (localIdx++ < 100 && this.idx++ < 1000) {
//
String caller = mobileNumbers.get(randomGenerator.nextInt(4));
//
String callee = mobileNumbers.get(randomGenerator.nextInt(4));
while (caller == callee) {
//
callee = mobileNumbers.get(randomGenerator.nextInt(4));
}
//
Integer duration = randomGenerator.nextInt(60);
//
this.collector.emit(new Values(caller, callee, duration));
}
}
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
/**
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
public Map getComponentConfiguration() {
return null;
}
}
CreatorBolt:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* CallLog Bolt
*/
public class CallLogCreatorBolt implements IRichBolt {
//
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector ;
}
public void execute(Tuple tuple) {
//
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
// tuple
collector.emit(new Values(from + " - " + to, duration));
}
public void cleanup() {
}
/**
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
public Map getComponentConfiguration() {
return null;
}
}
CounterBolt 만 들 기
package com.it18zhang.stormdemo;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* Bolt
*/
public class CallLogCounterBolt implements IRichBolt{
Map counterMap;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap();
this.collector = collector;
}
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if (!counterMap.containsKey(call)) {
counterMap.put(call, 1);
} else {
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
public void cleanup() {
for (Map.Entry entry : counterMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
public Map getComponentConfiguration() {
return null;
}
}
4. App 호출 클래스 구현
package com.it18zhang.stormdemo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
* App
*/
public class App {
public static void main(String[] args) throws InterruptedException {
TopologyBuilder builder = new TopologyBuilder();
// Spout
builder.setSpout("spout", new CallLogSpout());
// creator-Bolt
builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
// counter-Bolt
builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
Thread.sleep(10000);
//
cluster.shutdown();
}
}
IDEA 에서 실행 하면 프로그램 실행 결 과 를 볼 수 있 습 니 다.
jar 패키지 로 만들어 서 클 러 스 터 에 넣 고 명령 을 실행 할 수도 있 습 니 다.
우선 제출 방식 을 수정 하 겠 습 니 다. [App.java] public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //Spout 설정 builder.setSpout("spout", new CallLogSpout()); //크 리 에이 터 볼트 설정 builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout"); //카운터 볼트 설정 builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
Config conf = new Config(); conf.setDebug(true);
/** * 로 컬 모드 storm */ // LocalCluster cluster = new LocalCluster(); // cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology()); // Thread.sleep(10000); StormSubmitter.submitTopology("mytop", conf, builder.createTopology()); } b) jar 가방 가 져 오기. maven projects - > package 오른쪽 단 추 를 누 르 면 run stormdemo 를 누 르 고 프로젝트 의 target 아래 에 jar 패키지 가 생 성 되 어 jar 를 서버 에 넣 습 니 다. c) centos 에서 top 실행 $>storm jar xxx.jar com.jr.stormdemo.App
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.