storm 흐름 계산 프로 그래 밍 전화 호출 로그 처리

11927 단어 빅 데이터
storm 은 무료, 개원, 분포 식, 실시 간 계산 시스템 이다.물동량 이 높다.초당 노드 당 백만 원 그룹.
      storm                           VS                        hadoop -----------------------------------------------------------     실시 간 흐름 처리                                                  일괄 처리    무상 태                                                         상태 가 있다    zk 협동 의 마스터 구 조 를 사용 합 니 다.                                zk 없 는 주종 구조.  
    초당 수만 소식 처리                                       HDFS MR 몇 분, 몇 시간
    주동 적 으로 멈 추 지 않 는 다                                               결국 완성 할 때 가 있다.
 
storm 장점    1. 크로스 언어       2. 신축 가능     3. 저지 연, 초급 / 분 급     4. 잘못 사용 하 다.
핵심 개념    1.Tuple           주요 데이터 구조, 질서 있 는 요소 의 목록 입 니 다.    2.Stream        Tuple 의 서열.    3.Spouts         데이터 스 트림 원본.kafka 대기 열 메 시 지 를 읽 을 수 있 습 니 다.사용자 정의 가능.    4.Bolts            머리 를 돌리다. 논리 처리 장치.spout 의 데 이 터 는 bolt, bolt 계산 을 전달 하고 완성 후 새로운 데 이 터 를 생 성 합 니 다.IBolt 는 인터페이스 입 니 다.
Topology  :           Spout + bolt 가 연결 되 어 top 을 형성 하고 방향 도 를 형성 합 니 다. 고정 점 은 계산 이 고 변 은 데이터 흐름 task 입 니 다.    Bolt 에 서 는 모든 Spout 이나 bolt 가 하나의 task 입 니 다.
스 톰 구조    1. 님 버스 (영기)        마스터 노드.        핵심 구성 요소, top 실행.        top 를 분석 하고 실행 task 를 수집 합 니 다.슈퍼 바 이 저 에 게 task 를 나 눠 줍 니 다.        감시 탑.        상태 없 이 zk 에 의 해 top 의 운행 상황 을 감시 합 니 다.
    2. 감독자 (감찰)        모든 슈퍼 바 이 저 는 n 개의 워 커 프로 세 스 가 있 으 며, 워 커 에 게 대리 태 스 크 를 맡 깁 니 다.        worker 는 부화 실행 라인 에서 최종 적 으로 task 를 실행 합 니 다.        storm 은 내부 메시지 시스템 을 사용 하여 Nimbus 와 슈퍼 visor 사이 에서 통신 을 합 니 다.
        nimbus 명령 을 받 아들 여 worker 프로 세 스 를 관리 하여 task 발송 을 완료 합 니 다.
    3.worker         특정한 task 를 수행 합 니 다. worker 자 체 는 임 무 를 수행 하지 않 고 executors 를 부화 시 킵 니 다.        executors 로 하여 금 task 를 실행 하 게 하 다.         4.Executor         본질 적 으로 워 커 프로 세 스 가 부화 한 스 레 드 일 뿐 입 니 다.        executor 실행 task 는 모두 같은 spout 또는 bolt 에 속 합 니 다.         5.task         실제 임 무 를 수행 하 다.아니면 Spout 아니면 bolt.
     storm 작업 절차    1. nimbus 제출 대기 top    2. top 제출 후, nimbus 수집 task,    3. nimbus 는 task 를 사용 가능 한 모든 슈퍼 바 이 저 에 게 나 누 어 줍 니 다.    4. 슈퍼 바 이 저 는 nimbus 에 게 주기 적 으로 심장 박동 을 보 내 자신 이 살아 있다 는 것 을 나타 낸다.    5. 슈퍼 바 이 저 를 끊 으 면 nimubs 에 게 심장 박동 을 보 내지 않 고, nimbus 는 task 를 다른 슈퍼 바 이 저 에 게 보 냅 니 다.    6. Nimubs 가 끊 으 면 슈퍼 는 자신의 task 를 계속 수행 합 니 다.    7. task 완료 후 슈퍼 바 이 저 는 새로운 task 를 기다 리 고 있 습 니 다.    8. 동시에 끊 긴 nimbus 는 모니터링 도구 소프트웨어 를 통 해 자동 으로 재 부팅 할 수 있 습 니 다.
 
storm 군집 설치    [datanode1 ~ datanode4]     1.jdk     2.tar     3. 환경 변수    4. 설치 검증        $>source /etc/profile         $>./storm version     5. 설치 파일 을 다른 노드 에 나 누 어 줍 니 다.           6. 설정        [storm/conf/storm.yaml]         storm.local.dir: "/soft/storm"         storm.zookeeper.servers:             - "datanode2"             - "datanode3"
        storm.zookeeper.port: 2181
        ### nimbus.* configs are for the master         nimbus.seeds : ["datanode1"]
        ### ui.* configs are for the master         ui.host: 0.0.0.0         ui.port: 8080
        supervisor.slots.ports:             - 6700             - 6701             - 6702             - 6703     7. datanode 2 ~ datanode 4 에 배포
    8. 프로 세 스 시작        a) datanode 1 nimbus 프로 세 스 시작            $>storm nimbus &
        b) datanode 2 시작 ~ datanode 4 관리자 프로 세 스            $>storm supervisor &           c) datanode 1 ui 프로 세 스 시작            $>storm ui &          9. 웹 ui 로 보기        http://datanode1:8080/
 
프로 그래 밍 구현 CallLog 로그 통계:
pom.xml       
 
        
            4.0.0

            com.jr
            StormDemo
            1.0-SNAPSHOT

            
                
                    org.apache.storm
                    storm-core
                    1.0.3
                
            
            
        


CallLogSpout :
		import org.apache.storm.spout.SpoutOutputCollector;
		import org.apache.storm.task.TopologyContext;
		import org.apache.storm.topology.IRichSpout;
		import org.apache.storm.topology.OutputFieldsDeclarer;
		import org.apache.storm.tuple.Fields;
		import org.apache.storm.tuple.Values;

		import java.util.ArrayList;
		import java.util.List;
		import java.util.Map;
		import java.util.Random;

		/**
		 * Spout ,       
		 */
		public class CallLogSpout implements IRichSpout{

			//Spout     
			private SpoutOutputCollector collector;

			//    
			private boolean completed = false;

			//   
			private TopologyContext context;

			//     
			private Random randomGenerator = new Random();

			//
			private Integer idx = 0;

			public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
				this.context = context;
				this.collector = collector;
			}

			public void close() {
			}

			public void activate() {
			}

			public void deactivate() {

			}

			/**
			 *      
			 */
			public void nextTuple() {
				if (this.idx <= 1000) {
					List mobileNumbers = new ArrayList();
					mobileNumbers.add("1234123401");
					mobileNumbers.add("1234123402");
					mobileNumbers.add("1234123403");
					mobileNumbers.add("1234123404");

					Integer localIdx = 0;
					while (localIdx++ < 100 && this.idx++ < 1000) {
						//    
						String caller = mobileNumbers.get(randomGenerator.nextInt(4));
						//    
						String callee = mobileNumbers.get(randomGenerator.nextInt(4));
						while (caller == callee) {
							//      
							callee = mobileNumbers.get(randomGenerator.nextInt(4));
						}
						//      
						Integer duration = randomGenerator.nextInt(60);

						//    
						this.collector.emit(new Values(caller, callee, duration));
					}
				}
			}

			public void ack(Object msgId) {

			}

			public void fail(Object msgId) {

			}

			/**
			 *          
			 */
			public void declareOutputFields(OutputFieldsDeclarer declarer) {
				declarer.declare(new Fields("from", "to", "duration"));
			}

			public Map getComponentConfiguration() {
				return null;
			}
		}

CreatorBolt:

		import org.apache.storm.task.OutputCollector;
		import org.apache.storm.task.TopologyContext;
		import org.apache.storm.topology.IRichBolt;
		import org.apache.storm.topology.OutputFieldsDeclarer;
		import org.apache.storm.tuple.Fields;
		import org.apache.storm.tuple.Tuple;
		import org.apache.storm.tuple.Values;

		import java.util.Map;

		/**
		 *   CallLog   Bolt
		 */
		public class CallLogCreatorBolt implements IRichBolt {
			//
			private OutputCollector collector;

			public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
				this.collector = collector ;
			}

			public void execute(Tuple tuple) {
				//      
				String from = tuple.getString(0);
				String to = tuple.getString(1);
				Integer duration = tuple.getInteger(2);
				//    tuple
				collector.emit(new Values(from + " - " + to, duration));
			}

			public void cleanup() {

			}

			/**
			 *          
			 */
			public void declareOutputFields(OutputFieldsDeclarer declarer) {
				declarer.declare(new Fields("call", "duration"));
			}

			public Map getComponentConfiguration() {
				return null;
			}
		}

CounterBolt 만 들 기
package com.it18zhang.stormdemo;

		import org.apache.storm.task.IBolt;
		import org.apache.storm.task.OutputCollector;
		import org.apache.storm.task.TopologyContext;
		import org.apache.storm.topology.IRichBolt;
		import org.apache.storm.topology.OutputFieldsDeclarer;
		import org.apache.storm.tuple.Fields;
		import org.apache.storm.tuple.Tuple;

		import java.util.HashMap;
		import java.util.Map;

		/**
		 *        Bolt
		 */
		public class CallLogCounterBolt implements IRichBolt{

			Map counterMap;
			private OutputCollector collector;

			public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
				this.counterMap = new HashMap();
				this.collector = collector;
			}

			public void execute(Tuple tuple) {
				String call = tuple.getString(0);
				Integer duration = tuple.getInteger(1);

				if (!counterMap.containsKey(call)) {
					counterMap.put(call, 1);
				} else {
					Integer c = counterMap.get(call) + 1;
					counterMap.put(call, c);
				}
				collector.ack(tuple);
			}

			public void cleanup() {
				for (Map.Entry entry : counterMap.entrySet()) {
					System.out.println(entry.getKey() + " : " + entry.getValue());
				}
			}

			public void declareOutputFields(OutputFieldsDeclarer declarer) {
				declarer.declare(new Fields("call"));
			}

			public Map getComponentConfiguration() {
				return null;
			}
		}

4. App 호출 클래스 구현
package com.it18zhang.stormdemo;

		import org.apache.storm.Config;
		import org.apache.storm.LocalCluster;
		import org.apache.storm.topology.TopologyBuilder;
		import org.apache.storm.tuple.Fields;

		/**
		 * App
		 */
		public class App {
			public static void main(String[] args) throws InterruptedException {
				TopologyBuilder builder = new TopologyBuilder();
				//  Spout
				builder.setSpout("spout", new CallLogSpout());
				//  creator-Bolt
				builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
				//  counter-Bolt
				builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));

				Config conf = new Config();
				conf.setDebug(true);

				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
				Thread.sleep(10000);

				//    
				cluster.shutdown();
			}
		}

IDEA 에서 실행 하면 프로그램 실행 결 과 를 볼 수 있 습 니 다.
jar 패키지 로 만들어 서 클 러 스 터 에 넣 고 명령 을 실행 할 수도 있 습 니 다.
우선 제출 방식 을 수정 하 겠 습 니 다.            [App.java]              public static void main(String[] args) throws Exception {                     TopologyBuilder builder = new TopologyBuilder();                     //Spout 설정                    builder.setSpout("spout", new CallLogSpout());                     //크 리 에이 터 볼트 설정                    builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");                     //카운터 볼트 설정                    builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
                    Config conf = new Config();                     conf.setDebug(true);
                    /**                      * 로 컬 모드 storm                     */             //        LocalCluster cluster = new LocalCluster();             //        cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());             //        Thread.sleep(10000);                     StormSubmitter.submitTopology("mytop", conf, builder.createTopology());                 }         b) jar 가방 가 져 오기.            maven projects - > package 오른쪽 단 추 를 누 르 면 run stormdemo 를 누 르 고 프로젝트 의 target 아래 에 jar 패키지 가 생 성 되 어 jar 를 서버 에 넣 습 니 다.                 c) centos 에서 top 실행            $>storm jar xxx.jar com.jr.stormdemo.App

좋은 웹페이지 즐겨찾기