RocketMQ 저장 파일 의 실현
1.개술
디 렉 터 리 파일 보기
commitlog
:메시지 의 저장 디 렉 터 리config
:운영 기간 에 일부 설정 정보consumequeue
:메시지 소비 대기 열 저장 디 렉 터 리index
:메시지 색인 파일 저장 디 렉 터 리abort
:abort 파일 이 존재 하면 Broker 가 비정상적 으로 닫 혔 음 을 설명 합 니 다.이 파일 은 기본 시작 시 생 성 되 고 정상 종료 시 삭 제 됩 니 다checkpoint
:파일 검 측 점.commitlog 파일 의 마지막 디스크 시간 스탬프,consumequeue 의 마지막 디스크 시간,index 색인 파일 의 마지막 디스크 시간 스탬프 를 저장 합 니 다.2.1 commitlog 파일
commitlog 파일 의 저장 주소:$HOME\store\commitlog${fileName},각 파일 의 크기 는 기본 1G=102410241024,commitlog 의 파일 이름 fileName,이름 길 이 는 20 비트,왼쪽 은 0 을 보충 하고 나머지 는 시작 오프셋 입 니 다.예 를 들 어 0000000000000000000 은 첫 번 째 파일 을 대표 하 는데 시작 편 이 량 은 0 이 고 파일 크기 는 1G=1073741824 이다.이 파일 이 가득 차 면 두 번 째 파일 의 이름 은 000000000001073741824 이 고 시작 오프셋 은 1073741824 입 니 다.이런 식 으로 세 번 째 파일 의 이름 은 00000002147483648 이 며 시작 오프셋 은 2147483648 입 니 다.메 시 지 를 저장 할 때 순서대로 파일 을 기록 하고 파일 이 가득 차 면 다음 파일 을 기록 합 니 다.
commtlog 디 렉 터 리 에 있 는 파일 은 주로 메 시 지 를 저장 합 니 다.메시지 의 길이 가 다 르 기 때문에 저 장 된 논리 적 보 기 를 보십시오.메시지 의 앞 에 있 는 4 개의 바이트 에 이 메시지 의 총 길 이 를 저장 합 니 다.
파일 메시지 셀 에 자세 한 정 보 를 저장 합 니 다.
번호
필드 줄 임 말
필드 크기(바이트)
필드 의미
1
msgSize
4
이 메시지 의 크기 를 나타 낸다
2
MAGICCODE
4
MAGICCODE = daa320a7
3
BODY CRC
4
메시지 체 BODY CRC 는 broker 가 recover 를 다시 시작 할 때 검사 합 니 다.
4
queueId
4
5
flag
4
6
QUEUEOFFSET
8
이 값 은 자체 부가 가치 가 진정한 consume quue 의 오프셋 이 아 닙 니 다.이 consume Queue 대기 열 이나 tranState Table 대기 열 에 있 는 메시지 의 개 수 를 대표 할 수 있 습 니 다.비 사무 메시지 나 commt 사무 메시지 라면 이 값 을 통 해 consume Queue 의 데 이 터 를 찾 을 수 있 습 니 다.QUEUOFFSET*20 이 야 말로 오프셋 주소 입 니 다.PREPARED 나 Rollback 트 랜 잭 션 이 라면 이 값 을 통 해 tranState Table 에서 데 이 터 를 찾 을 수 있 습 니 다.
7
PHYSICALOFFSET
8
commitLog 에서 메시지 의 물리 적 시작 주소 오프셋 을 대표 합 니 다.
8
SYSFLAG
4
메 시 지 는 사물 의 사물 상태 등 메시지 특징 을 가리 키 며,이 진 은 네 개의 바이트 가 오른쪽 에서 왼쪽으로 셀 수 있 습 니 다.네 개의 바이트 가 모두 0(값 이 0)일 때 비 사무 적 인 메 시 지 를 표시 합 니 다.첫 번 째 바이트 가 1(값 이 1)일 때 메시지 가 압축 되 었 음 을 표시 합 니 다(Compressed).두 번 째 바이트 가 1(값 이 2)이면 다 중 메시지(MultiTags)를 표시 합 니 다.세 번 째 바이트 가 1(값 이 4)일 때 prepared 메 시 지 를 표시 합 니 다.네 번 째 바이트 가 1(값 이 8)일 때 commt 메 시 지 를 표시 합 니 다.3/4 바이트 가 모두 1 일 때(값 이 12)롤 백 메 시 지 를 표시 합 니 다.3/4 바이트 가 모두 0 일 때 비 사무 적 인 메 시 지 를 표시 합 니 다.
9
BORNTIMESTAMP
8
메시지 생 성 단자(producer)의 시간 스탬프
10
BORNHOST
8
메시지 생 성 단(producer)주소(address:port)
11
STORETIMESTAMP
8
메 시 지 는 broker 에 저 장 됩 니 다.
12
STOREHOSTADDRESS
8
메 시 지 를 broker 의 주소 로 저장 합 니 다(address:port)
13
RECONSUMETIMES
8
메 시 지 는 한 구독 그룹 에 의 해 몇 번 다시 소비 되 었 습 니 다.(구독 그룹 간 독립 계수)다시 시도 메 시 지 는 topic 이름 이%retry%groupName 인 대기 열 quueId=0 인 대기 열 에 보 내 졌 기 때문에 한 번 의 소 비 는 0 으로 기록 되 었 습 니 다.
14
PreparedTransaction Offset
8
prepared 상태
15
messagebodyLength
4
메시지 크기
16
messagebody
bodyLength
메시지 내용
17
topicLength
1
토픽 이름 내용 크기
18
topic
topicLength
topic 의 내용 값
19
propertiesLength
2
속성 값 크기
20
properties
propertiesLength
propertiesLength 크기 의 속성 데이터
2.2、consumequeue
RocketMQ 는 주제 구독 모델 을 바탕 으로 메시지 소 비 를 실현 하고 소비자 들 이 관심 을 가 지 는 것 은 주제 아래 의 모든 소식 이다.
그러나 서로 다른 주제 의 메시지 가 commitlog 파일 에 연속 적 으로 저장 되 지 않 기 때문에 이 메시지 파일 만 검색 하면 얼마나 느 린 지 알 수 있 습 니 다.효율 을 높이 기 위해 해당 주제 의 대기 열 에 색인 파일 을 만 들 었 습 니 다.메시지 의 검색 을 가속 화하 고 디스크 공간 을 절약 하기 위해 모든 consumequeue 항목 은 메시지 의 핵심 정보 commotog 파일 의 오프셋 을 저장 합 니 다.메시지 길이,tag 의 hashcode 값 입 니 다.
디 렉 터 리 구조 보기:
하나의 consumequeue 파일 에는 기본적으로 30 만 개의 항목 이 포함 되 어 있 습 니 다.각 항목 마다 20 개의 바이트 가 있 기 때문에 각 파일 의 크기 는 고정된 20w x 20 바이트 입 니 다.하나의 consumequeue 파일 은 하나의 배열 이 라 고 볼 수 있 습 니 다.아래 표 시 는 논리 적 오프셋 이 고 메시지 의 소비 진도 저장 의 오프셋 은 논리 적 오프셋 입 니 다.
2.3、IndexFile
Index File:생 성 된 색인 파일 에 접근 서 비 스 를 제공 하고 메시지 Key 값 을 통 해 메시지 의 실제 내용 을 조회 합 니 다.실제 물리 적 저장 소 에서 파일 이름 은 생 성 된 시간 스탬프 로 명명 되 며,고정된 단일 인덱스 파일 크기 는 약 400 M 이 며,하나의 인덱스 파일 은 2000 W 개의 인덱스 를 저장 할 수 있다.
2.3.1 IndexFile 구조 분석
IndexHead 데이터:beginTimestamp:이 색인 파일 은 메 시 지 를 포함 하 는 최소 저장 시간 endTimestamp:이 색인 파일 은 메 시 지 를 포함 하 는 최대 저장 시간 beginPhyoffset:이 색인 파일 에 메 시 지 를 포함 하 는 최소 물리 적 오프셋(commitlog 파일 오프셋)endPhyoffset:이 색인 파일 에 메 시 지 를 포함 하 는 최대 물리 적 오프셋(commitlog 파일 오프셋)양)hashSlotCount:hashslot 개 수 는 hash 슬롯 에서 사용 하 는 개수 가 아 닙 니 다.여기 서 의미 가 크 지 않 습 니 다.indexCount:사용 한 Index 항목 갯 수 입 니 다.
Hash 슬롯:하나의 IndexFile 은 기본적으로 500 W 개의 Hash 슬롯 을 포함 하고 있 으 며,각 Hash 슬롯 은 이 Hash 슬롯 에 떨 어 진 hashcode 의 최신 Index 색인 을 저장 하고 있 습 니 다.
Index 항목 목록 hashcode:key 의 hashcode pyoffset:메시지 에 대응 하 는 물리 적 오프셋 timedif:이 메시지 저장 시간 과 첫 번 째 메시지 의 시간 스탬프 의 차이 점 은 0 보다 적 으 면 이 메시지 가 잘못 되 었 음 을 나타 내 는 preIndex No:이 항목 의 이전 기록 인 Index 색인,hash 가 충돌 할 때 이 값 에 따라 링크 구 조 를 구축 합 니 다.
2.3.2,IndexFile 항목 저장
RocketMQ 는 메시지 색인 키 와 메시지 의 오프셋 맵 관 계 를 Index File 에 기록 합 니 다.그 핵심 적 인 실현 방법 은 Public boolean putKey(final String key,final long py Offset,final long store Timestamp)입 니 다.매개 변수 의 미 는 메시지 의 색인,메시지 의 물리 적 오프셋,메시지 의 저장 시간 이다.
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//
if (this.indexHeader.getIndexCount() < this.indexNum) {
// KEY hash ( )
int keyHash = indexKeyHashMethod(key);
// hash
int slotPos = keyHash % this.hashSlotNum;
// hash
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// hash
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// 0
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
//
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// = (40 ) + hash (4 )* (500w) + * index (20 )
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// key hash (4 )+ (8 )+ index (4 )+ hash (4 )
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// index hash , hash
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// ,hash 、index 、 、
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
이상 은 Index File 항목 에 저 장 된 업무 논 리 를 상세 하 게 분석 하 였 다.2.3.3、KEY 를 통 해 소식 찾기
DefaultMessage Store 클래스 의
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end)
핵심 방법 은?
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
메 시 지 를 가 져 오 는 물리 적 저장 소 주 소 는 오프셋 을 통 해 commitLog 에서 메시지 집합 을 가 져 옵 니 다.
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)
핵심 방법 은 Index File 류 중의
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)
방법.
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// key hash
int keyHash = indexKeyHashMethod(key);
// hash
int slotPos = keyHash % this.hashSlotNum;
// hash
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
// hash
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
// 0
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
// = (40 ) + hash (4 )* (500w) + * index (20 )
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
// key hash
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
//
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
// index
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
// ( )
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
//
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// key hash
if (keyHash == keyHashRead && timeMatched) {
// List
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
//
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
1.조 회 된 key 의 hashcode%slotNum 에 따라 구체 적 인 슬롯 의 위 치 를 얻 을 수 있 습 니 다(slotNum 은 색인 파일 에 포 함 된 최대 슬롯 의 수 입 니 다.예 를 들 어 그림 에서 보 여 준 slotNum=5000000).2.slotValue(slot 위치 에 대응 하 는 값)에 따라 색인 항목 목록 의 마지막 항목 을 찾 습 니 다.(거꾸로 배열 되 어 있 으 며,slotValue 는 항상 최신 색인 항목 을 가리 키 고 있 습 니 다.)
3.색인 항목 목록 을 옮 겨 다 니 며 조회 시간 범위 내의 결과 집합 을 되 돌려 줍 니 다(기본 값 으로 최대 되 돌아 오 는 32 개의 기록)
4.Hash 충돌;key 의 slot 위 치 를 찾 을 때 두 번 의 해시 함 수 를 실행 한 것 과 같 습 니 다.한 번 key 의 hash,한 번 key 의 hash 값 을 모드 로 추출 한 것 과 같 기 때문에 두 번 의 충돌 이 있 습 니 다.첫 번 째,key 의 hash 는 다 르 지만 모드 는 같 습 니 다.이 때 검색 할 때 key 의 hash 값(색인 항목 마다 key 의 hash 값 을 저장 합 니 다)을 비교 하여 hash 값 이 다른 항목 을 걸 러 냅 니 다.두 번 째,hash 값 은 같 지만 key 는 다 릅 니 다.성능 을 고려 하여 충돌 하 는 검 측 을 클 라 이언 트 처리(key 의 원시 값 은 메시지 파일 에 저 장 된 것 이 므 로 데이터 파일 에 대한 분석 을 피하 십시오).클 라 이언 트 는 메시지 체 의 key 가 같은 지 비교 합 니 다.
2.4、checkpoint
checkpoint 파일 의 역할 은 commitlog,consumequeue,index 파일 의 브러시 시간 을 기록 하 는 것 입 니 다.파일 의 고정 길 이 는 4k 이 며 이 파일 의 앞 24 바이트 만 사용 합 니 다.저장 소 형식 보기
physicMsgTimestamp
:commitlog 파일 브러시 시간logicsMsgTimestamp
:메시지 의 소비 대기 열 파일 브러시 시간indexMsgTimestamp
:색인 파일 브러시 시간이상 은 개인 적 인 경험 이 므 로 여러분 에 게 참고 가 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
1분에 실장! 파이썬에서 LINE Notify파이썬으로 길게 처리 할 때, 「하지만, 계속 PC에 붙어 보고 있는 것은 피곤하다」 LINE에서 제공하는 API를 사용하면 특별한 인증 없이 파이썬 코드에서 자동으로 LINE으로 메시지를 보낼 수 있습니다. (덧붙...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.