rocketmq 소비 부하 균형--push 소비 상세 설명

머리말
본 고 는 DefaultMQPushConsumer Impl 소비자,클 라 이언 트 부하 균형 에 관 한 지식 을 소개 한다.본 고 는 DefaultMQPushConsumer Impl 의 시작 과정 부터 부하 균형 을 실현 하기 까지 소스 코드 에서 한 걸음 한 걸음 분석 하여 모두 6 개 부분 으로 나 누 어 소개 하 였 으 며,그 중에서 6 번 째 부분 인 rebalance ByTopic 은 부하 균형 의 핵심 논리 모듈 로 구체 적 인 과정 은 그림 과 글 을 활용 하여 논술 하 였 다.
소개 하기 전에 먼저 몇 가지 문 제 를 던 졌 다.
1.부하 균형 을 잡 으 려 면 먼저 해결 해 야 할 문 제 는 무엇 입 니까?
2.부하 균형 은 Client 엔 드 처리 입 니까?Broker 엔 드 처리 입 니까?
개인 적 인 이해:
1.부하 균형 을 잡 으 려 면 먼저 신호 수집 을 해 야 한다.
신호 수집 이란 모든 consumer Group 에 어떤 consumer 가 있 는 지,대응 하 는 topic 이 누구 인지 알 아야 한 다 는 것 이다.신호 수집 은 Client 엔 드 신호 수집 과 Broker 엔 드 신호 수집 두 부분 으로 나 뉜 다.
2.부하 균형 을 클 라 이언 트 엔 드 에 두 고 처리 합 니 다.
구체 적 인 방법 은 소비자 클 라 이언 트 가 시작 할 때 rebalance Impl 인 스 턴 스 를 보완 하 는 동시에 구독 정 보 를 복사 하여 rebalance Impl 인 스 턴 스 대상 에 저장 하 는 것 입 니 다.또한 매우 중요 한 단계 입 니 다.심장 박동 정 보 를 통 해 자신 을 모든 Broker 에 보고 하고 Register Consumer 를 등록 하 는 것 입 니 다.상기 과정 이 준 비 된 후에 Client 에서 계속 실행 되 는 부하 균형 서비스 스 레 드 는 Broker 에서 전체 정보(이 consumer Group 에서 모든 소비 Client)를 얻 은 다음 에 이러한 전체 정 보 를 분배 하여 현재 클 라 이언 트 가 배정 한 소비 대기 열 을 얻 습 니 다.
본문의 구체 적 인 내용:
I. copySubscription
Client 단 신호 수집,구독 정보 복사.
DefaultMQPushConsumerImpl.start()에 서 는 소비자 의 topic 구독 관 계 를 rebalance Impl 의 SubscriptionInner 맵 에 부하 로 설정 합 니 다.

private void copySubscription() throws MQClientException {
try {
// :  consumer        topic
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
FilterAPI.buildSubscriptionData 인 터 페 이 스 는 구독 관 계 를 SubscriptionData 데이터 로 변환 합 니 다.그 중에서 subString 은 구독 tag 등 정 보 를 포함 합 니 다.또 이 소비자 의 소비 패턴 이 클 러 스 터 소비 라면 retry 의 topic 를 함께 넣는다.
II.rebalanceImpl 실례 보완
클 라 이언 트 가 계속 정 보 를 수집 합 니 다:

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
본 고 는 DefaultMQPushConsumerImpl 을 예 로 들 기 때문에 this 대상 유형 은 DefaultMQPushConsumerImp 이다.
III. this.rebalanceService.start()
부하 균형 서비스 오픈.this.rebalance Service 는 Rebalance Service 인 스 턴 스 대상 으로 ServiceThread 와 계승 되 며 스 레 드 클래스 입 니 다.this.rebalance Service.start()가 실 행 될 때 Rebalance Service 스 레 드 를 실행 합 니 다.

@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
IV. this.mqClientFactory.doRebalance
클 라 이언 트 는 소비 그룹 table 을 옮 겨 다 니 며 이 클 라 이언 트 의 모든 소비자 에 게 독립 적 으로 부하 균형 을 잡 고 소비 대기 열 을 나 누 어 줍 니 다.

public void doRebalance() {
for (String group : this.consumerTable.keySet()) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}
V. MQConsumerInner.doRebalance
본 고 는 DefaultMQPushConsumerImpl 소비 과정 을 예 로 들 면 DefaultMQPushConsumerImpl.do Rebalance:

@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance();
}
}
단계 II 에서 rebalance Impl 실례 를 보완 하여 rebalance Impl.do Rebalance()를 호출 하기 위해 초기 데 이 터 를 제공 하 였 습 니 다.
rebalanceImpl.do Rebalance()과정 은 다음 과 같 습 니 다.

public void doRebalance() {
     //   copySubscription     SubscriptionInner
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
VI.rebalance ByTopic--핵심 절차 중 하나
rebalance ByTopic 방법 은 소비자 의 소비 유형 에 따라 BROAdcasting 또는 CLUSTERING 으로 서로 다른 논리 적 처 리 를 한다.CLUSTERING 논 리 는 BROADCASTING 논 리 를 포함 하고 이 부분 은 클 러 스 터 소비 부하 균형 의 논리 만 소개 한다.
클 러 스 터 소비 부하 균형 논리 주요 코드 는 다음 과 같다(log 등 코드 생략).

//1. topicSubscribeInfoTable       topic         
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2.  broker               clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);

     // 3.  DefaultMQPushConsumer        AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
         // 4.  AllocateMessageQueueAveragely.allocate  ,    client      
allocateResult = strategy.allocate(
this.consumerGroup, 
this.mQClientFactory.getClientId(), 
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
    // 5.       allocateResult       allocateResultSet   
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
、
     //6.   updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
주:BROADCASTING 논 리 는 상기 1,6 만 포함 합 니 다.
집단 소비 부하 균형 논리 중의 1,2,4 이 세 가지 관련 지식 은 그 핵심 과정 이 고 각 점 의 관련 지식 은 다음 과 같다.
첫 번 째:topic Subscribe InfoTable 목록 에서 이 topic 와 관련 된 모든 메시지 큐 를 가 져 옵 니 다.

두 번 째:broker 에서 이 소비 팀 을 소비 하 는 모든 클 라 이언 트 clientId 를 가 져 옵 니 다.
먼저,소비자 대상 은 모든 broker 에 게 심장 박동 가방 을 계속 보 내 고 자신 을 보고 하 며 구독 관계 와 클 라 이언 트 Channel InfoTable 을 등록 하고 업데이트 합 니 다.그 후에 클 라 이언 트 는 소비 부하 균형 을 이 룰 때 소비 클 라 이언 트 를 가 져 와 이런 클 라 이언 트 에 대해 부하 균형 을 이 루 고 소 비 를 나 누 어 주 는 대기 열 을 가진다.구체 적 인 과정 은 다음 그림 과 같다.

제4 시:AllocateMessage QueueAveragely.allocate 방법 을 호출 하여 현재 클 라 이언 트 분배 소비 대기 열 을 가 져 옵 니 다.

주:위의 그림 에서 cId 1,cId 2,...,cIdN 은 getConsumerIdListByGroup 을 통 해 가 져 옵 니 다.이 Consumer Group 의 모든 온라인 클 라 이언 트 목록 에 있 습 니 다.
현재 소 비 는 부하 균형 전략 을 실시 한 후 대응 하 는 메시지 소비 대기 열 을 가 져 옵 니 다.구체 적 인 알고리즘 은 매우 간단 해서 소스 코드 를 볼 수 있다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기