RocketMQ 소스 코드 분석 --- 프로 세 스 큐
코드 에 있 는 설명 보기:
Queue consumption snapshot
즉, 소식 스냅 사진 이라는 뜻 인 데 왜 이렇게 형용 합 니까?메 시 지 를 끌 어 올 릴 때 메 시 지 를 저장 하기 때문이다.또한 메 시 지 를 끌 어 올 릴 때 는 Pull Request 를 사용 하여 요청 합 니 다. 내부 구 조 는 다음 과 같 습 니 다.
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;
}
ProcessQueue 와 하나의 Message Queue 가 대응 하 는 것 을 볼 수 있 습 니 다. 즉, 하나의 대기 열 에 ProcessQueue 의 데이터 구조 가 있 을 것 입 니 다. 주요 필드 를 보 세 요.
public class ProcessQueue {
public final static long RebalanceLockMaxLiveTime =
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
//
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
// , ,
private final AtomicLong msgCount = new AtomicLong();
// , ProcessQueue
private final Lock lockConsume = new ReentrantLock();
//
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
// ProcessQueue lockConsume
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
// ProcessQueue offset, ConsumeQueue offset
private volatile long queueOffsetMax = 0L;
//
private volatile boolean dropped = false;
//
private volatile long lastPullTimestamp = System.currentTimeMillis();
//
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
//
private volatile long lastLockTimestamp = System.currentTimeMillis();
//
private volatile boolean consuming = false;
//
private volatile long msgAccCnt = 0;
}
isLockExpired
public boolean isLockExpired() {
boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
return result;
}
순서대로 소비 할 때 사용 하고 소비 하기 전에 ProcessQueue 잠 금 시간 이 한도 값 (기본 30000 ms) 을 초과 하 는 지 판단 합 니 다. 시간 초과 가 없 으 면 잠 금 을 가지 고 있 는 것 을 의미 합 니 다. 구체 적 인 세부 사항 은 순서대로 소비 할 때 상세 하 게 설명 합 니 다. 부하
isPullExpired
public boolean isPullExpired() {
boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
return result;
}
끌 어 올 릴 때 lastPull Timestamp 의 값 을 업데이트 한 다음 rebalance 에서 ProcessQueue 가 일정 시간 이 넘 도록 메 시 지 를 끌 어 올 리 지 않 았 다 고 판단 합 니 다. 그렇다면 ProcessQueue 를 폐기 (setDropped (true) 하고 ProcessQueue 와 Message Queue 의 대응 관계 에서 이 ProcessQueue 를 제거 합 니 다. 코드 디 테 일 은 다음 과 같 습 니 다.
if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
로그 에 따 르 면 이것 은 BUG 일 것 입 니 다. 어떤 상황 에서 끌 면 멈 추고 시간 이 업데이트 되 지 않 을 것 입 니 다. 이때 ProcessQueue 를 재건 하 는 것 은 구체 적 으로 어떤 원인 인지 잘 모 르 겠 습 니 다.
cleanExpiredMsg
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {//
return;
}
// 16
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
//
// offset consumeTimeout() * 60 * 1000( 15 )
if (!msgTreeMap.isEmpty()
&& System.currentTimeMillis() -
Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue()))
> pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
try {
// Broker, , 3
//
pushConsumer.sendMessageBack(msg, 3);
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// ,ProcessQueue offset
// , , , ProcessQueue
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
msgTreeMap.remove(msgTreeMap.firstKey());
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}
위 는 동시 소비 모델 에서 소비 시간 이 15 분 이 넘 는 정 보 를 정기 적 으로 정리 하 는 논리 로 소비자 가 시작 할 때 정기 적 인 임 무 를 시작 하여 이 방법 을 정기 적 으로 호출 할 수 있다.
public void start() {
this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
15 분 에 한 번 씩.
putMessage
public boolean putMessage(final List<MessageExt> msgs) {
// , , true
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
// offset key, treemap
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {//
validMsgCnt++;
// ProcessQueue offset
this.queueOffsetMax = msg.getQueueOffset();
}
}
//
msgCount.addAndGet(validMsgCnt);
// ProcessQueue ( , msgs msgTreeMap )
// consuming false, true,
// true, , false
if (!msgTreeMap.isEmpty() && !this.consuming) {
// , ,
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
// property ConsumeQueue offset
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {// offset , offset
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
getMaxSpan
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
}
}
finally {
this.lockTreeMap.readLock().unlock();
}
}
catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
현재 이 메시지 에서 가장 작은 offset 이전의 차 이 를 가 져 옵 니 다. 이 방법 은 메 시 지 를 끌 어 올 릴 때 현재 처리 되 지 않 은 메시지 가 얼마나 있 는 지 판단 하 는 데 사 용 됩 니 다. 만약 에 특정한 값 (기본 2000) 보다 크 면 흐름 제어 처 리 를 합 니 다.
//DefaultMQPushConsumerImpl.pullMessage
if (!this.consumeOrderly) {
// 2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
// PullRequest (PullRequest )
//PullTimeDelayMillsWhenFlowControl 50
this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
// , 1000
if ((flowControlTimes2++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, flowControlTimes2);
}
return;
}
} else {
//....
}
//DefaultMQPushConsumerImpl.executePullRequestLater
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
//PullMessageService.executePullRequestLater
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
// timeDelay
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// PullRequest , ,
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
}
removeMessage
public long removeMessage(final List<MessageExt> msgs) {
// , offset
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
// , TreeMap
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {//
removedCnt--;//
}
}
// msgCount ProcessQueue , ,
msgCount.addAndGet(removedCnt);
// , offset
// , ProcessQueue offset
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
}
return result;
}
이곳 의 반환 치 와 소비 진 도 는 매우 큰 관계 가 있 기 때문에 뒤에서 소비 진 도 를 분석 할 때 다시 깊이 분석 할 것 이다.
takeMessags
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
// treeMap batchSize , offset
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
// treemapp
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
// , , , consuming false
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
이 방법 은 순서대로 소비 모드 에서 사 용 됩 니 다. 이 메 시 지 를 가 져 오 면 저희 가 정의 한 Message Listener 를 사용 하여 소비 합 니 다.
commit
순차 적 소비 모드 에서 takeMessages 를 호출 하여 메 시 지 를 가 져 옵 니 다. 내부 논리 에서 treeMap 의 메 시 지 를 임시 적 인 treeMap 에 넣 고 소 비 를 합 니 다. 소비 가 완료 되면 이 임시 맵 을 제거 해 야 합 니 다. commt 방법 을 사용 합 니 다.
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// msgTreeMapTemp ,lastKey
Long offset = this.msgTreeMapTemp.lastKey();
// ,
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
//
this.msgTreeMapTemp.clear();
if (offset != null) {
//
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
makeMessageToCosumeAgain
순서 모드 에서 SUSPEND 로 돌아 가기CURRENT_QUEUE_A_MOMENT, 이 방법 을 사용 할 수 있 습 니 다. 이 방법 은 메 시 지 를 다시 소비 하 라 는 뜻 입 니 다.위 에서 말 한 절 차 를 돌 이 켜 보면:
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// treeMap
// treeMap,
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
위 는 바로 ProcessQueue 가 제공 하 는 일부 기능 입 니 다. 많은 상부 의 기능 들 이 그의 실현 에 의존 하고 있 습 니 다. 다른 것 을 보기 전에 ProcessQueue 를 먼저 알 아야 하기 때문에 위 에서 ProcessQueue 의 기능 을 분 석 했 고 약간 발산 되 었 습 니 다. 관련 된 범위 가 넓 기 때문에 관련 된 세부 사항 은 전개 되 지 않 았 습 니 다. 나중에 글 이 만 나 서 분석 하 겠 습 니 다.
ProcessQueue 지식 포인트:
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
자바 클래스 상용 방법 분석Class 클래스 는 자바 에서 클래스 정 보 를 저장 하 는 인 스 턴 스 입 니 다.그 안에 각종 반사 방법 이 있 는데 이미 가지 고 있 는 정 보 를 파악 하고 그것 을 익히 면 우리 의 일상적인 반사 프로 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.