RocketMq에서 MessageQueue 할당
답은 간단하지만 문제를 가지고 원본을 볼 수 있는 좋은 기회다.
RocketMq 구조
그림에서 볼 수 있듯이 MQ는 주로 메시지를 배달하고 끌어당기는 두 가지 부분을 볼 수 있다.
많은 구조는 시대의 흐름에 순응하는 것이다. Rocketmq의 구조 체계는 당연히 아리가 독창적인 것이 아니라 AMQP 협의에 근거한 것이다.Rocketmq의 Producer, Broker, 그리고 Consumer는 모두 AMQP의 개념에 따라 파생된 것이다.따라서 AMQP(Advanced Message Queuing Protocal, 고급 메시지 대기열 프로토콜)를 설명하면 기술의 발전 과정을 더욱 잘 이해할 수 있습니다.
paper 다운로드http://www.amqp.org/specification/0-9-1/amqp-org-download
메시지 대기열의 3가지 유형
물론 이러한 프로토콜을 바탕으로 RocketMq만이 메시지 대기열 선택 모델에 빛나는 것이 아니라 다른 메시지 대기열도 있다.
https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ
주로 3대 진영으로 나뉜다.
물론 AMQP 프로토콜에 익숙해지면 메시지 대기열을 직접 연구할 수도 있습니다
https://zhuanlan.zhihu.com/p/28967866
몇 가지 배경을 알아보고 RocketMQ의 메시지 배달 과정을 살펴보겠습니다.아니면 그 구체적인 질문, Rocket MQ는 어떻게 하나의 대기열을 선택하여 배달합니까?
Producer가 다른 큐에 메시지를 보내는 방법
여기서 말하자면, Rocket Mq의 모든 생산자와 소비자에 관한 코드는client 패키지 아래에 있다.원본을 열면 Procuder 아래에 셀렉터 패키지가 있는데 이 패키지가 그 느낌인지 볼 수 있습니다.
selector 아래의 세 종류가 모두 MessageQueue Selector를 실현한 것을 볼 수 있습니다. MessageQueue Selector의 코드를 보십시오.
public interface MessageQueueSelector {
MessageQueue select(final List mqs, final Message msg, final Object arg) ;
}
public class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
MessageQueue Selector가 어디에 호출되었는지 확인해 보세요.select(), Default MQProducer Impl을 발견했습니다. 그러면 MessageQueue Selector에서 어떤 대기열을 선택했는지 확인할 수 있습니다.
RocketMq는 다음과 같은 3가지 선택 대기열 방식을 제공합니다.
기본 대기열 수
세심한 학우들은 틀림없이 그렇게 대열의 수량이 무한대냐고 물어볼 것이다.이것은 RocketMq의 사용 설명서를 볼 수 있습니다. 기본 대기열 수량은 4 (default TopicQueue Nums: 4)입니다. 물론 설정도 선택할 수 있습니다.
동시에 학우들이 장소를 잘못 찾았는지 모르겠다. 필자는 처음에 장소를 잘못 찾았고 Topic Publish Info에서도 selectOne MessageQueue를 찾았다. 코드는 다음과 같다.
public class TopicPublishInfo{
// , ,
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
}
아래 호출자를 찾아보니 MQFault Strategy입니다. 보아하니 Rocketmq 소비가 실패했을 때 메시지를 다른 대기열에 다시 전송하여 집단 모드에서 서로 다른 기계 소비에 분포할 수 있도록 합니다.(혹시 다른 기계가 왜 보장되는지 의문이 남았는지 아래를 보세요)
Consumer가 메시지 대기열에서 메시지를 가져오는 방법
이것은 이해하기 어려운 단계입니다. 먼저 RocketMQ 매뉴얼을 보면 다음과 같습니다.
Rocket MQ의 Consumer는 모두 Broker에서 메시지를 끌어서 소비하지만 실시간으로 메시지를 받을 수 있도록 Rocket MQ는 긴 퀴즈 방식을 사용하여 메시지의 실시간성이 Push 방식과 일치하도록 보장합니다.반종 장륜 조회 방식은 亍 웹 QQ 수신 메시지 메커니즘과 유사하다.자세한 내용은 다음 정보를 참조하십시오.http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
상세하게 설명했지만 초보자에게는 우호적이지 않았다.간단하게 말하자면 긴 휠체어를 사용하여 클라이언트가 요청과 서버를 먼저 연결합니다. 그러나 서버가 데이터가 없으면 연결입니까hold가 사는지 데이터push가 클라이언트에게 있을 때 연결을 닫습니다.이렇게 하면 소비자들이 상류의 소식에 무너지지 않을 뿐만 아니라, 소식의 실시간성도 보장한다.
그러면 또 질문이 있습니다. Consumer는 어떻게 MessageQueue에서 메시지를 끌어냅니까?랜덤으로 당기는 거예요?
MQPullConsumer를 보십시오. DefaultMQPullConsumer는 그 계승입니다.
public class MQPullConsumer {
// ,
//
// @param mq from which message queue
// @param subExpression tag, "tag1 || tag2 || tag3"
// @param offset
// @param maxNums
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
}
메시지 Queue가 들어오는 것을 볼 수 있어서 어색합니다. 언제 어느 대기열에서 메시지를 끌어낼지 결정할 수 없습니다.만능 검색엔진이 있어서 다행이지만,
https://zhuanlan.zhihu.com/p/25140744
Rocket Mq에는 Allocate MessageQueue Strategy 전문 클래스가 있습니다.class, 클라이언트에 숨겨져 있습니다.Consumer.리밸런스 싸.
매번 Consumer 수량의 변경은 AllocateMessageQueue Strategy를 촉발합니다.즉, 매번 Consumer가 끄는 대기열은 고정되어 있다.
이제 고개를 돌려 첫 번째 Rocket MQ의 구조도를 보면 투철하게 그려진 것 같지 않아요?
총결산
다음으로 전송:https://juejin.im/post/5a9212bf6fb9a063395c8ae0
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.