RocketMQ 소스 코드 분석 --- 프로 세 스 큐

RockerMQ 에서 매우 중요 한 데이터 구 조 를 ProcessQueue 라 고 하 는데 많은 기능 이 있 습 니 다. 예 를 들 어 소비 진도, 소비 등 기능 의 바 텀 핵심 데이터 저장 은 모두 ProcessQueue 가 제공 하 는 기능 이 있 습 니 다. 다음은 ProcessQueue 가 제공 하 는 기능 을 소개 하 겠 습 니 다. 전체 관련 절 차 는 여기 서 전개 되 지 않 고 다른 기능 분석 글 과 관련 되 어야 깊이 분석 할 수 있 습 니 다.
코드 에 있 는 설명 보기:
Queue consumption snapshot
즉, 소식 스냅 사진 이라는 뜻 인 데 왜 이렇게 형용 합 니까?메 시 지 를 끌 어 올 릴 때 메 시 지 를 저장 하기 때문이다.또한 메 시 지 를 끌 어 올 릴 때 는 Pull Request 를 사용 하여 요청 합 니 다. 내부 구 조 는 다음 과 같 습 니 다.
public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
}

ProcessQueue 와 하나의 Message Queue 가 대응 하 는 것 을 볼 수 있 습 니 다. 즉, 하나의 대기 열 에 ProcessQueue 의 데이터 구조 가 있 을 것 입 니 다. 주요 필드 를 보 세 요.
public class ProcessQueue {
    public final static long RebalanceLockMaxLiveTime =
            Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    //           
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    //         ,        ,       
    private final AtomicLong msgCount = new AtomicLong();
    //    ,          ProcessQueue     
    private final Lock lockConsume = new ReentrantLock();
    //          
    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
    //      ProcessQueue   lockConsume   
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    // ProcessQueue          offset, ConsumeQueue offset
    private volatile long queueOffsetMax = 0L;
    //              
    private volatile boolean dropped = false;
    //            
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    //              
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    
    private volatile boolean locked = false;
    //        
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    //       
    private volatile boolean consuming = false;
    //                    
    private volatile long msgAccCnt = 0;
}

isLockExpired
    public boolean isLockExpired() {
        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
        return result;
    }

순서대로 소비 할 때 사용 하고 소비 하기 전에 ProcessQueue 잠 금 시간 이 한도 값 (기본 30000 ms) 을 초과 하 는 지 판단 합 니 다. 시간 초과 가 없 으 면 잠 금 을 가지 고 있 는 것 을 의미 합 니 다. 구체 적 인 세부 사항 은 순서대로 소비 할 때 상세 하 게 설명 합 니 다. 부하
isPullExpired
    public boolean isPullExpired() {
        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
        return result;
    }

끌 어 올 릴 때 lastPull Timestamp 의 값 을 업데이트 한 다음 rebalance 에서 ProcessQueue 가 일정 시간 이 넘 도록 메 시 지 를 끌 어 올 리 지 않 았 다 고 판단 합 니 다. 그렇다면 ProcessQueue 를 폐기 (setDropped (true) 하고 ProcessQueue 와 Message Queue 의 대응 관계 에서 이 ProcessQueue 를 제거 합 니 다. 코드 디 테 일 은 다음 과 같 습 니 다.
if (pq.isPullExpired()) {
    switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
            break;
        case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
        default:
            break;
    }
}

로그 에 따 르 면 이것 은 BUG 일 것 입 니 다. 어떤 상황 에서 끌 면 멈 추고 시간 이 업데이트 되 지 않 을 것 입 니 다. 이때 ProcessQueue 를 재건 하 는 것 은 구체 적 으로 어떤 원인 인지 잘 모 르 겠 습 니 다.
cleanExpiredMsg
    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {//        
            return;
        }
        //     16   
        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
        for (int i = 0; i < loop; i++) {
            MessageExt msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    //         
                    //  offset           consumeTimeout() * 60 * 1000(  15  )
                    if (!msgTreeMap.isEmpty() 
                && System.currentTimeMillis() -
                 Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) 
                > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                        msg = msgTreeMap.firstEntry().getValue();
                    } else {
                        break;
                    }
                } finally {
                    this.lockTreeMap.readLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }

            try {
                //      Broker,    ,      3
                //               
                pushConsumer.sendMessageBack(msg, 3);
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        //       ,ProcessQueue offset               
                        //        ,                ,       ,  ProcessQueue   
                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                            try {
                                msgTreeMap.remove(msgTreeMap.firstKey());
                            } catch (Exception e) {
                                log.error("send expired msg exception", e);
                            }
                        }
                    } finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }
            } catch (Exception e) {
                log.error("send expired msg exception", e);
            }
        }
    }

위 는 동시 소비 모델 에서 소비 시간 이 15 분 이 넘 는 정 보 를 정기 적 으로 정리 하 는 논리 로 소비자 가 시작 할 때 정기 적 인 임 무 를 시작 하여 이 방법 을 정기 적 으로 호출 할 수 있다.
    public void start() {
        this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

15 분 에 한 번 씩.
putMessage
    public boolean putMessage(final List<MessageExt> msgs) {
        //    ,      ,  true      
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    //  offset key,  treemap 
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {//   
                        validMsgCnt++;
                        //     ProcessQueue      offset
                        this.queueOffsetMax = msg.getQueueOffset();
                    }
                }
                //       
                msgCount.addAndGet(validMsgCnt);
                //   ProcessQueue        (    ,  msgs     msgTreeMap   )
                //   consuming false,     true,      
                //               true,       ,         false
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                   //    ,       ,           
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    // property ConsumeQueue    offset
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {//      offset        ,       offset    
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }

getMaxSpan
    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            }
            finally {
                this.lockTreeMap.readLock().unlock();
            }
        }
        catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

현재 이 메시지 에서 가장 작은 offset 이전의 차 이 를 가 져 옵 니 다. 이 방법 은 메 시 지 를 끌 어 올 릴 때 현재 처리 되 지 않 은 메시지 가 얼마나 있 는 지 판단 하 는 데 사 용 됩 니 다. 만약 에 특정한 값 (기본 2000) 보다 크 면 흐름 제어 처 리 를 합 니 다.
//DefaultMQPushConsumerImpl.pullMessage
        if (!this.consumeOrderly) {
            //   2000
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                //   PullRequest        (PullRequest                )
                //PullTimeDelayMillsWhenFlowControl   50
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                //       , 1000      
                if ((flowControlTimes2++ % 1000) == 0) {
                    log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, flowControlTimes2);
                }
                return;
            }
        } else {
            //....
        }
    //DefaultMQPushConsumerImpl.executePullRequestLater
     private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }
    //PullMessageService.executePullRequestLater
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
          // timeDelay          
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                //  PullRequest          ,          ,      
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

removeMessage
    public long removeMessage(final List<MessageExt> msgs) {
        //        ,         offset
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    //     ,   TreeMap   
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {//          
                            removedCnt--;//      
                        }
                    }
                   // msgCount ProcessQueue      ,          ,        
                    msgCount.addAndGet(removedCnt);
                    //         ,        offset      
                    //          ,     ProcessQueue    offset      
                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
        }
        return result;
    }

이곳 의 반환 치 와 소비 진 도 는 매우 큰 관계 가 있 기 때문에 뒤에서 소비 진 도 를 분석 할 때 다시 깊이 분석 할 것 이다.
takeMessags
    public List<MessageExt> takeMessags(final int batchSize) {
        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    //  treeMap   batchSize   ,     offset        
                    for (int i = 0; i < batchSize; i++) {
                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                        if (entry != null) {
                            //              treemapp 
                            result.add(entry.getValue());
                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                        } else {
                            break;
                        }
                    }
                }
                //              ,     ,      ,  consuming  false
                if (result.isEmpty()) {
                    consuming = false;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("take Messages exception", e);
        }

        return result;
    }

이 방법 은 순서대로 소비 모드 에서 사 용 됩 니 다. 이 메 시 지 를 가 져 오 면 저희 가 정의 한 Message Listener 를 사용 하여 소비 합 니 다.
commit
순차 적 소비 모드 에서 takeMessages 를 호출 하여 메 시 지 를 가 져 옵 니 다. 내부 논리 에서 treeMap 의 메 시 지 를 임시 적 인 treeMap 에 넣 고 소 비 를 합 니 다. 소비 가 완료 되면 이 임시 맵 을 제거 해 야 합 니 다. commt 방법 을 사용 합 니 다.
    public long commit() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                // msgTreeMapTemp          ,lastKey         
                Long offset = this.msgTreeMapTemp.lastKey();
                //     ,          
                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                //     
                this.msgTreeMapTemp.clear();
                if (offset != null) {
                    //       
                    return offset + 1;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("commit exception", e);
        }

        return -1;
    }

makeMessageToCosumeAgain
순서 모드 에서 SUSPEND 로 돌아 가기CURRENT_QUEUE_A_MOMENT, 이 방법 을 사용 할 수 있 습 니 다. 이 방법 은 메 시 지 를 다시 소비 하 라 는 뜻 입 니 다.위 에서 말 한 절 차 를 돌 이 켜 보면:
  • 소식 찾기: treeMap 에서 소식 을 꺼 내 임시 treeMap 에 넣 고 소비 성공 을 기다린다
  • 소비 성공: 임시 treeMap 삭제
  • 위의 두 편 에서 알 수 있 듯 이 소비 가 실 패 했 을 때 임시 treeMap 과 treeMap 을 무시 해 서 는 안 된다. 임시 treeMap 의 소식 을 돌려 놓 아야 한다. 돌려 놓 지 않 으 면 잠시 후에 다시 소비 할 때 treeMap 에서 원래 의 소비 실패 의 근 거 를 찾 을 수 없다. 구체 적 인 논 리 는 뒤에서 분석 할 수 있다.
        public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    //               treeMap   
                    //    treeMap,      
                    for (MessageExt msg : msgs) {
                        this.msgTreeMapTemp.remove(msg.getQueueOffset());
                        this.msgTreeMap.put(msg.getQueueOffset(), msg);
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("makeMessageToCosumeAgain exception", e);
            }
        }
    

    위 는 바로 ProcessQueue 가 제공 하 는 일부 기능 입 니 다. 많은 상부 의 기능 들 이 그의 실현 에 의존 하고 있 습 니 다. 다른 것 을 보기 전에 ProcessQueue 를 먼저 알 아야 하기 때문에 위 에서 ProcessQueue 의 기능 을 분 석 했 고 약간 발산 되 었 습 니 다. 관련 된 범위 가 넓 기 때문에 관련 된 세부 사항 은 전개 되 지 않 았 습 니 다. 나중에 글 이 만 나 서 분석 하 겠 습 니 다.
    ProcessQueue 지식 포인트:
  • 소비 모델: 순서, 병행
  • 소비 진도 관리
  • rebalance 부하 균형
  • 등등...
  • 좋은 웹페이지 즐겨찾기