Apache Storm 의 설치, 설정 및 입문 기초 (3): 간단 한 topology

10363 단어 빅 데이터
$HOME / storm 디 렉 터 리 에 examples 디 렉 터 리 가 있 습 니 다. 이 디 렉 터 리 는 많은 프 리 젠 테 이 션 examples 가 있 습 니 다. 특히 storm - starter 디 렉 터 리 에서 이 버 전 은 다른 버 전 을 호 환 합 니 다. 1.1.0 은 많은 내용 을 추 가 했 고 가방 이름 도 org. apache. storm 으로 바 뀌 었 습 니 다.
       다음 사례 보기: examples / storm - starter / src / jvm / org / apache / storm / starter / ExclamationTopology. 자바
      다음은 전체 소스 코드 입 니 다.
* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
   ExclamationTopology  정의:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
     토 폴로 지 를 정의 하 다
     두 번 째 말: Spout 설정.Spout 단어 생 성,
     세 번 째 말: Bolt 설정.단어 가 생기 는 기초 위 에!!!
    네 번 째 말: Bolt 설정.위 에 계속!!
    하면, 만약, 만약... spout 생 성 tuples ["bob"] 와 ["john"], 첫 번 째 Bolt 생 성 ["bob!!!!"] 과 ["john!!!!!!"], 두 번 째 bolt 생 성 ["bob!!!!!!!!"] 과 ["john!!!!!!!!!!!!!!!"]
노드 정의
      setSpout, setBolt, 그 중에서 id 는 위의 예 에서 다음 과 같이 정의 되 었 습 니 다. words,  exclaim1 , exclaim2
이원 의 인터페이스
        org.apache.storm.topology.TopologyBuilder.setSpout(String, IRichSpout) :SpoutDeclarer         org.apache.storm.topology.TopologyBuilder.setSpout(String, IRichSpout, Number):SpoutDeclarer
       org.apache.storm.topology.TopologyBuilder.setBolt(String, IRichBolt):BoltDeclarer        org.apache.storm.topology.TopologyBuilder.setBolt(String, IRichBolt, Number):BoltDeclarer
        spout 의 처리 인 터 페 이 스 는 IRichSpout 인터페이스 입 니 다. bolt 의 처리 인 터 페 이 스 는 IRichBolt 인터페이스 입 니 다.마지막 으로 지정 한 10, 3, 2 는 모두 병행 도로 선택 할 수 있 습 니 다.
3. InputDeclarer 대상
    setBolt 는 input Declare 대상 을 되 돌려 줍 니 다. 이 대상 은 Bolt 에 입력 할 수 있 습 니 다.
    예 를 들 어 ID exclaim 1 은 무 작위 그룹 (shuffleGrouping) 으로 구성 요소 ID words 가 만 든 모든 원본 그룹 을 읽 고 싶다 고 주장 합 니 다.
   ID exclaim 2 는 구성 요소 ID exclaim 1 에서 생 성 된 모든 원 그룹 을 무 작위 로 그룹 으로 나 누 어 설명 합 니 다.
   무 작위 그룹 (shufflegrouping) 은 입력 작업 부터 bolt 작업 까지 원 그룹 이 무 작위 로 분 포 될 수 있다 는 뜻 입 니 다.무 작위 그룹 (Shuffle Grouping) 은 가장 자주 사용 하 는 스 트림 그룹 방식 으로 볼트 에 원 조 를 무 작위 로 나 누 어 주 는 작업 입 니 다. 이렇게 하면 모든 작업 이 같은 수량의 원 조 를 얻 을 수 있 습 니 다.각 ID 에 그룹 을 나 눌 수 있 는 방법 이 많 고 나중에 계속 설명 할 것 입 니 다.
     따라서 구성 요소 ID exclaim 2 가 구성 요소 ID words 와 exclaim 1 에서 발생 하 는 모든 원 그룹 을 읽 을 수 있다 면 다음 과 같이 간단하게 정의 해 야 합 니 다.
   
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

     이것 은 실현 하기에 매우 직접적 이 고 간편 하 다.
4. ExclamationBolt 실현
       ExclamationBolt 는 입력 한 원 그룹 뒤에 세 개의 느낌표 "!!!" 를 추가 하여 다음 과 같이 완전 하 게 실현 합 니 다.
public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }
    ExclamationBolt 는 기본 베이스 리 치 볼트 를 이 어 받 았 습 니 다.
    prepare 방법 은 OutputCollector 류 로 bolt 를 제공 하고 OutputCollector 류 는 이 Bolt 에서 원 조 를 만 드 는 데 사 용 됩 니 다. 예 를 들 어 위 와 같 습 니 다.
 _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
 _collector.ack(tuple); 

   execute 방법 으로new Value 는 새로운 값 을 만 드 는 것 이다.ack () 방법 은 Storm 의 신뢰성 의 상징, 즉 라벨 을 붙 여 잃 어 버 리 지 않 는 다 는 것 이다.
     
declare Output Fields 는 'word' 라 는 필드 의 tuple 을 정의 합 니 다.
5. 로 컬 모드 에서 ExamationTopogy 실행
        Storm 은 당연히 두 가지 운행 모델 이 있 는데 하 나 는 로 컬 운행 모델 이 고 하 나 는 분포 식 모델 이다.
       
       
       로 컬 모드 에서 storm 은 하나의 프로 세 스 안의 스 레 드 로 모든 spout 과 bolt 를 모 의 합 니 다. 로 컬 모드 는 개발 과 테스트 에 유용 합 니 다.
           분포 식 모드 에서 storm 은 한 무더기 의 기계 로 구성 되 어 있다.마스터 에 게 토폴로지 를 제출 할 때, 동시에 토폴로지 에 필요 한 코드 를 제출 합 니 다.master 는 코드 를 나 눠 주 고 topolgoy 에 작업 프로 세 스 를 배정 하 는 일 을 맡 습 니 다.작업 프로 세 스 가 끊 어 지면 master 노드 는 다른 노드 로 재배 치 된다 고 생각 합 니 다.
      로 컬 실행 모드 의 코드:
      
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

          
우선, 이 코드 정 의 는 LocalCluster 대상 을 정의 함으로써 프로 세 스 내의 집단 을 정의 합 니 다.토폴로지 를 이 가상 클 러 스 터 에 제출 하 는 것 과 토폴로지 를 분포 식 클 러 스 터 에 제출 하 는 것 은 같다.submitTopology 방법 을 호출 하여 토폴로지 를 제출 합 니 다. 세 가지 인 자 를 받 습 니 다. 실행 할 토폴로지 의 이름, 설정 대상 및 실행 할 토폴로지 자체 입 니 다.
토폴로지 의 이름 (여기 가 'test') 은 하나의 토폴로지 를 구별 하 는 유일한 이름 입 니 다. 그러면 이 이름 으로 이 토폴로지 를 죽 일 수 있 습 니 다.앞에서 말 했 듯 이, 너 는 반드시 명시 적 으로 토폴로지 하 나 를 죽여 야 한다. 그렇지 않 으 면 그것 은 계속 운행 할 것 이다.
Conf 대상 은 많은 것 을 설정 할 수 있 습 니 다. 다음 두 가 지 는 가장 흔 한 것 입 니 다.
TOPOLOGY_WORKERS (setNumWorkers) 는 이 토폴로지 를 수행 하기 위해 몇 개의 작업 프로 세 스 를 그룹 으로 할당 하 기 를 원 하 는 지 정의 합 니 다.topology 의 모든 구성 요 소 는 스 레 드 가 필요 합 니 다.각 구성 요소 가 몇 개의 스 레 드 를 사용 하 는 지 는 setBolt 와 setSpout 을 통 해 지정 합 니 다.이 스 레 드 들 은 모두 작업 프로 세 스 에서 실 행 됩 니 다. 모든 작업 프로 세 스 는 일부 노드 의 작업 스 레 드 를 포함 합 니 다.예 를 들 어 300 개의 스 레 드, 60 개의 프로 세 스 를 지정 하면 모든 작업 프로 세 스 에서 6 개의 스 레 드 를 실행 해 야 합 니 다. 이 6 개의 스 레 드 는 서로 다른 구성 요소 (Spout, Bolt) 에 속 할 수 있 습 니 다.모든 구성 요소 의 병행 도와 이 스 레 드 가 있 는 프로 세 스 수 를 조정 하여 토폴로지 의 성능 을 조정 할 수 있 습 니 다
TOPOLOGY_DEBUG (setDebug) 는 true 로 설정 되면 storm 은 모든 구성 요소 가 발사 하 는 모든 메 시 지 를 기록 합 니 다.이것 은 로 컬 환경 에서 토폴로지 를 디 버 깅 하 는 데 매우 유용 하지만, 온라인 상에 서 이렇게 하면 성능 에 영향 을 줄 수 있다
      sleep 은 시간 을 기다 리 는 거 야.
6. 클 러 스 터 모드 운행
       
if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    이것 은 비교적 간단 합 니 다. workers 수량 을 3 개 로 정의 하고 지정 한 인 자 를 입력 해 야 합 니 다. 바로 Topology 의 이름 입 니 다.
        기본적으로 상기 내용 을 통 해 이 사례 의 대체적인 구성 과 운영 구 조 를 알 수 있다.

좋은 웹페이지 즐겨찾기