RocketMq에서 MessageQueue 할당

8251 단어
Rocketmq에 Consumer Group이라는 개념이 있다는 거 다 알아요.집단 모드에서 여러 대의 서버가 같은 Consumer Group을 설정하면 매번 한 대의 서버만 메시지를 소비할 수 있다(주의하지만 한 번만 소비할 수 있고 네트워크가 떨리는 상황은 보장되지 않는다).그렇다면 필자는 Rocketmq가 어떻게 이 모델을 실현했는지 의심스럽다.어떻게 서버 한 대만 소비할 수 있습니까?
답은 간단하지만 문제를 가지고 원본을 볼 수 있는 좋은 기회다.

RocketMq 구조


그림에서 볼 수 있듯이 MQ는 주로 메시지를 배달하고 끌어당기는 두 가지 부분을 볼 수 있다.
많은 구조는 시대의 흐름에 순응하는 것이다. Rocketmq의 구조 체계는 당연히 아리가 독창적인 것이 아니라 AMQP 협의에 근거한 것이다.Rocketmq의 Producer, Broker, 그리고 Consumer는 모두 AMQP의 개념에 따라 파생된 것이다.따라서 AMQP(Advanced Message Queuing Protocal, 고급 메시지 대기열 프로토콜)를 설명하면 기술의 발전 과정을 더욱 잘 이해할 수 있습니다.
paper 다운로드http://www.amqp.org/specification/0-9-1/amqp-org-download
  • Broker: 수신 및 배포 응용 프로그램
  • Virtual host: 여러 임대인과 안전 요소로 AMQP의 기본 구성 요소를 가상 그룹으로 구분합니다.각 세입자 간에는 Linux의namespace 개념(자체 Google 가능)과 유사한 네트워크 격리가 있습니다
  • 연결:publisher/consumer와 브로커 간의 TCP 연결
  • Channel: Connection보다 가벼운 연결이고 Connection의 논리적 연결입니다
  • Exchange: 메시지를 서로 다른Queue에 나누어 줍니다
  • Queue: 메시지는 최종적으로 Queue에 떨어진다. 메시지는 Broker push가 Consumer에게 주거나 Consumer가 pull 메시지를 보낸다
  • Binding:exchange와queue 사이의 메시지 루트 정책

  • 메시지 대기열의 3가지 유형


    물론 이러한 프로토콜을 바탕으로 RocketMq만이 메시지 대기열 선택 모델에 빛나는 것이 아니라 다른 메시지 대기열도 있다.
    https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ
    주로 3대 진영으로 나뉜다.
  • Broker 중Topic 흐름: kafka, JMS가 있습니다
  • Broker 가벼운 Topic 흐름: RocketMQ가 있습니다
  • 브로커 없음: ZeroMQ

  • 물론 AMQP 프로토콜에 익숙해지면 메시지 대기열을 직접 연구할 수도 있습니다
    https://zhuanlan.zhihu.com/p/28967866
    몇 가지 배경을 알아보고 RocketMQ의 메시지 배달 과정을 살펴보겠습니다.아니면 그 구체적인 질문, Rocket MQ는 어떻게 하나의 대기열을 선택하여 배달합니까?

    Producer가 다른 큐에 메시지를 보내는 방법


    여기서 말하자면, Rocket Mq의 모든 생산자와 소비자에 관한 코드는client 패키지 아래에 있다.원본을 열면 Procuder 아래에 셀렉터 패키지가 있는데 이 패키지가 그 느낌인지 볼 수 있습니다.
    selector 아래의 세 종류가 모두 MessageQueue Selector를 실현한 것을 볼 수 있습니다. MessageQueue Selector의 코드를 보십시오.
    public interface MessageQueueSelector {
        MessageQueue select(final List mqs, final Message msg, final Object arg);
    }
    
    public class MessageQueue {
    	private String topic;
    	private String brokerName;
    	private int queueId;
    }
    

    MessageQueue Selector가 어디에 호출되었는지 확인해 보세요.select(), Default MQProducer Impl을 발견했습니다. 그러면 MessageQueue Selector에서 어떤 대기열을 선택했는지 확인할 수 있습니다.
    RocketMq는 다음과 같은 3가지 선택 대기열 방식을 제공합니다.
  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • SelectMessageQueueByRandom

  • 기본 대기열 수


    세심한 학우들은 틀림없이 그렇게 대열의 수량이 무한대냐고 물어볼 것이다.이것은 RocketMq의 사용 설명서를 볼 수 있습니다. 기본 대기열 수량은 4 (default TopicQueue Nums: 4)입니다. 물론 설정도 선택할 수 있습니다.
    동시에 학우들이 장소를 잘못 찾았는지 모르겠다. 필자는 처음에 장소를 잘못 찾았고 Topic Publish Info에서도 selectOne MessageQueue를 찾았다. 코드는 다음과 같다.
    public class TopicPublishInfo{
        //  , , 
        public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
            if (lastBrokerName != null) {
                int index = this.sendWhichQueue.getAndIncrement();
                for (int i = 0; i < this.messageQueueList.size(); i++) {
                    int pos = Math.abs(index++) % this.messageQueueList.size();
                    MessageQueue mq = this.messageQueueList.get(pos);
                    if (!mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
    
                return null;
            }
            else {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                return this.messageQueueList.get(pos);
            }
        }
    }
    

    아래 호출자를 찾아보니 MQFault Strategy입니다. 보아하니 Rocketmq 소비가 실패했을 때 메시지를 다른 대기열에 다시 전송하여 집단 모드에서 서로 다른 기계 소비에 분포할 수 있도록 합니다.(혹시 다른 기계가 왜 보장되는지 의문이 남았는지 아래를 보세요)

    Consumer가 메시지 대기열에서 메시지를 가져오는 방법


    이것은 이해하기 어려운 단계입니다. 먼저 RocketMQ 매뉴얼을 보면 다음과 같습니다.
    Rocket MQ의 Consumer는 모두 Broker에서 메시지를 끌어서 소비하지만 실시간으로 메시지를 받을 수 있도록 Rocket MQ는 긴 퀴즈 방식을 사용하여 메시지의 실시간성이 Push 방식과 일치하도록 보장합니다.반종 장륜 조회 방식은 亍 웹 QQ 수신 메시지 메커니즘과 유사하다.자세한 내용은 다음 정보를 참조하십시오.http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
    상세하게 설명했지만 초보자에게는 우호적이지 않았다.간단하게 말하자면 긴 휠체어를 사용하여 클라이언트가 요청과 서버를 먼저 연결합니다. 그러나 서버가 데이터가 없으면 연결입니까hold가 사는지 데이터push가 클라이언트에게 있을 때 연결을 닫습니다.이렇게 하면 소비자들이 상류의 소식에 무너지지 않을 뿐만 아니라, 소식의 실시간성도 보장한다.
    그러면 또 질문이 있습니다. Consumer는 어떻게 MessageQueue에서 메시지를 끌어냅니까?랜덤으로 당기는 거예요?
    MQPullConsumer를 보십시오. DefaultMQPullConsumer는 그 계승입니다.
    public class MQPullConsumer {
    
        //  , 
        // 
        // @param mq from which message queue
        // @param subExpression  tag, "tag1 || tag2 || tag3"
        // @param offset  
        // @param maxNums  
        PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
    	    final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
    	    InterruptedException;
    }
    

    메시지 Queue가 들어오는 것을 볼 수 있어서 어색합니다. 언제 어느 대기열에서 메시지를 끌어낼지 결정할 수 없습니다.만능 검색엔진이 있어서 다행이지만,
    https://zhuanlan.zhihu.com/p/25140744
    Rocket Mq에는 Allocate MessageQueue Strategy 전문 클래스가 있습니다.class, 클라이언트에 숨겨져 있습니다.Consumer.리밸런스 싸.
  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

  • 매번 Consumer 수량의 변경은 AllocateMessageQueue Strategy를 촉발합니다.즉, 매번 Consumer가 끄는 대기열은 고정되어 있다.
    이제 고개를 돌려 첫 번째 Rocket MQ의 구조도를 보면 투철하게 그려진 것 같지 않아요?

    총결산

  • 모든 구조는 그 파생 변화의 역사를 가지고 구조 변화의 역사를 이해해야만 하나의 구조를 더욱 잘 이해할 수 있다
  • 사용 매뉴얼을 잘 연구하여 많은 구조의 세부 사항을 포함하였다
  • 문제를 가지고 원본을 연구한다

  • 다음으로 전송:https://juejin.im/post/5a9212bf6fb9a063395c8ae0

    좋은 웹페이지 즐겨찾기