RocketMQ(8): 메시지 전송

6724 단어
창의력 영도 전재 오리지널의 출처를 밝혀 주십시오. 감사합니다!

RocketMQ 네트워크 배포도

  • NameServer: 시스템에서 이름 서비스를 하고 브로커 서비스를 업데이트하고 발견합니다.
  • Broker-Master:broker 메시지 호스트 서버.
  • Broker-Slave: 브로커 메시지 랜덤 서버.
  • Producer: 메시지 생산자.
  • Consumer: 메시지 소비자.

  • 설명:rocketmq 시리즈는 모두 rocketmq-4.1.0-incubating으로 소개됩니다.
    원본 코드를 읽을 때 일정한 주석을 만들었습니다. 공중번호 [장심 영도] 회답: rocketmq를 기반으로 rocketmq4.1.0에 상세한 중국어 코드 주석을 얻을 수 있습니다.스타, 포크 여러분 환영합니다!
    드잡이는 메시지 중간부품의 본질적인 메시지 중간부품의 길을 간단하게 말했다. 한 발에 한 발씩 소비를 저장하고 오늘은 주로 다음 발췌를 논의한다. 바로 Rocket MQ 네트워크 배치도에서 색깔로 표시된 부분이다.

    이전 rocketmq 시리즈

  • RocketMQ 일부 데이터 소비 불능 문제 해결
  • rocketmq를 예상보다 잘 쓰게 하는 방법 1가지
  • RocketMQ(1): 소스 디버깅
  • rocketmq번외편(一): 개발명령행
  • RocketMQ(6):namesrv 재탐색
  • RocketMQ(5):namesrv 초기 탐색
  • CRC 체크
  • RocketMQ(2): RPC 통신
  • RocketMQ 해혹편
  • RocketMQ 빠른 시작
  • MQ 응용 장면
  • RocketMQ 클러스터 배포 구성
  • Ali RocketMQ
  • 메시지 전송 개요


    위의 그림은 아마도 프로덕터가 브로커에게 메시지를 보내는 핵심 논리일 것이다.
    질문:
    브로커 관련 정보를 클라이언트에 캐시하는 것은namesrv와의 상호작용을 감소시켰지만,broker 변화의 실시간성을 떨어뜨렸다. 어떻게 갑자기 브로커 한 대를 사용할 수 없게 되면 어떻게 될까?(다음에 rocketmq의 처리를 보십시오), 왜 프로덕터가 그렇게 빨리 발송됩니까?본질은 넷티의 write And Flush 때문?프로덕터는 어떻게 비동기적으로 발송합니까?동기화 발송?모뉴웨이가 보낸 건요?만약 발송에 실패하면 어떻게 처리합니까?

    메시지 발송 일반 프로세스 분석


    발송은 정시 발송, 순서 발송, 대량 발송 등 상황도 관련되기 때문에 본고는 편폭 문제가 일반적인 발송 논리 설명임을 고려하여 다음에 다른 상황을 계속 공유한다.
    이 편을 읽기 전에 중점적으로 읽어야 한다. RocketMQ(二): RPC 통신.
    어떻게 로컬 디버깅 전에 글을 공유했는지 여기서 언급하지 않겠습니다. 보내는 논리는 저장과 소비에 비해 가장 간단합니다(한 줄에 따라 끊임없이 따라가면 기본적으로 차이가 많지 않습니다). 그리고 저장이 가장 복잡하고 그 다음에 소비합니다(이 과정은 한 줄로 찾기 어렵고 후속적으로 공유할 수 있습니다).

    동기식 전송 쓰기


    참고: RocketMQ를 참조하여 빠르게 시작하십시오.

    producer.start

        /**
         * Start this producer instance.
         * 
         *
         * 
         * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
         * this method before sending or querying messages.
         * 
         * 
         *
         * @throws MQClientException if there is any unexpected error.
         */
        @Override
        public void start() throws MQClientException {
            this.defaultMQProducerImpl.start();
        }
    

    다음과 같은 작업을 주로 수행했습니다(핵심 작업).
  • 일부 구성 검사.
  • namesrv와 통신하는netty 클라이언트를 구축합니다.
  • 기본적으로 30s마다namesrv와 교환하여 브로커에 대한 정보를 얻습니다.
  • 기본적으로 30s마다 실효된 브로커 정보를 제거하고 모든 브로커에 심장박동을 보냅니다.

  • 메시지 객체 만들기


    producer는 메시지 대상으로 발송됩니다. 메시지 구조를 보십시오.
        public Message() {
        }
    
        public Message(String topic, byte[] body) {
            this(topic, "", "", 0, body, true);
        }
    
        public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
            this.topic = topic;
            this.flag = flag;
            this.body = body;
    
            if (tags != null && tags.length() > 0)
                this.setTags(tags);
    
            if (keys != null && keys.length() > 0)
                this.setKeys(keys);
    
            this.setWaitStoreMsgOK(waitStoreMsgOK);
        }
    
        public Message(String topic, String tags, byte[] body) {
            this(topic, tags, "", 0, body, true);
        }
    
        public Message(String topic, String tags, String keys, byte[] body) {
            this(topic, tags, keys, 0, body, true);
        }
    

    비고: 주로 topic, tags, 그리고 바디의 진실한 내용 등입니다.

    보내기

    SendResult sendResult = producer.send(msg);
    

    발송 처리를 진행하다.다음은 send가 어떻게 처리하는지 중점적으로 봅시다.

    전송send 핵심 분석


    보내는 몇 가지 방식: 동기화 비동기화 원웨이(어떤 것을 선택해야 할지 상황에 따라 판단해야 함)
    동기화 발송을 예로 들면 기본 시간 초과 시간은 3s입니다.
    SendResult sendResult = producer.send(msg);
    

    이것은 바로 발송의 트리거 방법이다. 우리는 계속 따라가면 된다. **첫 번째 초보적인 느낌: **추적을 통해 들어가는 첫 번째 느낌은 JUC와 관련된 사용이다. 향원 모드(본질적인 맵을 이용하여 캐시를 하는 것)와 넷티를 대량으로 사용하는 것이다.
    핵심 논리:
    코드는 대량으로 복제되지 않습니다. 필요한github에서rocketmq4.1.0을 기반으로 자세한 중국어 코드 설명을 얻을 수 있습니다.스타, 포크 여러분 환영합니다!
  • 서비스를 사용할 수 있는지 판단합니까?프로세스를 직접 종료할 수 없습니다.
  • 메시지 확인:
  • topic 루트 정보 캐시에서 가져오면namesrv와 한 번 상호작용을 하지 않습니다. (2번도 가능합니다.) topic 정보가broker 서버에 반드시 존재하지 않기 때문에 존재하지 않으면 기본 (TBW102) 을 사용합니다.

  • 패키지 요청 헤더 정보:
    // Namesrv  Topic Broker Name、 ( )
     public static final int GET_ROUTEINTO_BY_TOPIC = 105;
    

    namesrv 서버가 이 요청을 받아들인 처리 상황입니다.
    마지막으로 얻은 라우팅 정보는 다음과 같습니다.
  • 발송 모드는sync로 3회 기타 1회
    // sync  3 1 
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    
  • queue를 선택하면 그 브로커를 보내는 그queueid 위를 어떻게 선택하나요?(클라이언트 자체 부하), 브로커 관련 정보가 클라이언트에 캐시되어 있기 때문에 문제가 발생했습니다(30s가 정보를 한 번 동기화하기 때문에 30s 안에 브로커에 문제가 생기면 어떻게 될까요?)rocketmq는 이렇게 처리됩니다:sendLatencyFaultEnable 스위치를 켜는지 여부1.열기 --> 사용 불가 기간 2.안 열기 (기본값) --> 랜덤으로 하기 (lastBrokerName이 비어 있지 않으면 이 브로커가 아닌 것으로 바꾸고, 없으면 랜덤으로 하기)
  • sendKernelImpl을 호출하여 메시지를 보내는 핵심은 브로커의name에 따라 IP 주소를 가져옵니다. 채널이 만들어지지 않고 저장되지 않으면.설정 UNIQ_id, 클라이언트 IP 주소 정보를 보호합니다.보낼 때 갈고리 함수가 실행됩니다. (메시지 갈고리 금지, 메시지 갈고리 보내기 (execute Send Message Hook Before, execute Send Message Hook After)SendMessageRequest Header를 구축하는데 메시지 스탬프를 생성하는 것을 포함하기 때문에 각 기계의 시간은 일치하는 것이 가장 좋다(이렇게 하면 브로커가 메시지를 받아들이는 데 얼마나 걸렸는지 알 수 있다).
  • 발송 메시지 모드에 따라 발송 방식을 선택한다. 다음은 주로 동기화 발송 상황을 본다.만약에 1 상황이 nettywrite And Flush 발송 성공자를 실행하여 뛰어나오면, 3 상황에 도달하여 진행하는 등 최대 3s를 기다립니다.여기는 언제 깨울까요?사실은 브로커 상황이 클라이언트에게 응답할 때 깨우는 것이다.
    참고: CountDownLatch를 사용하여 비동기식으로 동기화합니다.
    2상황이면 발송 실패를 표시하고, 3상황을 깨워서 막지 않습니다(마지막으로 이상을 던지면 발송 실패를 표시합니다)
  • 브로커 업데이트 가능 시간
  • retryAnotherBrokerWhenNotStoreOK 상황판단retryAnotherBrokerWhenNotStoreOK를true로 설정한 후 발송에 실패했을 때 브로커를 바꾸는 것을 선택합니다.
  • 다음과 같은 이상continue, 발송 메시지 재시도
  • 클라이언트 송신 절차는 대략 여기에 와서 분석이 끝났다.
    읽은 후에 얻은 것이 있다면 좋아요, 관심, 공중호[장심영도]를 눌러서 더 많은 멋진 역사를 찾아보세요!!!
    지식별에 가입하여 함께 탐구하자!

    좋은 웹페이지 즐겨찾기