Strom 데이터 흐름 그룹 분석

11598 단어 jstormrocketmqtair
본문은 <> 한 권의 책 1.5절의 독서 노트로 쓸 수 있다
데이터 흐름 그룹은 데이터 흐름 중의tuple을 topology에서 다른bolt의task에 어떻게 나누어 주는지 정의합니다.
Shuf fl e grouping (무작위 그룹): 이런 방식은tuple을bolt의 각task에 무작위로 나누어 주고,bolt 실례마다 같은 수량의tuple을 받는다.
필드별로 그룹화: 지정된 필드의 값에 따라 그룹화됩니다.예를 들어 하나의 데이터 흐름은'word'필드에 따라 그룹을 나누고 같은'word'필드 값을 가진tuple은 같은bolt의task로 연결된다.
All grouping(전체 복제 그룹): 모든tuple 복제를 모든bolt task에 배포합니다.모든 구독 데이터 흐름의task는tuple의 복사본을 수신합니다.
Globle grouping (전역 그룹): 이 그룹 방식은 모든tuples를 유일한task로 연결합니다.Storm에서 데이터를 받는 가장 작은task ID를 기준으로 task를 선택합니다.전역적으로 그룹을 나누는 방식을 사용할 때bolt의task 병발도를 설정하는 것은 무의미하다. 모든tuple이 같은task로 전송되기 때문이다.전역 그룹을 사용할 때 모든 tuple이 하나의 JVM 실례에 전송되기 때문에 Storm 그룹의 어떤 JVM이나 서버에 성능 병목이나 붕괴를 일으킬 수 있음을 주의해야 한다.
None grouping (그룹을 나누지 않음): 기능상 무작위 그룹과 같으며, 미래를 위한 것입니다.
Direct grouping (지향형 그룹): 데이터 원본은emitDirect () 방법을 사용해서tuple이 어느 Storm 구성 요소로 수신되어야 하는지 판단합니다.지향형이라는 데이터 흐름에서만 사용할 수 있습니다.
Local or shuf e grouping (로컬 또는 무작위 그룹): 무작위 그룹과 유사하지만, tuple을 같은 워크맨에 있는bolt task (워크맨에 데이터를 받는bolt task) 에 나누어 줍니다.다른 경우에는 무작위로 조를 나누는 방식을 채택한다.topology의 병발도에 따라 로컬 또는 무작위 그룹을 나누면 네트워크 전송을 줄이고 topology 성능을 향상시킬 수 있다.

무작위 그룹

가장 많이 쓰는 것이 바로 Shuf e grouping, Fields grouping, Direct grouping 등입니다.
이제 예를 살펴보겠습니다.
가장 흔히 볼 수 있는 단어를 세는 예입니다.
public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;
    private HashMap counts = null;


    public void prepare(Map config, TopologyContext context, 
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap();
    }


    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null){
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }


    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
이bolt를 추가할 때 필드에 따라 그룹을 나누는 것을 사용합니다. 아래와 같습니다.
 builder.setBolt(COUNT_BOLT_ID, countBolt,4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
만약에 저희가 그룹 모드를 바꾸면...
 builder.setBolt(COUNT_BOLT_ID, countBolt,4)
                .shuffleGrouping(SPLIT_BOLT_ID);
그러면 단어에 대한 통계가 적을 것이다.
왜?
여러분, 생각해 보세요. 만약에 제가 무작위로 팀을 나누면 the라는 단어가 3회, 앞의 2회는countbolta, 3회는countboltb로 분배되고, 3회는countboltb로 분배된다면, 뒤의 Reportbolt는 이tuple(countbolta에서) 먼저 받고, 그 다음에 이tuple(countboltb에서) 받았습니다. 마지막 출력은the:1입니다.
하면, 만약, 만약...
 builder.setBolt(COUNT_BOLT_ID, countBolt,4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
아까의 문제가 자연스럽게 생기지 않을 거예요. 왜, 다들 생각해 보세요.

직접 그룹

여기에서 나는storm으로 문장에 느낌표를 붙인 예를 인용했다. 코드는 마지막에 있다.
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
다음은 16-7-4 수정
사실 저의 다음 예는 좋지 않습니다.
직접 그룹을 나누는 것은 주로bolt의 어떤task에 메시지를 보내는 것을 보증한다
다음 예의 실제 효과는 생각하세요. 메시지는 볼타에게, 메시지는 볼타에게.
그렇다면 사실 더 편리한 방법은
보낸 날짜:
public void execute(Tuple tuple, BasicOutputCollector collector) {
     tpsCounter.count();

     Long tupleId = tuple.getLong(0);
     Object obj = tuple.getValue(1);

     if (obj instanceof TradeCustomer) {

         TradeCustomer tradeCustomer = (TradeCustomer)obj;

         Pair trade = tradeCustomer.getTrade();
         Pair customer = tradeCustomer.getCustomer();

            collector.emit(SequenceTopologyDef.TRADE_STREAM_ID, 
                    new Values(tupleId, trade));

            collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID, 
                    new Values(tupleId, customer));
     }else if (obj != null){
         LOG.info("Unknow type " + obj.getClass().getName());
     }else {
         LOG.info("Nullpointer " );
     }

 }

커밋 시:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
                        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

                builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                        SequenceTopologyDef.SPLIT_BOLT_NAME,  // ---      
                        SequenceTopologyDef.TRADE_STREAM_ID); // ---       stream  tuple

                builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // ---      
                                SequenceTopologyDef.CUSTOMER_STREAM_ID);      // ---       stream  tuple
출력 형식 정의
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
  declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
 }
마지막으로 받을 때 판단을 해야 돼요.
if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
            customer = pair;
            customerTuple = input;

            tradeTuple = tradeMap.get(tupleId);
            if (tradeTuple == null) {
                customerMap.put(tupleId, input);
                return;
            }

            trade = (Pair) tradeTuple.getValue(1);

        }

참고 자료
데이터의 분류
이상 16-7-4 수정
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
언제 시작 해요?
실행 결과는 다음과 같습니다.
mystorm.PrintBolt@67178f5d   String recieved: edi:I'm happy!
mystorm.PrintBolt@67178f5d   String recieved: marry:I'm angry!
mystorm.PrintBolt@393ddf54   String recieved: ted:I'm excited!
mystorm.PrintBolt@393ddf54   String recieved: john:I'm sad!
mystorm.PrintBolt@5f97cfcb   String recieved: marry:I'm angry!
다른task는 평균적으로tuple을 받았어요.
그리고 어떤 문장을 지정해서 어떤task만 받아들이게 하고 싶은데 어떡하지?
먼저 ExclaimBasicBolt를 보도록 하겠습니다.
public class ExclaimBasicBolt extends BaseBasicBolt {


	/**
	 * 
	 */
	private static final long serialVersionUID = -6239845315934660303L;
	private List list;
	private List list2;


	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		//String sentence = tuple.getString(0);
		String sentence = (String) tuple.getValue(0);
		String out = sentence + "!";
		if (out.startsWith("e")) {
			collector.emitDirect(list.get(0),new Values(out));
		}else {
			collector.emitDirect(list2.get(0),new Values(out));
		}
		
	}


	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(true,new Fields("excl_sentence"));
	}
	
	
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    	list =context.getComponentTasks("print");
    	list2=context.getComponentTasks("print2");
    }
}
topology를 구축할 때
directGrouping 사용
		builder.setSpout("spout", new RandomSpout());
		builder.setBolt("exclaim", new ExclaimBasicBolt(),3).shuffleGrouping("spout");
		builder.setBolt("print", new PrintBolt(),3).directGrouping("exclaim");
		builder.setBolt("print2", new PrintBolt2(),3).directGrouping("exclaim");
PrintBolt2는 PrintBolt와 유사
단지 인쇄할 때 시스템만 출력합니다.err.println(this+" i am two   String recieved: "+ rec);
OK. 이제 운행할 때 저희가 볼 수 있어요.
mystorm.PrintBolt2@238ac8bf   String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf   String recieved: john:I'm sad!
mystorm.PrintBolt2@238ac8bf   String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf   String recieved: ted:I'm excited!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt@611b7a20  i am two   String recieved: edi:I'm happy!
mystorm.PrintBolt2@238ac8bf   String recieved: marry:I'm angry!
mystorm.PrintBolt2@238ac8bf   String recieved: ted:I'm excited!
mystorm.PrintBolt2@238ac8bf   String recieved: marry:I'm angry!
모든 e로 시작하는 문장이 Print2라는 볼트의 어떤task 안으로 들어갔다.

이 절의 전체 코드는

package mystorm;

public class ExclaimBasicTopo {
	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout("spout", new RandomSpout());
		builder.setBolt("exclaim", new ExclaimBasicBolt(),3).shuffleGrouping("spout");
		builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");

		Config conf = new Config();
		conf.setDebug(false);

		if (args != null && args.length > 0) {
			conf.setNumWorkers(3);
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		} else {
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("test", conf, builder.createTopology());
		}
	}
}


package mystorm;
 
public class RandomSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private Random rand;
	private int index;
	private static String[] sentences = new String[] {
		"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
	
	@Override
	public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
		this.collector = collector;
		this.rand = new Random();
	}

	@Override
	public void nextTuple() {
		if (index<10*sentences.length) {
			String toSay = sentences[rand.nextInt(sentences.length)];
			this.collector.emit(new Values(toSay));
			index++;
		}else {
			try {
				Thread.sleep(1000);
				System.out.println("     ");
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

}

package mystorm;

public class ExclaimBasicBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		//String sentence = tuple.getString(0);
		String sentence = (String) tuple.getValue(0);
		String out = sentence + "!";
		collector.emit(new Values(out));		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("excl_sentence"));
	}

}


package mystorm;

public class PrintBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String rec = tuple.getString(0);
		System.err.println(this+"   String recieved: " + rec);
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// do nothing
	}

}

좋은 웹페이지 즐겨찾기