Apache Storm 의 설치, 설정 및 입문 기초 (3): 간단 한 topology
10363 단어 빅 데이터
다음 사례 보기: examples / storm - starter / src / jvm / org / apache / storm / starter / ExclamationTopology. 자바
다음은 전체 소스 코드 입 니 다.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
ExclamationTopology
정의:TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
토 폴로 지 를 정의 하 다두 번 째 말: Spout 설정.Spout 단어 생 성,
세 번 째 말: Bolt 설정.단어 가 생기 는 기초 위 에!!!
네 번 째 말: Bolt 설정.위 에 계속!!
하면, 만약, 만약... spout 생 성 tuples ["bob"] 와 ["john"], 첫 번 째 Bolt 생 성 ["bob!!!!"] 과 ["john!!!!!!"], 두 번 째 bolt 생 성 ["bob!!!!!!!!"] 과 ["john!!!!!!!!!!!!!!!"]
노드 정의
setSpout, setBolt, 그 중에서 id 는 위의 예 에서 다음 과 같이 정의 되 었 습 니 다. words, exclaim1 , exclaim2
이원 의 인터페이스
org.apache.storm.topology.TopologyBuilder.setSpout(String, IRichSpout) :SpoutDeclarer org.apache.storm.topology.TopologyBuilder.setSpout(String, IRichSpout, Number):SpoutDeclarer
org.apache.storm.topology.TopologyBuilder.setBolt(String, IRichBolt):BoltDeclarer org.apache.storm.topology.TopologyBuilder.setBolt(String, IRichBolt, Number):BoltDeclarer
spout 의 처리 인 터 페 이 스 는 IRichSpout 인터페이스 입 니 다. bolt 의 처리 인 터 페 이 스 는 IRichBolt 인터페이스 입 니 다.마지막 으로 지정 한 10, 3, 2 는 모두 병행 도로 선택 할 수 있 습 니 다.
3. InputDeclarer 대상
setBolt 는 input Declare 대상 을 되 돌려 줍 니 다. 이 대상 은 Bolt 에 입력 할 수 있 습 니 다.
예 를 들 어 ID exclaim 1 은 무 작위 그룹 (shuffleGrouping) 으로 구성 요소 ID words 가 만 든 모든 원본 그룹 을 읽 고 싶다 고 주장 합 니 다.
ID exclaim 2 는 구성 요소 ID exclaim 1 에서 생 성 된 모든 원 그룹 을 무 작위 로 그룹 으로 나 누 어 설명 합 니 다.
무 작위 그룹 (shufflegrouping) 은 입력 작업 부터 bolt 작업 까지 원 그룹 이 무 작위 로 분 포 될 수 있다 는 뜻 입 니 다.무 작위 그룹 (Shuffle Grouping) 은 가장 자주 사용 하 는 스 트림 그룹 방식 으로 볼트 에 원 조 를 무 작위 로 나 누 어 주 는 작업 입 니 다. 이렇게 하면 모든 작업 이 같은 수량의 원 조 를 얻 을 수 있 습 니 다.각 ID 에 그룹 을 나 눌 수 있 는 방법 이 많 고 나중에 계속 설명 할 것 입 니 다.
따라서 구성 요소 ID exclaim 2 가 구성 요소 ID words 와 exclaim 1 에서 발생 하 는 모든 원 그룹 을 읽 을 수 있다 면 다음 과 같이 간단하게 정의 해 야 합 니 다.
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
이것 은 실현 하기에 매우 직접적 이 고 간편 하 다.
4. ExclamationBolt 실현
ExclamationBolt 는 입력 한 원 그룹 뒤에 세 개의 느낌표 "!!!" 를 추가 하여 다음 과 같이 완전 하 게 실현 합 니 다.
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
ExclamationBolt 는 기본 베이스 리 치 볼트 를 이 어 받 았 습 니 다.prepare 방법 은 OutputCollector 류 로 bolt 를 제공 하고 OutputCollector 류 는 이 Bolt 에서 원 조 를 만 드 는 데 사 용 됩 니 다. 예 를 들 어 위 와 같 습 니 다.
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
execute 방법 으로new Value 는 새로운 값 을 만 드 는 것 이다.ack () 방법 은 Storm 의 신뢰성 의 상징, 즉 라벨 을 붙 여 잃 어 버 리 지 않 는 다 는 것 이다.
declare Output Fields 는 'word' 라 는 필드 의 tuple 을 정의 합 니 다.
5. 로 컬 모드 에서 ExamationTopogy 실행
Storm 은 당연히 두 가지 운행 모델 이 있 는데 하 나 는 로 컬 운행 모델 이 고 하 나 는 분포 식 모델 이다.
로 컬 모드 에서 storm 은 하나의 프로 세 스 안의 스 레 드 로 모든 spout 과 bolt 를 모 의 합 니 다. 로 컬 모드 는 개발 과 테스트 에 유용 합 니 다.
분포 식 모드 에서 storm 은 한 무더기 의 기계 로 구성 되 어 있다.마스터 에 게 토폴로지 를 제출 할 때, 동시에 토폴로지 에 필요 한 코드 를 제출 합 니 다.master 는 코드 를 나 눠 주 고 topolgoy 에 작업 프로 세 스 를 배정 하 는 일 을 맡 습 니 다.작업 프로 세 스 가 끊 어 지면 master 노드 는 다른 노드 로 재배 치 된다 고 생각 합 니 다.
로 컬 실행 모드 의 코드:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
우선, 이 코드 정 의 는 LocalCluster 대상 을 정의 함으로써 프로 세 스 내의 집단 을 정의 합 니 다.토폴로지 를 이 가상 클 러 스 터 에 제출 하 는 것 과 토폴로지 를 분포 식 클 러 스 터 에 제출 하 는 것 은 같다.submitTopology 방법 을 호출 하여 토폴로지 를 제출 합 니 다. 세 가지 인 자 를 받 습 니 다. 실행 할 토폴로지 의 이름, 설정 대상 및 실행 할 토폴로지 자체 입 니 다.
토폴로지 의 이름 (여기 가 'test') 은 하나의 토폴로지 를 구별 하 는 유일한 이름 입 니 다. 그러면 이 이름 으로 이 토폴로지 를 죽 일 수 있 습 니 다.앞에서 말 했 듯 이, 너 는 반드시 명시 적 으로 토폴로지 하 나 를 죽여 야 한다. 그렇지 않 으 면 그것 은 계속 운행 할 것 이다.
Conf 대상 은 많은 것 을 설정 할 수 있 습 니 다. 다음 두 가 지 는 가장 흔 한 것 입 니 다.
TOPOLOGY_WORKERS (setNumWorkers) 는 이 토폴로지 를 수행 하기 위해 몇 개의 작업 프로 세 스 를 그룹 으로 할당 하 기 를 원 하 는 지 정의 합 니 다.topology 의 모든 구성 요 소 는 스 레 드 가 필요 합 니 다.각 구성 요소 가 몇 개의 스 레 드 를 사용 하 는 지 는 setBolt 와 setSpout 을 통 해 지정 합 니 다.이 스 레 드 들 은 모두 작업 프로 세 스 에서 실 행 됩 니 다. 모든 작업 프로 세 스 는 일부 노드 의 작업 스 레 드 를 포함 합 니 다.예 를 들 어 300 개의 스 레 드, 60 개의 프로 세 스 를 지정 하면 모든 작업 프로 세 스 에서 6 개의 스 레 드 를 실행 해 야 합 니 다. 이 6 개의 스 레 드 는 서로 다른 구성 요소 (Spout, Bolt) 에 속 할 수 있 습 니 다.모든 구성 요소 의 병행 도와 이 스 레 드 가 있 는 프로 세 스 수 를 조정 하여 토폴로지 의 성능 을 조정 할 수 있 습 니 다
TOPOLOGY_DEBUG (setDebug) 는 true 로 설정 되면 storm 은 모든 구성 요소 가 발사 하 는 모든 메 시 지 를 기록 합 니 다.이것 은 로 컬 환경 에서 토폴로지 를 디 버 깅 하 는 데 매우 유용 하지만, 온라인 상에 서 이렇게 하면 성능 에 영향 을 줄 수 있다
sleep 은 시간 을 기다 리 는 거 야.
6. 클 러 스 터 모드 운행
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
이것 은 비교적 간단 합 니 다. workers 수량 을 3 개 로 정의 하고 지정 한 인 자 를 입력 해 야 합 니 다. 바로 Topology 의 이름 입 니 다.기본적으로 상기 내용 을 통 해 이 사례 의 대체적인 구성 과 운영 구 조 를 알 수 있다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.