34-storm 학습-storm 튜토리얼:WordCount 프로그램을 맨손으로 두드리기
10106 단어 #Storm 실시간 빅데이터 처리
33-storm 학습-역사상 가장 통속적이고 알기 쉬운 Storm 강좌: 대백화 소개 Storm
이제 우리는 코드를 쓰고storm의 프로그램이 어떻게 개발되었는지 체험해 봅시다. 코드를 이해한 후에 전에 설명한 기본 원리를 돌아보면 알 수 있습니다.
단어 카운터를 만들다.
너는storm이 끊임없이 문장을 받아들인 후에 문장 중의 단어의 출현 횟수를 실시간으로 통계해야 한다고 생각할 수 있다
(1) 건설 공사 환경
4.0.0
storm-wordcount
jar
storm-wordcount
UTF-8
org.apache.storm
storm-core
1.1.0
commons-collections
commons-collections
3.2.1
com.google.guava
guava
src/main/java
test/main/java
org.apache.maven.plugins
maven-shade-plugin
true
*:*
META-INF/*.SF
META-INF/*.sf
META-INF/*.DSA
META-INF/*.dsa
META-INF/*.RSA
META-INF/*.rsa
META-INF/*.EC
META-INF/*.ec
META-INF/MSFTSIG.SF
META-INF/MSFTSIG.RSA
package
shade
org.codehaus.mojo
exec-maven-plugin
1.2.1
exec
java
true
false
compile
(2) 코드 작성
/**
*
*/
public class WordCountTopology {
/**
* Spout - BaseRichSpout, , kafka
*/
public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
/**
* open Spout
* 、 、 httpclient
* open , SpoutOutputCollector, 。
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
//
_rand = new Random();
}
/**
* Spout , task , worker excuter task
* task nextTuple()
* , Stream
*/
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
// sentences.length
final String sentence = sentences[_rand.nextInt(sentences.length)];
//
_collector.emit(new Values(sentence));
}
protected String sentence(String input) {
return input;
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
/**
* declareOutputFields : tuple field
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
/**
* bolt : spout bolt,
* bolt worker excuter task
*
*/
public static class SplitSentence implements IRichBolt {
private static final long serialVersionUID = 6604009953652729483L;
private OutputCollector collector;
/**
* bolt , , prepare ,
* OutputCollector, Bolt tuple
*/
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* execute
* , , executor
*
*/
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words) {
collector.emit(new Values(word));
}
}
/**
* tuple, field
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class WordCount extends BaseRichBolt {
private static final long serialVersionUID = 7208077706057284643L;
private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);
private OutputCollector collector;
private Map wordCounts = new HashMap();
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = wordCounts.get(word);
if(count == null) {
count = 0L;
}
count++;
wordCounts.put(word, count);
LOGGER.info("【 】" + word + " " + count);
collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) {
//1、
//Storm , JAVA , TopologyBuilder
// main , spout bolts ,
TopologyBuilder builder = new TopologyBuilder();
// , spout
// , spout
// , spout executor
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentence(), 5)
.setNumTasks(10)
.shuffleGrouping("RandomSentence");
// , , , SplitSentence ,
task
// ,
// ,hello, task1 3 hello,task2 2 hello
// 5 hello, task, 5 hello。
builder.setBolt("WordCount", new WordCount(), 10)
.setNumTasks(20)
.fieldsGrouping("SplitSentence", new Fields("word"));
//2、
// ? ?
Config config = new Config();
// , storm
if(args != null && args.length > 0) {
// topology
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
// eclipse
config.setMaxTaskParallelism(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, builder.createTopology());
Utils.sleep(60000);
cluster.shutdown();
}
}
}
로컬 부팅에서도 효과를 볼 수 있습니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Rails Turbolinks를 페이지 단위로 비활성화하는 방법원래 Turobolinks란? Turbolinks는 링크를 생성하는 요소인 a 요소의 클릭을 후크로 하고, 이동한 페이지를 Ajax에서 가져옵니다. 그 후, 취득 페이지의 데이터가 천이 전의 페이지와 동일한 것이 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.