storm 재 정립 실전

더 읽 기
다시 정 하 다
       우리 의 tuple 이 다음 처리 층 으로 어떻게 route 되 는 지 다시 정의 합 니 다. 물론 서로 다른 층 간 에 서로 다른 병행 도가 있 을 수 있 습 니 다.storm 은 다음 과 같은 리 셋 동작 을 제공 합 니 다.    shuffle: 무 작위 할당 알고리즘 을 통 해 tuple 을 각 파 티 션 으로 균형 시 킵 니 다.    broadcast: 모든 tuple 은 모든 파 티 션 에 방송 되 는데 이런 방식 은 drcp 에서 매우 유용 합 니 다. 예 를 들 어 각 파 티 션 에서 state Query 를 하 는 것 입 니 다.    partition By: 지정 한 필드 목록 에 따라 구분 합 니 다. 구체 적 인 방법 은 지정 한 필드 목록 의 hash 값 으로 파 티 션 개 수 를 모드 로 연산 하여 같은 필드 목록 의 데이터 가 같은 파 티 션 으로 구분 되도록 하 는 것 입 니 다.    global: 모든 tuple 은 하나의 파 티 션 으로 보 내 집 니 다. 이 파 티 션 은 전체 Stream 을 처리 합 니 다.    batchGlobal: 하나의 Batch 에 있 는 모든 tuple 은 같은 파 티 션 으로 보 내 집 니 다. 다른 Batch 는 다른 파 티 션 으로 갑 니 다.    Partition: 사용자 정의 파 티 션 함 수 를 통 해 파 티 션 을 진행 합 니 다. 이 사용자 정의 함 수 는 backtype. storm. grouping. CustomStreamGrouping 을 실현 합 니 다.
실전
   Main:
	
pubzlic static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException, AuthorizationException {
		FixedBatchSpout spout = new FixedBatchSpout(
				new Fields("actor", "text"), 2,
				new Values("dave", "dave text"), new Values("dave",
						"dave text2"), new Values("dave", "dave text3"),
				new Values("dave", "dave text4"), new Values(
						"tanjie is a very good man", "very very good man"));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.parallelismHint(5)
				.partitionBy(new Fields("actor"))
//				.shuffle()
//			        .batchGlobal()
				.each(new Fields("actor", "text"),
						new PerActorTweetsFilter("dave")).parallelismHint(5)
				.each(new Fields("actor", "text"), new PrintFilter());
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_aggregate_partitionBy", config,
				topology.build());
	}

public class PerActorTweetsFilter extends BaseFilter {

	private static final long serialVersionUID = 1L;
	private int partitionIndex;
	private String actor;

	public PerActorTweetsFilter(String actor) {
		this.actor = actor;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		this.partitionIndex = context.getPartitionIndex();
	}

	@Override
	public boolean isKeep(TridentTuple tuple) {
		boolean filter = tuple.getString(0).equals(actor);
		if (filter) {
			System.out.println("I am partition [" + partitionIndex
					+ "] and I have kept a tweet by: " + actor);
		}
		return filter;
	}
}

   테스트:
 
  1. paritionBy: 같은 필드 hash 후에 같은 구역 으로 갑 니 다. 저 는 actor 에 따라 구역 을 나 누 었 습 니 다. hash 후에 같은 구역 에 도 착 했 을 것 입 니 다. 제 가 5 개의 구역 을 지정 하 더 라 도.
2016-11-14 17:06:50.806 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.808 STDIO [INFO] first value: dave
2016-11-14 17:06:50.814 STDIO [INFO] seconde value: dave text
2016-11-14 17:06:50.819 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.826 STDIO [INFO] first value: dave
2016-11-14 17:06:50.832 STDIO [INFO] seconde value: dave text2
2016-11-14 17:06:50.992 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.993 STDIO [INFO] first value: dave
2016-11-14 17:06:50.998 STDIO [INFO] seconde value: dave text3
2016-11-14 17:06:51.001 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:51.004 STDIO [INFO] first value: dave
2016-11-14 17:06:51.007 STDIO [INFO] seconde value: dave text4

    2. shuffle 로 변경 하면 무 작위 로 특정한 구역 에 분 배 됩 니 다.
2016-11-14 17:18:16.019 STDIO [INFO] I am partition [0] and I have kept a tweet by: dave
2016-11-14 17:18:16.027 STDIO [INFO] I am partition [4] and I have kept a tweet by: dave
2016-11-14 17:18:16.028 STDIO [INFO] first value: dave
2016-11-14 17:18:16.030 STDIO [INFO] seconde value: dave text
2016-11-14 17:18:16.037 STDIO [INFO] first value: dave
2016-11-14 17:18:16.037 STDIO [INFO] seconde value: dave text2
2016-11-14 17:18:16.101 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:18:16.102 STDIO [INFO] first value: dave
2016-11-14 17:18:16.102 STDIO [INFO] seconde value: dave text3
2016-11-14 17:18:16.103 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:18:16.105 STDIO [INFO] first value: dave
2016-11-14 17:18:16.107 STDIO [INFO] seconde value: dave text4

    3. batchGlobal 로 변경
2016-11-14 17:23:30.333 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:23:30.341 STDIO [INFO] first value: dave
2016-11-14 17:23:30.342 STDIO [INFO] seconde value: dave text
2016-11-14 17:23:30.343 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:23:30.343 STDIO [INFO] first value: dave
2016-11-14 17:23:30.344 STDIO [INFO] seconde value: dave text2
2016-11-14 17:24:15.683 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave
2016-11-14 17:24:15.684 STDIO [INFO] first value: dave
2016-11-14 17:24:15.685 STDIO [INFO] seconde value: dave text3
2016-11-14 17:24:15.685 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave
2016-11-14 17:24:15.686 STDIO [INFO] first value: dave
2016-11-14 17:24:15.686 STDIO [INFO] seconde value: dave text4

 
 
 

좋은 웹페이지 즐겨찾기