자바 rocketmq-메시지 생 성(일반 메시지)
메시지 전송 과 밀접 한 관 계 를 가 진 몇 줄 코드:
1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
2. producer.start();
3. Message msg = new Message(...)
4. SendResult sendResult = producer.send(msg);
5. producer.shutdown();
그럼 이 몇 줄 의 코드 가 실 행 될 때 뒤에서 무엇 을 했 습 니까?
1.우선 DefaultMQ Producer.start
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
기본 생 성 메시지 의 구현 클래스 인 DefaultMQProducerImpl 을 호출 하 였 습 니 다.defaultMQProducerImpl.start()방법 을 호출 하면 DefaultMQProducerImpl.start()는 MQClient Instance 인 스 턴 스 대상 을 초기 화 합 니 다.MQClient Instance 인 스 턴 스 대상 은 자신의 start 방법 을 호출 하여 메시지 서비스 PullMessage Service.Start()를 끌 어 내 고 부하 균형 서비스 Rebalance Service.Start()를 시작 합 니 다.예 를 들 어 네트워크 통신 서비스 MQClient APIImpl.Start()
또한 생산 메시지 와 관련 된 정 보 를 실행 합 니 다.예 를 들 어 produceGroup,new Topic PublishInfo 대상 을 등록 하고 기본 TopicKey 를 키 로 하여 DefaultMQ ProducerImpl 에 저 장 된 topic PublishInfoTable 에 키 값 을 구성 합 니 다.
efaultMQProducerImpl.start()후 가 져 온 MQClient Instance 인 스 턴 스 대상 은 sendHeartbeatToAllBroker()방법 을 호출 하여 broker 에 게 심장 박동 패 키 지 를 계속 보 냅 니 다.yin'b 는 다음 그림 을 사용 하여 DefaultMQProducerImpl.start()과정 을 대체적으로 설명 할 수 있 습 니 다.
위의 그림 의 세 부분 에서 언급 된 내용:
1.1 MQClient 인 스 턴 스 초기 화
하나의 클 라 이언 트 는 하나의 MQClient Instance 인 스 턴 스 대상 만 생 성 할 수 있 고 생산 방식 은 공장 모드 와 단일 모드 를 사용 합 니 다.MQClient Instance.start()방법 으로 서 비 스 를 시작 합 니 다.원본 코드 는 다음 과 같 습 니 다.
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1.2 등록 프로듀서이 과정 은 현재 producer 대상 을 MQClient Instance 인 스 턴 스 대상 의 producerTable 에 등록 합 니 다.하나의 jvm(하나의 클 라 이언 트)중 하나의 producer Group 은 하나의 인 스 턴 스 만 있 을 수 있 습 니 다.MQClient Instance 작업 producer Table 은 다음 과 같은 몇 가지 방법 이 있 습 니 다.
클 라 이언 트 Id 에 따라 MQClient Manager 는 서로 다른 MQClient Instance 를 제공 합 니 다.
그룹 에 따라 MQClient Instance 는 서로 다른 MQProducer 와 MQConsumer 를 제공 합 니 다.
1.3 루트 정보 표 에 루트 추가
topicPublishInfoTable 정의:
public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
topic 를 key 로 하 는 Map 형 데이터 구조 입 니 다.DefaultMQ ProducerImpl.start()는 기본적으로 key=MixAll.DEPAULT 를 만 듭 니 다.TOPIC 의 Topic PublishInfo 는 topic PublishInfoTable 에 저 장 됩 니 다.1.4 심 박 가방 발송
MQClient Instance 가 broker 에 게 심장 박동 패 키 지 를 보 낼 때 sendHeartbeat ToAllBroker()를 호출 하고 MQClient Instance 인 스 턴 스 대상 의 broker Addr Table 에서 모든 broker 주 소 를 받 아 이 broker 에 게 심장 박동 패 키 지 를 보 냅 니 다.
sendHeartbeat ToAllBroker 는 prepare Heartbeat Data()방법 과 관련 되 며,이 방법 은 heartbeatData 데 이 터 를 생 성하 여 심장 박동 패 키 지 를 보 낼 때 heartbeatData 를 심장 박동 패 키 지 를 하 는 body 로 한다.producer 와 관련 된 부분 코드 는 다음 과 같 습 니 다.
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
2.sendResult sendResult=producer.send(msg)먼저 DefaultMQProducer.send(msg)를 호출 한 다음 sendDefaultImpl 을 호출 합 니 다.
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
sendDefault Impl 은 무엇 을 했 습 니까?2.1.topicPublishInfo 가 져 오기
msg 의 topic 에 따라 topic PublishInfoTable 에서 해당 하 는 topic PublishInfo 를 가 져 옵 니 다.없 으 면 경로 정 보 를 업데이트 하고 nameserver 에서 최신 경로 정 보 를 가 져 옵 니 다.nameserver 에서 최신 경로 정 보 를 끌 어 내 는 것 은 크게 다음 과 같 습 니 다.
먼저 getTopicRouteInfoFromNameServer,그리고 topicRouteData2topicPublishInfo.
2.2 메시지 발송 대기 열 선택
일반 메시지:기본 값 으로 selectOne Message Queue 는 topic PublishInfo 의 message QueueList 에서 하나의 대기 열(Message Queue)을 선택 하여 메 시 지 를 보 냅 니 다.기본 값 은 긴 폴 링 방식 으로 대기 열 을 선택 합 니 다.
그것 의 메커니즘 은 다음 과 같다.정상 적 인 상황 에서 순서대로 queue 를 선택 하여 발송 한다.만약 에 특정한 노드 에 시간 초과 가 발생 하면 다음 에 quue 를 선택 할 때 같은 broker 를 건 너 뜁 니 다.서로 다른 대기 열 선택 전략 은 순서 메시지,사무 메시지 등 생산 메시지 의 몇 가지 모델 을 형성 했다.
순서 메시지:한 그룹 이 질서 있 게 소비 해 야 할 메 시 지 를 같은 broker 의 같은 대기 열 에 보 내 면 순서 메 시 지 를 실현 할 수 있 습 니 다.같은 주문 번호 의 지불 을 가정 하고 환불 은 같은 대기 열 에 두 어야 합 니 다.그러면 send 할 때 Message Queue Selector 를 스스로 실현 하고 매개 변수 arg 필드 에 따라 quue 를 선택 할 수 있 습 니 다.
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}
트 랜 잭 션 메시지:메 시 지 를 성공 적 으로 보 내 고 로 컬 작업 이 실 행 될 때 만 트 랜 잭 션 메 시 지 를 보 내 고 트 랜 잭 션 메 시 지 를 보 내 는 데 실 패 했 습 니 다.메 시 지 를 보 내 는 데 실 패 했 습 니 다.스크롤 백 메 시 지 를 직접 보 내 고 스크롤 백 메 시 지 를 보 내 고 스크롤 백 을 합 니 다.구체 적 으로 어떻게 실현 하2.3 패키지 메시지 패키지,패 킷 전송
우선,가 져 온 Message Queue 의 getBrokerName 에 따라 findBrokerAddress InPublish 를 호출 하여 해당 하 는 broker 주 소 를 저장 합 니 다.찾 지 못 하면 새 경로 정보 와 주 소 를 다시 가 져 옵 니 다.
brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)
가 져 온 broker 는 모두 master(id=0)임 을 알 수 있 습 니 다.그리고 이 메시지 와 관련 된 정 보 를 Remoting Command 패 킷,RequestCode.SEND 로 포장 합 니 다.MESSAGE
가 져 온 broke 주소 에 따라 해당 하 는 broker 에 패 키 지 를 보 냅 니 다.기본적으로 시간 초과 시간 은 3s 입 니 다.
패키지 메시지 요청 패키지 의 헤더:
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
메시지 패키지 보 내기(일반 메시지 기본 값 은 동기 화 방식):
SendResult sendResult = null;
switch (communicationMode) {
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
broker 에서 온 응답 패 킷 처리:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
broker 단 에서 request 패 킷 을 처리 한 후에 메 시 지 를 commitLog 에 저장 하고 구체 적 인 과정 은 후속 분석 합 니 다.이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.