Strom 데이터 흐름 그룹 분석
데이터 흐름 그룹은 데이터 흐름 중의tuple을 topology에서 다른bolt의task에 어떻게 나누어 주는지 정의합니다.
Shuf fl e grouping (무작위 그룹): 이런 방식은tuple을bolt의 각task에 무작위로 나누어 주고,bolt 실례마다 같은 수량의tuple을 받는다.
필드별로 그룹화: 지정된 필드의 값에 따라 그룹화됩니다.예를 들어 하나의 데이터 흐름은'word'필드에 따라 그룹을 나누고 같은'word'필드 값을 가진tuple은 같은bolt의task로 연결된다.
All grouping(전체 복제 그룹): 모든tuple 복제를 모든bolt task에 배포합니다.모든 구독 데이터 흐름의task는tuple의 복사본을 수신합니다.
Globle grouping (전역 그룹): 이 그룹 방식은 모든tuples를 유일한task로 연결합니다.Storm에서 데이터를 받는 가장 작은task ID를 기준으로 task를 선택합니다.전역적으로 그룹을 나누는 방식을 사용할 때bolt의task 병발도를 설정하는 것은 무의미하다. 모든tuple이 같은task로 전송되기 때문이다.전역 그룹을 사용할 때 모든 tuple이 하나의 JVM 실례에 전송되기 때문에 Storm 그룹의 어떤 JVM이나 서버에 성능 병목이나 붕괴를 일으킬 수 있음을 주의해야 한다.
None grouping (그룹을 나누지 않음): 기능상 무작위 그룹과 같으며, 미래를 위한 것입니다.
Direct grouping (지향형 그룹): 데이터 원본은emitDirect () 방법을 사용해서tuple이 어느 Storm 구성 요소로 수신되어야 하는지 판단합니다.지향형이라는 데이터 흐름에서만 사용할 수 있습니다.
Local or shuf e grouping (로컬 또는 무작위 그룹): 무작위 그룹과 유사하지만, tuple을 같은 워크맨에 있는bolt task (워크맨에 데이터를 받는bolt task) 에 나누어 줍니다.다른 경우에는 무작위로 조를 나누는 방식을 채택한다.topology의 병발도에 따라 로컬 또는 무작위 그룹을 나누면 네트워크 전송을 줄이고 topology 성능을 향상시킬 수 있다.
무작위 그룹
가장 많이 쓰는 것이 바로 Shuf e grouping, Fields grouping, Direct grouping 등입니다.이제 예를 살펴보겠습니다.
가장 흔히 볼 수 있는 단어를 세는 예입니다.
public class WordCountBolt extends BaseRichBolt{
private OutputCollector collector;
private HashMap counts = null;
public void prepare(Map config, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counts = new HashMap();
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = this.counts.get(word);
if(count == null){
count = 0L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
이bolt를 추가할 때 필드에 따라 그룹을 나누는 것을 사용합니다. 아래와 같습니다. builder.setBolt(COUNT_BOLT_ID, countBolt,4)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
만약에 저희가 그룹 모드를 바꾸면... builder.setBolt(COUNT_BOLT_ID, countBolt,4)
.shuffleGrouping(SPLIT_BOLT_ID);
그러면 단어에 대한 통계가 적을 것이다.왜?
여러분, 생각해 보세요. 만약에 제가 무작위로 팀을 나누면 the라는 단어가 3회, 앞의 2회는countbolta, 3회는countboltb로 분배되고, 3회는countboltb로 분배된다면, 뒤의 Reportbolt는 이tuple(countbolta에서) 먼저 받고, 그 다음에 이tuple(countboltb에서) 받았습니다. 마지막 출력은the:1입니다.
하면, 만약, 만약...
builder.setBolt(COUNT_BOLT_ID, countBolt,4)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
아까의 문제가 자연스럽게 생기지 않을 거예요. 왜, 다들 생각해 보세요.직접 그룹
여기에서 나는storm으로 문장에 느낌표를 붙인 예를 인용했다. 코드는 마지막에 있다.////////////////////////////////////////////////////////////////////////////////////////////////////////////////
다음은 16-7-4 수정
사실 저의 다음 예는 좋지 않습니다.
직접 그룹을 나누는 것은 주로bolt의 어떤task에 메시지를 보내는 것을 보증한다
다음 예의 실제 효과는 생각하세요. 메시지는 볼타에게, 메시지는 볼타에게.
그렇다면 사실 더 편리한 방법은
보낸 날짜:
public void execute(Tuple tuple, BasicOutputCollector collector) {
tpsCounter.count();
Long tupleId = tuple.getLong(0);
Object obj = tuple.getValue(1);
if (obj instanceof TradeCustomer) {
TradeCustomer tradeCustomer = (TradeCustomer)obj;
Pair trade = tradeCustomer.getTrade();
Pair customer = tradeCustomer.getCustomer();
collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
new Values(tupleId, trade));
collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
new Values(tupleId, customer));
}else if (obj != null){
LOG.info("Unknow type " + obj.getClass().getName());
}else {
LOG.info("Nullpointer " );
}
}
커밋 시:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
SequenceTopologyDef.SPLIT_BOLT_NAME, // ---
SequenceTopologyDef.TRADE_STREAM_ID); // --- stream tuple
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
.shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // ---
SequenceTopologyDef.CUSTOMER_STREAM_ID); // --- stream tuple
출력 형식 정의public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
마지막으로 받을 때 판단을 해야 돼요.if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
customer = pair;
customerTuple = input;
tradeTuple = tradeMap.get(tupleId);
if (tradeTuple == null) {
customerMap.put(tupleId, input);
return;
}
trade = (Pair) tradeTuple.getValue(1);
}
참고 자료
데이터의 분류
이상 16-7-4 수정
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
언제 시작 해요?
실행 결과는 다음과 같습니다.
mystorm.PrintBolt@67178f5d String recieved: edi:I'm happy!
mystorm.PrintBolt@67178f5d String recieved: marry:I'm angry!
mystorm.PrintBolt@393ddf54 String recieved: ted:I'm excited!
mystorm.PrintBolt@393ddf54 String recieved: john:I'm sad!
mystorm.PrintBolt@5f97cfcb String recieved: marry:I'm angry!
다른task는 평균적으로tuple을 받았어요.그리고 어떤 문장을 지정해서 어떤task만 받아들이게 하고 싶은데 어떡하지?
먼저 ExclaimBasicBolt를 보도록 하겠습니다.
public class ExclaimBasicBolt extends BaseBasicBolt {
/**
*
*/
private static final long serialVersionUID = -6239845315934660303L;
private List list;
private List list2;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//String sentence = tuple.getString(0);
String sentence = (String) tuple.getValue(0);
String out = sentence + "!";
if (out.startsWith("e")) {
collector.emitDirect(list.get(0),new Values(out));
}else {
collector.emitDirect(list2.get(0),new Values(out));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(true,new Fields("excl_sentence"));
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
list =context.getComponentTasks("print");
list2=context.getComponentTasks("print2");
}
}
topology를 구축할 때directGrouping 사용
builder.setSpout("spout", new RandomSpout());
builder.setBolt("exclaim", new ExclaimBasicBolt(),3).shuffleGrouping("spout");
builder.setBolt("print", new PrintBolt(),3).directGrouping("exclaim");
builder.setBolt("print2", new PrintBolt2(),3).directGrouping("exclaim");
PrintBolt2는 PrintBolt와 유사단지 인쇄할 때 시스템만 출력합니다.err.println(this+" i am two String recieved: "+ rec);
OK. 이제 운행할 때 저희가 볼 수 있어요.
mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf String recieved: john:I'm sad!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20 i am two String recieved: edi:I'm happy!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf String recieved: marry:I'm angry!
모든 e로 시작하는 문장이 Print2라는 볼트의 어떤task 안으로 들어갔다.이 절의 전체 코드는
package mystorm;
public class ExclaimBasicTopo {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSpout());
builder.setBolt("exclaim", new ExclaimBasicBolt(),3).shuffleGrouping("spout");
builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");
Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
}
}
}
package mystorm;
public class RandomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private Random rand;
private int index;
private static String[] sentences = new String[] {
"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
@Override
public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
this.collector = collector;
this.rand = new Random();
}
@Override
public void nextTuple() {
if (index<10*sentences.length) {
String toSay = sentences[rand.nextInt(sentences.length)];
this.collector.emit(new Values(toSay));
index++;
}else {
try {
Thread.sleep(1000);
System.out.println(" ");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
package mystorm;
public class ExclaimBasicBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//String sentence = tuple.getString(0);
String sentence = (String) tuple.getValue(0);
String out = sentence + "!";
collector.emit(new Values(out));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("excl_sentence"));
}
}
package mystorm;
public class PrintBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String rec = tuple.getString(0);
System.err.println(this+" String recieved: " + rec);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// do nothing
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Docker 설치 RocketMQ주의:docker 환경이 없는 경우 본인의centos를 참고하여 docker1을 설치할 수 있습니다.어떤 rocketMq를 사용할 수 있는지 조회docker search rocketmq2.미러 추출docker pul...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.