RocketMQ 저장 파일 의 실현

14959 단어 RocketMQ기억문건
RocketMQ 저장 경 로 는 기본적으로${ROCKRTMQ 입 니 다.HOME}/store,메시지,테마 에 대응 하 는 메시지 큐 의 색인 등 을 저장 합 니 다.
1.개술
디 렉 터 리 파일 보기
  • commitlog:메시지 의 저장 디 렉 터 리
  • config:운영 기간 에 일부 설정 정보
  • consumequeue:메시지 소비 대기 열 저장 디 렉 터 리
  • index:메시지 색인 파일 저장 디 렉 터 리
  • abort:abort 파일 이 존재 하면 Broker 가 비정상적 으로 닫 혔 음 을 설명 합 니 다.이 파일 은 기본 시작 시 생 성 되 고 정상 종료 시 삭 제 됩 니 다
  • checkpoint:파일 검 측 점.commitlog 파일 의 마지막 디스크 시간 스탬프,consumequeue 의 마지막 디스크 시간,index 색인 파일 의 마지막 디스크 시간 스탬프 를 저장 합 니 다.
  • 2.파일 소개
    2.1 commitlog 파일
    commitlog 파일 의 저장 주소:$HOME\store\commitlog${fileName},각 파일 의 크기 는 기본 1G=102410241024,commitlog 의 파일 이름 fileName,이름 길 이 는 20 비트,왼쪽 은 0 을 보충 하고 나머지 는 시작 오프셋 입 니 다.예 를 들 어 0000000000000000000 은 첫 번 째 파일 을 대표 하 는데 시작 편 이 량 은 0 이 고 파일 크기 는 1G=1073741824 이다.이 파일 이 가득 차 면 두 번 째 파일 의 이름 은 000000000001073741824 이 고 시작 오프셋 은 1073741824 입 니 다.이런 식 으로 세 번 째 파일 의 이름 은 00000002147483648 이 며 시작 오프셋 은 2147483648 입 니 다.메 시 지 를 저장 할 때 순서대로 파일 을 기록 하고 파일 이 가득 차 면 다음 파일 을 기록 합 니 다.

    commtlog 디 렉 터 리 에 있 는 파일 은 주로 메 시 지 를 저장 합 니 다.메시지 의 길이 가 다 르 기 때문에 저 장 된 논리 적 보 기 를 보십시오.메시지 의 앞 에 있 는 4 개의 바이트 에 이 메시지 의 총 길 이 를 저장 합 니 다.

    파일 메시지 셀 에 자세 한 정 보 를 저장 합 니 다.
    번호
    필드 줄 임 말
    필드 크기(바이트)
    필드 의미
    1
    msgSize
    4
    이 메시지 의 크기 를 나타 낸다
    2
    MAGICCODE
    4
    MAGICCODE = daa320a7
    3
    BODY CRC
    4
    메시지 체 BODY CRC 는 broker 가 recover 를 다시 시작 할 때 검사 합 니 다.
    4
    queueId
    4
    5
    flag
    4
    6
    QUEUEOFFSET
    8
    이 값 은 자체 부가 가치 가 진정한 consume quue 의 오프셋 이 아 닙 니 다.이 consume Queue 대기 열 이나 tranState Table 대기 열 에 있 는 메시지 의 개 수 를 대표 할 수 있 습 니 다.비 사무 메시지 나 commt 사무 메시지 라면 이 값 을 통 해 consume Queue 의 데 이 터 를 찾 을 수 있 습 니 다.QUEUOFFSET*20 이 야 말로 오프셋 주소 입 니 다.PREPARED 나 Rollback 트 랜 잭 션 이 라면 이 값 을 통 해 tranState Table 에서 데 이 터 를 찾 을 수 있 습 니 다.
    7
    PHYSICALOFFSET
    8
    commitLog 에서 메시지 의 물리 적 시작 주소 오프셋 을 대표 합 니 다.
    8
    SYSFLAG
    4
    메 시 지 는 사물 의 사물 상태 등 메시지 특징 을 가리 키 며,이 진 은 네 개의 바이트 가 오른쪽 에서 왼쪽으로 셀 수 있 습 니 다.네 개의 바이트 가 모두 0(값 이 0)일 때 비 사무 적 인 메 시 지 를 표시 합 니 다.첫 번 째 바이트 가 1(값 이 1)일 때 메시지 가 압축 되 었 음 을 표시 합 니 다(Compressed).두 번 째 바이트 가 1(값 이 2)이면 다 중 메시지(MultiTags)를 표시 합 니 다.세 번 째 바이트 가 1(값 이 4)일 때 prepared 메 시 지 를 표시 합 니 다.네 번 째 바이트 가 1(값 이 8)일 때 commt 메 시 지 를 표시 합 니 다.3/4 바이트 가 모두 1 일 때(값 이 12)롤 백 메 시 지 를 표시 합 니 다.3/4 바이트 가 모두 0 일 때 비 사무 적 인 메 시 지 를 표시 합 니 다.
    9
    BORNTIMESTAMP
    8
    메시지 생 성 단자(producer)의 시간 스탬프
    10
    BORNHOST
    8
    메시지 생 성 단(producer)주소(address:port)
    11
    STORETIMESTAMP
    8
    메 시 지 는 broker 에 저 장 됩 니 다.
    12
    STOREHOSTADDRESS
    8
    메 시 지 를 broker 의 주소 로 저장 합 니 다(address:port)
    13
    RECONSUMETIMES
    8
    메 시 지 는 한 구독 그룹 에 의 해 몇 번 다시 소비 되 었 습 니 다.(구독 그룹 간 독립 계수)다시 시도 메 시 지 는 topic 이름 이%retry%groupName 인 대기 열 quueId=0 인 대기 열 에 보 내 졌 기 때문에 한 번 의 소 비 는 0 으로 기록 되 었 습 니 다.
    14
    PreparedTransaction Offset
    8
    prepared 상태
    15
    messagebodyLength
    4
    메시지 크기
    16
    messagebody
    bodyLength
    메시지 내용
    17
    topicLength
    1
    토픽 이름 내용 크기
    18
    topic
    topicLength
    topic 의 내용 값
    19
    propertiesLength
    2
    속성 값 크기
    20
    properties
    propertiesLength
    propertiesLength 크기 의 속성 데이터
    2.2、consumequeue
    RocketMQ 는 주제 구독 모델 을 바탕 으로 메시지 소 비 를 실현 하고 소비자 들 이 관심 을 가 지 는 것 은 주제 아래 의 모든 소식 이다.
    그러나 서로 다른 주제 의 메시지 가 commitlog 파일 에 연속 적 으로 저장 되 지 않 기 때문에 이 메시지 파일 만 검색 하면 얼마나 느 린 지 알 수 있 습 니 다.효율 을 높이 기 위해 해당 주제 의 대기 열 에 색인 파일 을 만 들 었 습 니 다.메시지 의 검색 을 가속 화하 고 디스크 공간 을 절약 하기 위해 모든 consumequeue 항목 은 메시지 의 핵심 정보 commotog 파일 의 오프셋 을 저장 합 니 다.메시지 길이,tag 의 hashcode 값 입 니 다.

    디 렉 터 리 구조 보기:

    하나의 consumequeue 파일 에는 기본적으로 30 만 개의 항목 이 포함 되 어 있 습 니 다.각 항목 마다 20 개의 바이트 가 있 기 때문에 각 파일 의 크기 는 고정된 20w x 20 바이트 입 니 다.하나의 consumequeue 파일 은 하나의 배열 이 라 고 볼 수 있 습 니 다.아래 표 시 는 논리 적 오프셋 이 고 메시지 의 소비 진도 저장 의 오프셋 은 논리 적 오프셋 입 니 다.
    2.3、IndexFile
    Index File:생 성 된 색인 파일 에 접근 서 비 스 를 제공 하고 메시지 Key 값 을 통 해 메시지 의 실제 내용 을 조회 합 니 다.실제 물리 적 저장 소 에서 파일 이름 은 생 성 된 시간 스탬프 로 명명 되 며,고정된 단일 인덱스 파일 크기 는 약 400 M 이 며,하나의 인덱스 파일 은 2000 W 개의 인덱스 를 저장 할 수 있다.
    2.3.1 IndexFile 구조 분석

    IndexHead 데이터:beginTimestamp:이 색인 파일 은 메 시 지 를 포함 하 는 최소 저장 시간 endTimestamp:이 색인 파일 은 메 시 지 를 포함 하 는 최대 저장 시간 beginPhyoffset:이 색인 파일 에 메 시 지 를 포함 하 는 최소 물리 적 오프셋(commitlog 파일 오프셋)endPhyoffset:이 색인 파일 에 메 시 지 를 포함 하 는 최대 물리 적 오프셋(commitlog 파일 오프셋)양)hashSlotCount:hashslot 개 수 는 hash 슬롯 에서 사용 하 는 개수 가 아 닙 니 다.여기 서 의미 가 크 지 않 습 니 다.indexCount:사용 한 Index 항목 갯 수 입 니 다.
    Hash 슬롯:하나의 IndexFile 은 기본적으로 500 W 개의 Hash 슬롯 을 포함 하고 있 으 며,각 Hash 슬롯 은 이 Hash 슬롯 에 떨 어 진 hashcode 의 최신 Index 색인 을 저장 하고 있 습 니 다.
    Index 항목 목록 hashcode:key 의 hashcode pyoffset:메시지 에 대응 하 는 물리 적 오프셋 timedif:이 메시지 저장 시간 과 첫 번 째 메시지 의 시간 스탬프 의 차이 점 은 0 보다 적 으 면 이 메시지 가 잘못 되 었 음 을 나타 내 는 preIndex No:이 항목 의 이전 기록 인 Index 색인,hash 가 충돌 할 때 이 값 에 따라 링크 구 조 를 구축 합 니 다.
    2.3.2,IndexFile 항목 저장
    RocketMQ 는 메시지 색인 키 와 메시지 의 오프셋 맵 관 계 를 Index File 에 기록 합 니 다.그 핵심 적 인 실현 방법 은 Public boolean putKey(final String key,final long py Offset,final long store Timestamp)입 니 다.매개 변수 의 미 는 메시지 의 색인,메시지 의 물리 적 오프셋,메시지 의 저장 시간 이다.
    
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        	//                     
            if (this.indexHeader.getIndexCount() < this.indexNum) {
            	//  KEY hash (   )
                int keyHash = indexKeyHashMethod(key);
                //  hash    
                int slotPos = keyHash % this.hashSlotNum;
                //  hash      
                int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
                FileLock fileLock = null;
                try {
                    // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                    // false);
                	//  hash       
                    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                    //         0                
                    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                        slotValue = invalidIndex;
                    }
                    //                       
                    long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                    // 
                    timeDiff = timeDiff / 1000;
                    if (this.indexHeader.getBeginTimestamp() <= 0) {
                        timeDiff = 0;
                    } else if (timeDiff > Integer.MAX_VALUE) {
                        timeDiff = Integer.MAX_VALUE;
                    } else if (timeDiff < 0) {
                        timeDiff = 0;
                    }
                    //           =       (40  ) + hash    (4  )*    (500w) +            * index   (20  )
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
                    //     key hash (4  )+        (8  )+        index       (4  )+  hash   (4  )
                    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                    //    index          hash  ,    hash   
                    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                    
                    if (this.indexHeader.getIndexCount() <= 1) {
                        this.indexHeader.setBeginPhyOffset(phyOffset);
                        this.indexHeader.setBeginTimestamp(storeTimestamp);
                    }
                    //          ,hash    、index     、          、         
                    this.indexHeader.incHashSlotCount();
                    this.indexHeader.incIndexCount();
                    this.indexHeader.setEndPhyOffset(phyOffset);
                    this.indexHeader.setEndTimestamp(storeTimestamp);
                    return true;
                } catch (Exception e) {
                    log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
                } finally {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                        } catch (IOException e) {
                            log.error("Failed to release the lock", e);
                        }
                    }
                }
            } else {
                log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                    + "; index max num = " + this.indexNum);
            }
            return false;
        }
    이상 은 Index File 항목 에 저 장 된 업무 논 리 를 상세 하 게 분석 하 였 다.
    2.3.3、KEY 를 통 해 소식 찾기

    DefaultMessage Store 클래스 의
    
    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) 
    핵심 방법 은?
    
    QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
    메 시 지 를 가 져 오 는 물리 적 저장 소 주 소 는 오프셋 을 통 해 commitLog 에서 메시지 집합 을 가 져 옵 니 다.
    
    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)
    핵심 방법 은 Index File 류 중의
    
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)
    방법.
    
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    	final long begin, final long end, boolean lock) {
    	if (this.mappedFile.hold()) {
    		//  key hash  
    		int keyHash = indexKeyHashMethod(key);
    		//  hash    
    		int slotPos = keyHash % this.hashSlotNum;
    		//  hash      
    		int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    		FileLock fileLock = null;
    		try {
    			if (lock) {
    				// fileLock = this.fileChannel.lock(absSlotPos,
    				// hashSlotSize, true);
    			}
    			//  hash   
    			int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    			// if (fileLock != null) {
    			// fileLock.release();
    			// fileLock = null;
    			// }
    			//         0                
    			if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
    				|| this.indexHeader.getIndexCount() <= 1) {
    			} else {
    				for (int nextIndexToRead = slotValue; ; ) {
    					if (phyOffsets.size() >= maxNum) {
    						break;
    					}
    					//           =       (40  ) + hash    (4  )*    (500w) +            * index   (20  )
    					int absIndexPos =
    						IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
    							+ nextIndexToRead * indexSize;
    					//  key hash 
    					int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
    					//          
    					long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
    					//             index        
    					long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
    					//          (    )
    					int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
    					if (timeDiff < 0) {
    						break;
    					}
    					timeDiff *= 1000L;
    					long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
    					//             
    					boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
    					//  key hash                
    					if (keyHash == keyHashRead && timeMatched) {
    						//         List 
    						phyOffsets.add(phyOffsetRead);
    					}
    					if (prevIndexRead <= invalidIndex
    						|| prevIndexRead > this.indexHeader.getIndexCount()
    						|| prevIndexRead == nextIndexToRead || timeRead < begin) {
    						break;
    					}
    					//               
    					nextIndexToRead = prevIndexRead;
    				}
    			}
    		} catch (Exception e) {
    			log.error("selectPhyOffset exception ", e);
    		} finally {
    			if (fileLock != null) {
    				try {
    					fileLock.release();
    				} catch (IOException e) {
    					log.error("Failed to release the lock", e);
    				}
    			}
    			this.mappedFile.release();
    		}
    	}
    }
    1.조 회 된 key 의 hashcode%slotNum 에 따라 구체 적 인 슬롯 의 위 치 를 얻 을 수 있 습 니 다(slotNum 은 색인 파일 에 포 함 된 최대 슬롯 의 수 입 니 다.예 를 들 어 그림 에서 보 여 준 slotNum=5000000).
    2.slotValue(slot 위치 에 대응 하 는 값)에 따라 색인 항목 목록 의 마지막 항목 을 찾 습 니 다.(거꾸로 배열 되 어 있 으 며,slotValue 는 항상 최신 색인 항목 을 가리 키 고 있 습 니 다.)
    3.색인 항목 목록 을 옮 겨 다 니 며 조회 시간 범위 내의 결과 집합 을 되 돌려 줍 니 다(기본 값 으로 최대 되 돌아 오 는 32 개의 기록)
    4.Hash 충돌;key 의 slot 위 치 를 찾 을 때 두 번 의 해시 함 수 를 실행 한 것 과 같 습 니 다.한 번 key 의 hash,한 번 key 의 hash 값 을 모드 로 추출 한 것 과 같 기 때문에 두 번 의 충돌 이 있 습 니 다.첫 번 째,key 의 hash 는 다 르 지만 모드 는 같 습 니 다.이 때 검색 할 때 key 의 hash 값(색인 항목 마다 key 의 hash 값 을 저장 합 니 다)을 비교 하여 hash 값 이 다른 항목 을 걸 러 냅 니 다.두 번 째,hash 값 은 같 지만 key 는 다 릅 니 다.성능 을 고려 하여 충돌 하 는 검 측 을 클 라 이언 트 처리(key 의 원시 값 은 메시지 파일 에 저 장 된 것 이 므 로 데이터 파일 에 대한 분석 을 피하 십시오).클 라 이언 트 는 메시지 체 의 key 가 같은 지 비교 합 니 다.
    2.4、checkpoint
    checkpoint 파일 의 역할 은 commitlog,consumequeue,index 파일 의 브러시 시간 을 기록 하 는 것 입 니 다.파일 의 고정 길 이 는 4k 이 며 이 파일 의 앞 24 바이트 만 사용 합 니 다.저장 소 형식 보기
    physicMsgTimestamp:commitlog 파일 브러시 시간logicsMsgTimestamp:메시지 의 소비 대기 열 파일 브러시 시간indexMsgTimestamp:색인 파일 브러시 시간
    이상 은 개인 적 인 경험 이 므 로 여러분 에 게 참고 가 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.

    좋은 웹페이지 즐겨찾기