RocketMQ Rebalance 프로 세 스 분석

9494 단어 자바
이 절 은 Rebalance 절 차 를 소개 합 니 다.Consumer 소비 메시지 프로 세 스 를 소개 하기 전에 먼저 Rebalance 의 프로 세 스 를 소개 하 는데 이 과정 은 Consumer 의 시작 과 관련된다.
 앞서 소 개 했 듯 이 Topic 은 하나의 논리 적 개념 으로 Topic 에서 여러 개의 Queue 를 나 누 어 Consumer 소비의 병행 도 를 높 일 수 있다.하나의 Consumer Group 에서 Queue 와 Consumer 간 의 대응 관 계 는 한 쌍 이 많은 관계 입 니 다.하나의 Queue 는 최대 한 명의 Consumer 에 게 만 분배 되 고 하나의 Cosumer 는 여러 개의 Queue 를 분배 할 수 있 습 니 다.다음 그림 입 니 다.
한편,Rebalance 는 하나의 Consumer Group 의 모든 consumer 가 Queue 를 어떻게 분배 하 는 지 규정 하 는 협의 이다.Consumer 가 구독 하 는 Topic 에 변화 가 생기 거나 Consumer Group 내 Consumer 인 스 턴 스 가 변 화 를 보 낼 때 모든 인 스 턴 스 에 대응 하 는 Queue 를 재배 치 하기 위해 Rebalance 를 촉발 합 니 다.
1.RebalanceService
 앞에서 클 라 이언 트 시작 절 차 를 소개 할 때 MQClient Instance 는 start 방법 에서 일련의 배경 작업 을 시작 합 니 다.그 중에서 Rebalance 작업 을 포함 하여 Rebalance Service 의 start 방법 을 호출 했 습 니 다.Rebalance Service 는 ServiceThread 에서 계승 되 며,start 방법 은 배경 스 레 드 를 시작 하여 일정 시간(기본 20 초)마다 MQClient Instance 의 doRebalance 방법 을 호출 할 수 있 도록 합 니 다.다음 그림:
MQClient Instance.do Rebalance 는 MQConsumer Inner.do Rebalance 를 호출 합 니 다.MQConsumer Inner 는 DefaultMQPull Consumer Impl 과 DefaultMQPush Consumer Imp 의 부모 인터페이스 입 니 다.다음 과 같 습 니 다.즉,MQClient Instance 는 doRebalance 방법 을 Consumer 인 스 턴 스 처리 에 맡 겼 습 니 다.
이 어 Consumer 인 스 턴 스 는 내부 RebalnaceImpl 의 doRebalance 방법 으로 진정한 동작 을 수행 합 니 다.
여기 서 한 가지 말씀 드 리 겠 습 니 다.Rebalance Service 는 MQClient Instane 에 의 해 소유 되 고 있 습 니 다.하나의 MQClient Instance 는 하나의 Rebalance 인 스 턴 스 만 있 습 니 다.전에 클 라 이언 트 가 시 작 될 때 MQClient Instance 는 MQClient Manager 가 관리 하 는데 이 컴퓨터 ip,프로 세 스 pid 와 관련 이 있다 고 언급 했 습 니 다.Rebalance Impl 은 Consumer 인 스 턴 스 와 관련 되 고 하나의 Consumer 인 스 턴 스 는 Rebalance Impl 대상 에 대응 합 니 다.
2.RebalanceImpl
우선 이런 종류의 기본 상황 을 소개 한다.
1.속성
protected final ConcurrentMapConcurrentMap*Queue*/MessageQueue, /*Queue      */ProcessQueue> processQueueTable = new ConcurrentHashMap(64);

//DefaultMQXxxxConsumerImpl updateTopicSubscribeInfo   
protected final ConcurrentMap> topicSubscribeInfoTable = new ConcurrentHashMap>();
    
//DefaultMQXxxxConsumerImpl subscript    
protected final ConcurrentMap subscriptionInner = new ConcurrentHashMap();

protected String consumerGroup;//Consumer     ConsumerGroup
    
protected MessageModel messageModel;//      
    
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;//Queue    ,   AllocateMessageQueueAveragely

2.상속 관계
 제1 파 트 의 내용 을 연결 하면 이러한 doRebalce 방법 을 사용 합 니 다.주요 논 리 는 다음 과 같 습 니 다.
(1)이 인 스 턴 스 구독 의 모든 topic 를 교대로 훈련 시 키 고 subscription Inner 의 값 을 옮 겨 다 니 며 topic 정 보 를 얻 습 니 다.이 속성 내용 은 클 라 이언 트 인 스 턴 스 가 subscript 를 호출 할 때 증가 합 니 다.
(2)topic 에 따라 rebalance ByTopic 를 호출 하여 rebalance 실행
(3)방송 모드 라면 topic Subscribe InfoTable 에서 이 topic 아래 의 모든 Queue 를 가 져 와 후속 업데이트 에 사용 합 니 다.즉,방송 모드 는 모든 클 라 이언 트 가 topic 아래 의 모든 q 를 받 을 수 있 고 클 라 이언 트 에 게 분 배 된 Queue 집합 은 전체 수량의 집합 입 니 다.
(4)클 러 스 터 모드 라면 topic 의 모든 Queue 를 가 져 옵 니 다.broker 에서 이 topic 에서 모든 클 라 이언 트 id 목록 가 져 오기;정렬 후 AllocateMessageAueue Strategy 를 호출 하여 Consumer Group 에서 이 클 라 이언 트 가 할당 해 야 할 Queue 집합 을 가 져 옵 니 다.즉,클 라 이언 트 마다 나 누 어 진 q 목록 은 AllocateMessage QueueStrategy 에서 분 배 됩 니 다.
(5)이 클 라 이언 트 가 속 한 Queue 집합 을 가 져 온 후 updateProcessQueueTableInRebalance 로 업데이트 합 니 다.
(6)실행 이 끝 난 후 변화 가 발생 하면 message QueueChanged 를 호출 하여 하위 클래스 에 구체 적 으로 처리 합 니 다.
(7)truncateMessage Queue NotMyTopic 을 호출 하여 캐 시 에서 이 인 스 턴 스 가 처리 하 는 Queue 가 아 닙 니 다.
2.1.broker 에서 이 topic 에 대응 하 는 클 라 이언 트 id 목록 찾기
 MQClient Instance 캐 시 에서 이 topic 에 대응 하 는 broker 주 소 를 가 져 온 다음 Netty 를 호출 하여 broker 에 직접 방문 하여 결 과 를 가 져 옵 니 다.
2.2.클 라 이언 트 인 스 턴 스 소속 Queue 집합 할당
앞에서 말 했 듯 이 방송 모델 의 모든 클 라 이언 트 인 스 턴 스 는 전체 수량의 Queue 집합 에 배분 되 었 습 니 다.여 기 는 주로 클 라 이언 트 모델 에서 AllocateMessage Queue Strategy 의 처리 상황 을 소개 합 니 다.기본 값 은 평균 분배 이 고,클래스 는 AllocateMessage Queue Averagely 입 니 다.
 AllocateMessage QueueStrategy 의 정 의 를 먼저 봅 니 다.
List allocate(
        final String consumerGroup,//  Consumer    ConsumerGroup
        final String currentCID,//       ID
        final List mqAll,//    Queue  
        final List cidAll // topic, ConsumerGroup         ID  
    );

이 방법 은 currentCID 가 속 한 Queue 목록 을 선택 합 니 다.AllocateMessage Queue Averagely 는 currentCID 가 속 한 위치 에 따라 평균 적 으로 분 배 됩 니 다.과정 은 다음 과 같 습 니 다.
위 에 원본 코드 에 대응 하 는 주석 을 붙 였 다.앞에서 언급 한 mqAll 과 cidAll 은 모두 순서 가 있 습 니 다.이 과정 은 클 라 이언 트 번호 에 따라 모든 Queue 목록 에서 자신 이 속 한 Queue 를 평균 적 으로 분배 하 는 것 입 니 다.관련 된 여러 가지 가능성 은 다음 과 같 습 니 다.
대략적인 과정 은 바로 정리 할 수 있 으 면 평균 점수 이다.정리 할 수 없 으 면 cid 는 mod 수 내 에 있 는 것 은 1 개의 Queue 로 나 누고,mod 수 외 에 있 는 것 은 1 개 로 나 뉜 다.
 앞에서 말 했 듯 이 이 분배 정책 이 실 행 될 때 topic 아래 의 Queue 목록 과 클 라 이언 트 를 정렬 합 니 다.분배 할 때 앞 에 있 는 클 라 이언 트 가 Queue 로 나 눌 수 있 고 나 눌 수 있 는 Queue 가 많 습 니 다.한 가지 상황 을 고려 해 볼 때,만약 하나의 Consumer Group 이 2 개의 topic 를 구독 했다 면,TopicX 와 TopicY,각 topic 마다 2 개의 Queue 가 있 고 이 Consumer Group 아래 4 개의 클 라 이언 트 인 스 턴 스 가 있 습 니 다.Rebalance 는 topic 에 의 해 왔 기 때문에 4 개의 Queue 가 평균 적 으로 소비 되 는 상황 이 발생 하지 않 습 니 다.결 과 는 다음 과 같 습 니 다.
따라서 초기 화 할 때 Consumer Group 의 클 라 이언 트 수량<=Topic 의 Queue 수량 을 확보 하 는 것 이 좋 습 니 다.
2.3. updateProcessQueueTableInRebalance
 이 방법의 정 의 는 다음 과 같다.
    /**
     * @param topic topic
     * @param mqSet Rebalance       , topic          q  
     * @param isOrder        
     * @return
     */
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,final boolean isOrder) {}

앞에서 언급 한 바 와 같이 Rebalance Impl 은 하나의 process QueueTable 속성 만 있 습 니 다.이 속성 은 현재 클 라 이언 트 가 정말 처리 하고 있 는 모든 Queue 와 Queue 에 대응 하 는 소비 진 도 를 유지 합 니 다.updateProcess QueueTable InRebalance 는 이 속성 을 업데이트 합 니 다.
1.rebalance 를 찾 은 후 현재 클 라 이언 트 인 스 턴 스 에 속 하지 않 는 Queue 나 만 료 된 Queue 를 찾 아 drop 으로 표시 하고 하위 클래스 에서 제거 할 지 여 부 를 판단 합 니 다.제거 가 필요 하 다 면 이 클 라 이언 트 인 스 턴 스 가 속 한 Queue 가 변 경 됩 니 다.
2 Rebalance 후 배 정 된 Queue 에 새로 추 가 된 Queue 가 있 는 지 판단 하고,있 으 면 캐 시 에 있 는 이 Queue 의 소비 편 이 량(기본 동작)을 제거 하고,이 Queue 의 소비 편 이 량 을 계산 한 다음 PullRequest 대상 을 만들어 목록 에 저장 하고,Rebalane 을 표시 한 후 처 리 된 Queue 에 변경 사항 이 있 음
3.rebalance 를 나 눠 준 후 추 가 된 Queue 목록,즉 PullRequest 목록 은 구체 적 인 하위 클래스 에서 처리 합 니 다.pull 모드 는 처리 하지 않 습 니 다.push 모드 는 PullMessageService 에 넣 고 순환 처리 합 니 다.구체 적 으로 Rebalance PushImpl 의 dispatchPullRequest 방법 에 있 습 니 다.
@Override
    public void dispatchPullRequest(List pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

이 단계 에 서 는 DefaultMQPushConsumer Impl 의 executePull Request Immediately 방법 을 되 돌려 Pull Request 대상 을 추가 하여 전체 Push 모드 의 실행 을 촉발 합 니 다.구체 적 인 과정 은 다음 절 에 소개 합 니 다<1>.
2.4. messageQueueChanged
 Rebalance 이후 처 리 된 Queue 목록 이 변경 되면 해당 동작 을 수행 합 니 다.Push 모드 에 대해 클 라 이언 트 가 topic 를 구독 하 는 버 전 번 호 를 업데이트 하고 broker 에 게 알 립 니 다.Pull 모드 에 대해 서 는 DefaultMQPull Consumer Impl 의 Message Queue Listener 가 Queue 로 변 경 됩 니 다.MQPull Consumer ScheduleService 에 사 용 됩 니 다.Pull 모드 의 정시 소비 메시지<2>에 사 용 됩 니 다.
2.5. truncateMessageQueueNotMyTopic
 Queue 에서 topic 를 제거 하 는 것 은 이 인 스 턴 스 구독 대상 이 아 닙 니 다.
 위 에서 소개 한<1>,<2>점 은 Consumer 자동/정시 에 정 보 를 끌 어 올 리 는 데 사용 할 수 있 으 며 구체 적 으로 다음 클 라 이언 트 의 소비 과정 에서 소개 할 것 입 니 다.
3.브로커 측 알림 Rebalance
 위 에서 언급 한 Rebalance 는 클 라 이언 트 가 스스로 정 해진 시간(기본 20 초)에 실 행 된 것 으로 Broker 측 에서 주동 적 으로 통지 하 는 상황 도 존재 합 니 다.
 Broker 는 Consumer Manager 가 있 습 니 다.클 라 이언 트 인 스 턴 스 가 변경 되면(상하 선)각 클 라 이언 트 에 게 알 립 니 다.클 라 이언 트 가 통 지 를 받 은 후에 MQClient Instance 의 rebalance Immediately 를 호출 하여 rebalance 를 직접 실행 합 니 다.이 방법 은 ServiceThread 를 깨 워 서 Rebalance Service 가 기다 리 지 않 고 직접 실 행 될 수 있 도록 합 니 다.
 RocketMQ 는 Kafka Rebalance 체제 와 유사 하고 이들 의 Rebalance 분 배 는 모두 클 라 이언 트 에서 이 루어 집 니 다.다른 것 은:
4.567917.Kafka:소비자 팀 의 여러 소비자 사례 에서 하 나 를 Group Leader 로 선정 하여 이 Group Leader 가 구역 별로 분배 하고 분배 결 과 는 Cordinator(특수 캐릭터 의 broker)를 통 해 다른 소비자 에 게 동기 화 됩 니 다.카 프 카 에 해당 하 는 분 구 분 배 는 뇌 가 하나 밖 에 없 는데 바로 Group Leader 입 니 다
4.567917.RocketMQ:모든 소비자 가 자신 에 게 대열 을 배정 하 는 것 은 모든 소비자 가 하나의 두뇌 에 해당 합 니 다
4.Rebalance 의 잠재 적 위해
4.567917.소비 일시 정지:Consumer 1 만 있 는 상황 에서 모든 5 개 대열 을 소비 하 는 것 을 고려 합 니 다.Consumer 2 를 추가 하여 Rebalance 를 촉발 할 때 2 개의 대기 열 을 할당 하여 소비 해 야 합 니 다.그러면 Consumer 1 은 이 두 대열 의 소 비 를 멈 추고 이 두 대열 이 Consumer 2 에 배 치 된 후에 야 이 두 대열 이 계속 소 비 될 수 있다
4.567917.중복 소비:Consumer 2 는 소비 가 자신의 2 개 대열 에 분 배 될 때 반드시 Consumer 1 이전에 소 비 된 offset 에서 계속 소 비 를 시작 해 야 한다.그러나 기본 적 인 상황 에서 offset 은 비동기 적 으로 제출 되 었 습 니 다.예 를 들 어 consumer 1 은 현재 offset 에서 10 으로 소비 되 고 있 지만 비동기 적 으로 broker 에 제출 되 는 offset 은 8 입 니 다.그렇다면 consumer 2 가 8 의 offset 에서 소 비 를 시작 하면 2 가지 메시지 가 반복 된다.즉,소비자 2 는 소비자 1 이 offset 을 제출 한 뒤 리 밸 런 스 를 진행 하 기 를 기다 리 지 않 기 때문에 제출 간격 이 길 어 질 수록 중복 소비 로 이 어 질 수 있다
4.567917.소비 돌진:rebalance 로 인해 중복 소 비 를 초래 할 수 있 기 때문에 중복 소비 가 필요 하 다 는 소식 이 너무 많다.혹은 rebalance 일시 정지 시간 이 너무 길 어 일부 메시지 가 쌓 였 다.그러면 rebalance 가 끝나 면 순식간에 많은 정 보 를 소비 해 야 할 수도 있 습 니 다
이 부분 은 그 당시 소스 코드 읽 기 과정 에서 만 든 필기 약 도 를 동봉 합 니 다.
더 많은 오리지널 콘 텐 츠 는 위 챗 공식 번 호 를 검색 하 세 요:아 낙타(doubaotaizi)

좋은 웹페이지 즐겨찾기