트 위 터 Storm Stream Grouping 사용자 정의 그룹 작성 실현

사용자 정의 그룹 화 테스트
Storm 은 사용자 정의 그룹 을 지원 합 니 다. 이 글 은 Storm 이 사용자 정의 그룹 을 어떻게 만 드 는 지, 그리고 Storm 그룹 을 어떻게 묶 는 지 에 대한 이 해 를 탐구 하 는 것 입 니 다.
이것 은 내 가 쓴 사용자 정의 그룹 입 니 다. 항상 데 이 터 를 첫 번 째 Task 로 나 눕 니 다.
public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
        List<Integer> targetTasks) {
        this.tasks = targetTasks;
        log.info(tasks.toString());
    }   
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        log.info(values.toString());
        return Arrays.asList(tasks.get(0));
    }
}

위의 코드 를 통 해 알 수 있 듯 이 이 사용자 정의 그룹 은 데 이 터 를 첫 번 째 Task Arrays.asList(tasks.get(0)); 로 통합 합 니 다. 즉, 데이터 가 도착 하면 항상 첫 번 째 그룹 으로 배 송 됩 니 다.
테스트 코드:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//     ,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
        .customGrouping("words", new MyFirstStreamGrouping());

이전 테스트 용례 와 마찬가지 로 Spout 은 항상 new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”} 목록 의 문자열 을 보 냅 니 다.인증 을 실행 합 니 다:
11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

이 실행 로 그 를 통 해 알 수 있 듯 이 데 이 터 는 항상 Blot: Thread - 25 - exclaim 1 로 전 송 됩 니 다.로 컬 테스트 를 할 때 Thread - 25 - exclaim 1 은 스 레 드 이름 이기 때 문 입 니 다.배 송 된 스 레 드 는 데이터 여러 스 레 드 입 니 다.따라서 이 테스트 는 항상 하나의 Task 로 보 내 고 이 Task 도 첫 번 째 입 니 다.
사용자 정의 그룹 구현 이해
사용자 정의 그룹 을 만 드 는 것 이 어렵 습 니까?사실 Hadoop 의 Partitioner 를 이해 했다 면 Storm 의 Custom Stream Grouping 도 마찬가지 입 니 다.
Hadoop MapReduce 의 Map 이 완성 되면 Map 의 중간 결 과 를 디스크 에 기록 합 니 다. 디스크 를 쓰기 전에 스 레 드 는 먼저 데이터 가 최종 적 으로 전송 할 Reducer 에 따라 데 이 터 를 해당 하 는 파 티 션 으로 나 눈 다음 에 서로 다른 파 티 션 이 서로 다른 Reduce 에 들 어 갑 니 다.Hadoop 이 데 이 터 를 어떻게 묶 는 지 먼저 살 펴 보 겠 습 니 다. 이것 은 Partitioner 의 유일한 방법 입 니 다.
public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

위의 코드 에서: Map 에서 출력 한 데 이 터 는 getPartition () 방법 을 거 쳐 다음 그룹 을 확인 합 니 다.numReduceTasks 는 Job 의 Reduce 수량 이 고 반환 값 은 이 데이터 가 어느 Reduce 에 들 어 가 는 지 확인 하 는 것 입 니 다.반환 값 은 0 보다 크 고 numReduceTasks 보다 작 아야 합 니 다. 그렇지 않 으 면 잘못 보 고 됩 니 다.0 을 되 돌려 주 는 것 은 이 데이터 가 첫 번 째 Reduce 에 들 어 갔다 는 것 을 의미한다.무 작위 로 그룹 을 나 누 는 데 있어 서 이 방법 은 이렇게 실 현 될 수 있다.
public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

사실 Hadoop 의 기본 Hash 그룹 정책 도 이렇게 이 루어 졌 습 니 다.이런 장점 은 데이터 가 전체 군집 에서 기본적으로 부하 균형 을 이 루 는 것 이다.
Hadoop 의 Partitioner 를 통 해 Storm 의 Custom Stream Grouping 을 살 펴 보 겠 습 니 다.
이것 은 Custom Stream Grouping 류 의 소스 코드 입 니 다.
public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

똑 같은 이치 입 니 다. targetTasks 는 Storm 이 실 행 될 때 현재 몇 개의 목표 Task 를 선택 할 수 있 고 모든 것 에 숫자 번 호 를 매 겼 습 니 다.그리고 chooseTasks(int taskId, List values); , values, Task ?

, Task , return Arrays.asList(tasks.get(0)); 。Hadoop 과 달리 Storm 은 하나의 데 이 터 를 여러 Task 에서 처리 할 수 있 도록 허용 하기 때문에 반환 값 은 List 입 니 다. 'List targetTasks' Task 에서 임의의 몇 개 (적어도 하나 여야 함) Task 를 선택 하여 데 이 터 를 처리 하 라 는 것 입 니 다.이로써 Storm 의 사용자 정의 그룹 정책 도 그리 번 거 롭 지 않 겠 지?

좋은 웹페이지 즐겨찾기