트 위 터 Storm Stream Grouping 사용자 정의 그룹 작성 실현
Storm 은 사용자 정의 그룹 을 지원 합 니 다. 이 글 은 Storm 이 사용자 정의 그룹 을 어떻게 만 드 는 지, 그리고 Storm 그룹 을 어떻게 묶 는 지 에 대한 이 해 를 탐구 하 는 것 입 니 다.
이것 은 내 가 쓴 사용자 정의 그룹 입 니 다. 항상 데 이 터 를 첫 번 째 Task 로 나 눕 니 다.
public class MyFirstStreamGrouping implements CustomStreamGrouping {
private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
private List<Integer> tasks;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
List<Integer> targetTasks) {
this.tasks = targetTasks;
log.info(tasks.toString());
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
log.info(values.toString());
return Arrays.asList(tasks.get(0));
}
}
위의 코드 를 통 해 알 수 있 듯 이 이 사용자 정의 그룹 은 데 이 터 를 첫 번 째 Task
Arrays.asList(tasks.get(0));
로 통합 합 니 다. 즉, 데이터 가 도착 하면 항상 첫 번 째 그룹 으로 배 송 됩 니 다.테스트 코드:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2);
// ,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
.customGrouping("words", new MyFirstStreamGrouping());
이전 테스트 용례 와 마찬가지 로 Spout 은 항상
new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}
목록 의 문자열 을 보 냅 니 다.인증 을 실행 합 니 다:11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
이 실행 로 그 를 통 해 알 수 있 듯 이 데 이 터 는 항상 Blot: Thread - 25 - exclaim 1 로 전 송 됩 니 다.로 컬 테스트 를 할 때 Thread - 25 - exclaim 1 은 스 레 드 이름 이기 때 문 입 니 다.배 송 된 스 레 드 는 데이터 여러 스 레 드 입 니 다.따라서 이 테스트 는 항상 하나의 Task 로 보 내 고 이 Task 도 첫 번 째 입 니 다.
사용자 정의 그룹 구현 이해
사용자 정의 그룹 을 만 드 는 것 이 어렵 습 니까?사실 Hadoop 의 Partitioner 를 이해 했다 면 Storm 의 Custom Stream Grouping 도 마찬가지 입 니 다.
Hadoop MapReduce 의 Map 이 완성 되면 Map 의 중간 결 과 를 디스크 에 기록 합 니 다. 디스크 를 쓰기 전에 스 레 드 는 먼저 데이터 가 최종 적 으로 전송 할 Reducer 에 따라 데 이 터 를 해당 하 는 파 티 션 으로 나 눈 다음 에 서로 다른 파 티 션 이 서로 다른 Reduce 에 들 어 갑 니 다.Hadoop 이 데 이 터 를 어떻게 묶 는 지 먼저 살 펴 보 겠 습 니 다. 이것 은 Partitioner 의 유일한 방법 입 니 다.
public class Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
return 0;
}
}
위의 코드 에서: Map 에서 출력 한 데 이 터 는 getPartition () 방법 을 거 쳐 다음 그룹 을 확인 합 니 다.numReduceTasks 는 Job 의 Reduce 수량 이 고 반환 값 은 이 데이터 가 어느 Reduce 에 들 어 가 는 지 확인 하 는 것 입 니 다.반환 값 은 0 보다 크 고 numReduceTasks 보다 작 아야 합 니 다. 그렇지 않 으 면 잘못 보 고 됩 니 다.0 을 되 돌려 주 는 것 은 이 데이터 가 첫 번 째 Reduce 에 들 어 갔다 는 것 을 의미한다.무 작위 로 그룹 을 나 누 는 데 있어 서 이 방법 은 이렇게 실 현 될 수 있다.
public int getPartition(K key, V value, int numReduceTasks) {
return hash(key) % numReduceTasks;
}
사실 Hadoop 의 기본 Hash 그룹 정책 도 이렇게 이 루어 졌 습 니 다.이런 장점 은 데이터 가 전체 군집 에서 기본적으로 부하 균형 을 이 루 는 것 이다.
Hadoop 의 Partitioner 를 통 해 Storm 의 Custom Stream Grouping 을 살 펴 보 겠 습 니 다.
이것 은 Custom Stream Grouping 류 의 소스 코드 입 니 다.
public interface CustomStreamGrouping extends Serializable {
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
List<Integer> chooseTasks(int taskId, List<Object> values);
}
똑 같은 이치 입 니 다. targetTasks 는 Storm 이 실 행 될 때 현재 몇 개의 목표 Task 를 선택 할 수 있 고 모든 것 에 숫자 번 호 를 매 겼 습 니 다.그리고
chooseTasks(int taskId, List values); , values, Task ?
, Task , return Arrays.asList(tasks.get(0));
。Hadoop 과 달리 Storm 은 하나의 데 이 터 를 여러 Task 에서 처리 할 수 있 도록 허용 하기 때문에 반환 값 은 List 입 니 다. 'List targetTasks' Task 에서 임의의 몇 개 (적어도 하나 여야 함) Task 를 선택 하여 데 이 터 를 처리 하 라 는 것 입 니 다.이로써 Storm 의 사용자 정의 그룹 정책 도 그리 번 거 롭 지 않 겠 지?
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
storm Async loop died! & reconnect자세히 보기 storm이 슈퍼바이저가 리셋되었을 때 topology가 오류를 보고하여 모든 spout이 소비되지 않습니다. 로그 위에 대량의reconnection IP에 로그인하여 6703 포트에 두 개의 워커가 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.