RocketMQ(8): 메시지 전송
RocketMQ 네트워크 배포도
설명:rocketmq 시리즈는 모두 rocketmq-4.1.0-incubating으로 소개됩니다.
원본 코드를 읽을 때 일정한 주석을 만들었습니다. 공중번호 [장심 영도] 회답: rocketmq를 기반으로 rocketmq4.1.0에 상세한 중국어 코드 주석을 얻을 수 있습니다.스타, 포크 여러분 환영합니다!
드잡이는 메시지 중간부품의 본질적인 메시지 중간부품의 길을 간단하게 말했다. 한 발에 한 발씩 소비를 저장하고 오늘은 주로 다음 발췌를 논의한다. 바로 Rocket MQ 네트워크 배치도에서 색깔로 표시된 부분이다.
이전 rocketmq 시리즈
메시지 전송 개요
위의 그림은 아마도 프로덕터가 브로커에게 메시지를 보내는 핵심 논리일 것이다.
질문:
브로커 관련 정보를 클라이언트에 캐시하는 것은namesrv와의 상호작용을 감소시켰지만,broker 변화의 실시간성을 떨어뜨렸다. 어떻게 갑자기 브로커 한 대를 사용할 수 없게 되면 어떻게 될까?(다음에 rocketmq의 처리를 보십시오), 왜 프로덕터가 그렇게 빨리 발송됩니까?본질은 넷티의 write And Flush 때문?프로덕터는 어떻게 비동기적으로 발송합니까?동기화 발송?모뉴웨이가 보낸 건요?만약 발송에 실패하면 어떻게 처리합니까?
메시지 발송 일반 프로세스 분석
발송은 정시 발송, 순서 발송, 대량 발송 등 상황도 관련되기 때문에 본고는 편폭 문제가 일반적인 발송 논리 설명임을 고려하여 다음에 다른 상황을 계속 공유한다.
이 편을 읽기 전에 중점적으로 읽어야 한다. RocketMQ(二): RPC 통신.
어떻게 로컬 디버깅 전에 글을 공유했는지 여기서 언급하지 않겠습니다. 보내는 논리는 저장과 소비에 비해 가장 간단합니다(한 줄에 따라 끊임없이 따라가면 기본적으로 차이가 많지 않습니다). 그리고 저장이 가장 복잡하고 그 다음에 소비합니다(이 과정은 한 줄로 찾기 어렵고 후속적으로 공유할 수 있습니다).
동기식 전송 쓰기
참고: RocketMQ를 참조하여 빠르게 시작하십시오.
producer.start
/**
* Start this producer instance.
*
*
*
* Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
* this method before sending or querying messages.
*
*
*
* @throws MQClientException if there is any unexpected error.
*/
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
다음과 같은 작업을 주로 수행했습니다(핵심 작업).
메시지 객체 만들기
producer는 메시지 대상으로 발송됩니다. 메시지 구조를 보십시오.
public Message() {
}
public Message(String topic, byte[] body) {
this(topic, "", "", 0, body, true);
}
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0)
this.setTags(tags);
if (keys != null && keys.length() > 0)
this.setKeys(keys);
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
public Message(String topic, String tags, byte[] body) {
this(topic, tags, "", 0, body, true);
}
public Message(String topic, String tags, String keys, byte[] body) {
this(topic, tags, keys, 0, body, true);
}
비고: 주로 topic, tags, 그리고 바디의 진실한 내용 등입니다.
보내기
SendResult sendResult = producer.send(msg);
발송 처리를 진행하다.다음은 send가 어떻게 처리하는지 중점적으로 봅시다.
전송send 핵심 분석
보내는 몇 가지 방식: 동기화 비동기화 원웨이(어떤 것을 선택해야 할지 상황에 따라 판단해야 함)
동기화 발송을 예로 들면 기본 시간 초과 시간은 3s입니다.
SendResult sendResult = producer.send(msg);
이것은 바로 발송의 트리거 방법이다. 우리는 계속 따라가면 된다. **첫 번째 초보적인 느낌: **추적을 통해 들어가는 첫 번째 느낌은 JUC와 관련된 사용이다. 향원 모드(본질적인 맵을 이용하여 캐시를 하는 것)와 넷티를 대량으로 사용하는 것이다.
핵심 논리:
코드는 대량으로 복제되지 않습니다. 필요한github에서rocketmq4.1.0을 기반으로 자세한 중국어 코드 설명을 얻을 수 있습니다.스타, 포크 여러분 환영합니다!
패키지 요청 헤더 정보:
// Namesrv Topic Broker Name、 ( )
public static final int GET_ROUTEINTO_BY_TOPIC = 105;
namesrv 서버가 이 요청을 받아들인 처리 상황입니다.
마지막으로 얻은 라우팅 정보는 다음과 같습니다.
// sync 3 1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
참고: CountDownLatch를 사용하여 비동기식으로 동기화합니다.
2상황이면 발송 실패를 표시하고, 3상황을 깨워서 막지 않습니다(마지막으로 이상을 던지면 발송 실패를 표시합니다)
읽은 후에 얻은 것이 있다면 좋아요, 관심, 공중호[장심영도]를 눌러서 더 많은 멋진 역사를 찾아보세요!!!
지식별에 가입하여 함께 탐구하자!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.