자바 rocketmq-메시지 생 성(일반 메시지)

9088 단어 자바rocketmq소식.
머리말
메시지 전송 과 밀접 한 관 계 를 가 진 몇 줄 코드:
1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
2. producer.start();
3. Message msg = new Message(...)
4. SendResult sendResult = producer.send(msg);
5. producer.shutdown();
그럼 이 몇 줄 의 코드 가 실 행 될 때 뒤에서 무엇 을 했 습 니까?
1.우선 DefaultMQ Producer.start

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
기본 생 성 메시지 의 구현 클래스 인 DefaultMQProducerImpl 을 호출 하 였 습 니 다.
defaultMQProducerImpl.start()방법 을 호출 하면 DefaultMQProducerImpl.start()는 MQClient Instance 인 스 턴 스 대상 을 초기 화 합 니 다.MQClient Instance 인 스 턴 스 대상 은 자신의 start 방법 을 호출 하여 메시지 서비스 PullMessage Service.Start()를 끌 어 내 고 부하 균형 서비스 Rebalance Service.Start()를 시작 합 니 다.예 를 들 어 네트워크 통신 서비스 MQClient APIImpl.Start()
또한 생산 메시지 와 관련 된 정 보 를 실행 합 니 다.예 를 들 어 produceGroup,new Topic PublishInfo 대상 을 등록 하고 기본 TopicKey 를 키 로 하여 DefaultMQ ProducerImpl 에 저 장 된 topic PublishInfoTable 에 키 값 을 구성 합 니 다.
efaultMQProducerImpl.start()후 가 져 온 MQClient Instance 인 스 턴 스 대상 은 sendHeartbeatToAllBroker()방법 을 호출 하여 broker 에 게 심장 박동 패 키 지 를 계속 보 냅 니 다.yin'b 는 다음 그림 을 사용 하여 DefaultMQProducerImpl.start()과정 을 대체적으로 설명 할 수 있 습 니 다.

위의 그림 의 세 부분 에서 언급 된 내용:
1.1 MQClient 인 스 턴 스 초기 화
하나의 클 라 이언 트 는 하나의 MQClient Instance 인 스 턴 스 대상 만 생 성 할 수 있 고 생산 방식 은 공장 모드 와 단일 모드 를 사용 합 니 다.MQClient Instance.start()방법 으로 서 비 스 를 시작 합 니 다.원본 코드 는 다음 과 같 습 니 다.

public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1.2 등록 프로듀서
이 과정 은 현재 producer 대상 을 MQClient Instance 인 스 턴 스 대상 의 producerTable 에 등록 합 니 다.하나의 jvm(하나의 클 라 이언 트)중 하나의 producer Group 은 하나의 인 스 턴 스 만 있 을 수 있 습 니 다.MQClient Instance 작업 producer Table 은 다음 과 같은 몇 가지 방법 이 있 습 니 다.
  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown
  • 주:
    클 라 이언 트 Id 에 따라 MQClient Manager 는 서로 다른 MQClient Instance 를 제공 합 니 다.
    그룹 에 따라 MQClient Instance 는 서로 다른 MQProducer 와 MQConsumer 를 제공 합 니 다.
    1.3 루트 정보 표 에 루트 추가
    topicPublishInfoTable 정의:
    
    public class DefaultMQProducerImpl implements MQProducerInner {
    private final Logger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
    topic 를 key 로 하 는 Map 형 데이터 구조 입 니 다.DefaultMQ ProducerImpl.start()는 기본적으로 key=MixAll.DEPAULT 를 만 듭 니 다.TOPIC 의 Topic PublishInfo 는 topic PublishInfoTable 에 저 장 됩 니 다.
    1.4 심 박 가방 발송
    MQClient Instance 가 broker 에 게 심장 박동 패 키 지 를 보 낼 때 sendHeartbeat ToAllBroker()를 호출 하고 MQClient Instance 인 스 턴 스 대상 의 broker Addr Table 에서 모든 broker 주 소 를 받 아 이 broker 에 게 심장 박동 패 키 지 를 보 냅 니 다.
    sendHeartbeat ToAllBroker 는 prepare Heartbeat Data()방법 과 관련 되 며,이 방법 은 heartbeatData 데 이 터 를 생 성하 여 심장 박동 패 키 지 를 보 낼 때 heartbeatData 를 심장 박동 패 키 지 를 하 는 body 로 한다.producer 와 관련 된 부분 코드 는 다음 과 같 습 니 다.
    
    // Producer
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
    MQProducerInner impl = entry.getValue();
    if (impl != null) {
    ProducerData producerData = new ProducerData();
    producerData.setGroupName(entry.getKey());
    heartbeatData.getProducerDataSet().add(producerData);
    }
    2.sendResult sendResult=producer.send(msg)
    먼저 DefaultMQProducer.send(msg)를 호출 한 다음 sendDefaultImpl 을 호출 합 니 다.
    
    public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
    sendDefault Impl 은 무엇 을 했 습 니까?
    2.1.topicPublishInfo 가 져 오기
    msg 의 topic 에 따라 topic PublishInfoTable 에서 해당 하 는 topic PublishInfo 를 가 져 옵 니 다.없 으 면 경로 정 보 를 업데이트 하고 nameserver 에서 최신 경로 정 보 를 가 져 옵 니 다.nameserver 에서 최신 경로 정 보 를 끌 어 내 는 것 은 크게 다음 과 같 습 니 다.
    먼저 getTopicRouteInfoFromNameServer,그리고 topicRouteData2topicPublishInfo.

    2.2 메시지 발송 대기 열 선택
    일반 메시지:기본 값 으로 selectOne Message Queue 는 topic PublishInfo 의 message QueueList 에서 하나의 대기 열(Message Queue)을 선택 하여 메 시 지 를 보 냅 니 다.기본 값 은 긴 폴 링 방식 으로 대기 열 을 선택 합 니 다.
    그것 의 메커니즘 은 다음 과 같다.정상 적 인 상황 에서 순서대로 queue 를 선택 하여 발송 한다.만약 에 특정한 노드 에 시간 초과 가 발생 하면 다음 에 quue 를 선택 할 때 같은 broker 를 건 너 뜁 니 다.서로 다른 대기 열 선택 전략 은 순서 메시지,사무 메시지 등 생산 메시지 의 몇 가지 모델 을 형성 했다.
    순서 메시지:한 그룹 이 질서 있 게 소비 해 야 할 메 시 지 를 같은 broker 의 같은 대기 열 에 보 내 면 순서 메 시 지 를 실현 할 수 있 습 니 다.같은 주문 번호 의 지불 을 가정 하고 환불 은 같은 대기 열 에 두 어야 합 니 다.그러면 send 할 때 Message Queue Selector 를 스스로 실현 하고 매개 변수 arg 필드 에 따라 quue 를 선택 할 수 있 습 니 다.
    
    private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}
    트 랜 잭 션 메시지:메 시 지 를 성공 적 으로 보 내 고 로 컬 작업 이 실 행 될 때 만 트 랜 잭 션 메 시 지 를 보 내 고 트 랜 잭 션 메 시 지 를 보 내 는 데 실 패 했 습 니 다.메 시 지 를 보 내 는 데 실 패 했 습 니 다.스크롤 백 메 시 지 를 직접 보 내 고 스크롤 백 메 시 지 를 보 내 고 스크롤 백 을 합 니 다.구체 적 으로 어떻게 실현 하
    2.3 패키지 메시지 패키지,패 킷 전송
    우선,가 져 온 Message Queue 의 getBrokerName 에 따라 findBrokerAddress InPublish 를 호출 하여 해당 하 는 broker 주 소 를 저장 합 니 다.찾 지 못 하면 새 경로 정보 와 주 소 를 다시 가 져 옵 니 다.brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)가 져 온 broker 는 모두 master(id=0)임 을 알 수 있 습 니 다.
    그리고 이 메시지 와 관련 된 정 보 를 Remoting Command 패 킷,RequestCode.SEND 로 포장 합 니 다.MESSAGE
    가 져 온 broke 주소 에 따라 해당 하 는 broker 에 패 키 지 를 보 냅 니 다.기본적으로 시간 초과 시간 은 3s 입 니 다.
    패키지 메시지 요청 패키지 의 헤더:
    
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    메시지 패키지 보 내기(일반 메시지 기본 값 은 동기 화 방식):
    
    SendResult sendResult = null;
    switch (communicationMode) {
       case SYNC:
      sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
      brokerAddr,
      mq.getBrokerName(),
       msg,
      requestHeader,
       timeout,
      communicationMode,
      context,
      this);
    break;
    broker 에서 온 응답 패 킷 처리:
    
    private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
    }
    broker 단 에서 request 패 킷 을 처리 한 후에 메 시 지 를 commitLog 에 저장 하고 구체 적 인 과정 은 후속 분석 합 니 다.
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기