Abstract: Original source: http://www.iocoder.cn/RocketMQ/message-store/

1. Overview

This article follows "RocketMQ Source Code Analysis: Message Sending and Receiving". It focuses on how `CommitLog` stores messages.
2. CommitLog Structure

The relationship between `CommitLog`, `MappedFileQueue`, and `MappedFile` is `CommitLog` : `MappedFileQueue` : `MappedFile` = 1 : 1 : N.

On the file system, this looks as follows:

    Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
    /Users/yunai/store/commitlog
    Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
    total 10485760
    -rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
    -rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
    -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
    -rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
    -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296

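`CommitLog`, `MappedFileQueue`, and `MappedFile` are defined as follows:

- `MappedFile`: files such as 00000000000000000000, 00000000001073741824, and 00000000002147483648.
- `MappedFileQueue`: the directory that holds the `MappedFile`s. It wraps them into a file queue and offers the upper layers effectively unlimited file capacity.
    - Every `MappedFile` has the same file size.
    - File naming: fileName[n] = fileName[n - 1] + mappedFileSize. In `CommitLog`, mappedFileSize defaults to 1GB.
- `CommitLog`: a wrapper around `MappedFileQueue`.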
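
The naming rule can be checked against the directory listing above with a few lines of Java. This is only a minimal sketch: the class `MappedFileNames` and its methods are hypothetical helpers written for this article, not RocketMQ code, and the 20-digit zero padding is inferred from the file names in the listing.

```java
/** Hypothetical helper (not RocketMQ code) illustrating the file naming rule. */
final class MappedFileNames {
    /** 1GB, the CommitLog default mentioned above. */
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    /** Name of the n-th file (0-based): its start offset, zero-padded to 20 digits. */
    static String fileName(int index, long mappedFileSize) {
        return String.format("%020d", index * mappedFileSize);
    }

    /** Name of the file that contains the given global CommitLog offset. */
    static String fileNameForOffset(long offset, long mappedFileSize) {
        return String.format("%020d", offset - (offset % mappedFileSize));
    }

    public static void main(String[] args) {
        // Prints the five file names shown in the directory listing.
        for (int i = 0; i < 5; i++) {
            System.out.println(fileName(i, MAPPED_FILE_SIZE));
        }
    }
}
```

Because each name is the file's starting offset, the file that holds any global offset can be located by arithmetic alone, without scanning the directory.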
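
`CommitLog` currently stores two types of content in a `MappedFile`:

- MESSAGE: a message.
- BLANK: blank padding, written when the space left in the file is not enough to store the message.

The structure that `CommitLog` stores in a `MappedFile` is described by the two layouts below.

Storage structure of MESSAGE in `CommitLog`:
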
No. | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | MsgLen | Total message length | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | CRC of the message body | Int | 4 |
4 | QueueId | Message queue ID | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | Position in the message queue | Long | 8 |
7 | PhysicalOffset | Physical position: the sequential storage position in the CommitLog | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | Timestamp at which the message was created | Long | 8 |
10 | BornHost | Address + port of the host that created the message | Long | 8 |
11 | StoreTimestamp | Timestamp at which the message was stored | Long | 8 |
12 | StoreHost | Address + port of the host that stored the message | Long | 8 |
13 | ReconsumeTimes | Number of times the message has been re-consumed | Int | 4 |
14 | PreparedTransactionOffset | | Long | 8 |
15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | Properties length + properties | Short + Bytes | 2 + propertiesLength |
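
Adding up the byte widths in the table gives the value stored in MsgLen. The sketch below does exactly that; `MessageLayout` and `calMsgLength` are hypothetical names chosen for illustration, and the method only mirrors the table rather than quoting RocketMQ's implementation.

```java
/** Hypothetical helper (not RocketMQ code) that sums the MESSAGE field widths from the table. */
final class MessageLayout {
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                        // 1  MsgLen
                + 4                     // 2  MagicCode
                + 4                     // 3  BodyCRC
                + 4                     // 4  QueueId
                + 4                     // 5  Flag
                + 8                     // 6  QueueOffset
                + 8                     // 7  PhysicalOffset
                + 4                     // 8  SysFlag
                + 8                     // 9  BornTimestamp
                + 8                     // 10 BornHost
                + 8                     // 11 StoreTimestamp
                + 8                     // 12 StoreHost
                + 4                     // 13 ReconsumeTimes
                + 8                     // 14 PreparedTransactionOffset
                + 4 + bodyLength        // 15 BodyLength + Body
                + 1 + topicLength       // 16 TopicLength + Topic
                + 2 + propertiesLength; // 17 PropertiesLength + Properties
    }

    public static void main(String[] args) {
        // A 10-byte body on a 9-byte topic name with no properties.
        System.out.println(calMsgLength(10, 9, 0));
    }
}
```

The fixed-width fields add up to 91 bytes, so every stored message occupies 91 bytes plus the lengths of its body, topic, and properties.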

Storage structure of BLANK in `CommitLog`:

No. | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | maxBlank | Length of the blank space | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |

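
The following sketch shows how a BLANK record could be written at the tail of a file with `java.nio.ByteBuffer`. Several parts are assumptions made for illustration: the class `BlankPadding` is hypothetical, the `BLANK_MAGIC_CODE` value is a placeholder rather than the real constant, and the rule that a message must leave room for the 8-byte BLANK header is an assumed policy, not something stated in this section.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch (not RocketMQ code) of writing a BLANK record at the end of a file. */
final class BlankPadding {
    /** Placeholder value for illustration only; the real constant is defined in CommitLog. */
    static final int BLANK_MAGIC_CODE = 0x0BADC0DE;

    /** maxBlank (4 bytes) + MagicCode (4 bytes), per the BLANK table. */
    static final int BLANK_HEADER_LENGTH = 4 + 4;

    /** Assumed policy: a message fits only if a BLANK header would still fit after it. */
    static boolean fits(int msgLength, int maxBlank) {
        return msgLength + BLANK_HEADER_LENGTH <= maxBlank;
    }

    /** Writes the BLANK header; the caller must ensure at least 8 bytes remain. */
    static void writeBlank(ByteBuffer fileTail) {
        int maxBlank = fileTail.remaining(); // all space left in this file
        fileTail.putInt(maxBlank);           // 1: maxBlank
        fileTail.putInt(BLANK_MAGIC_CODE);   // 2: MagicCode
        // The rest of the blank area is left untouched; a reader can skip
        // maxBlank bytes from the start of this record to reach the next file.
    }

    public static void main(String[] args) {
        ByteBuffer tail = ByteBuffer.allocate(20); // pretend 20 bytes are left in the file
        int msgLength = 110;
        if (!fits(msgLength, tail.remaining())) {
            writeBlank(tail);
            System.out.println("wrote BLANK, message must go to the next file");
        }
    }
}
```

Storing the blank length first keeps the BLANK record shaped like the start of a MESSAGE record (length, then magic code), so a reader can use the magic code to tell the two apart.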
3. CommitLog Message Storage

CommitLog#putMessage(...)

      1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
      2:     // Set the storage time
      3:     msg.setStoreTimestamp(System.currentTimeMillis());
      4:     // Set the message body BODY CRC (consider the most appropriate setting
      5:     // on the client)
      6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
      7:     // Back to Results
      8:     AppendMessageResult result = null;
      9:
     10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
     11:
     12:     String topic = msg.getTopic();
     13:     int queueId = msg.getQueueId();
     14:
     15:     // Transaction-related. TODO: transaction handling still to be read
     16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
     17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
     18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
     19:         // Delay Delivery
     20:         if (msg.getDelayTimeLevel() > 0) {
     21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
     22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
     23:             }
     24:
     25:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
     26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
     27:
     28:             // Backup real topic, queueId
     29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
     30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
     31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
     32:
     33:             msg.setTopic(topic);
     34:             msg.setQueueId(queueId);
     35:         }
     36:     }
     37:
     38:     long eclipseTimeInLock = 0;
     39:
     40:     // Get the mapped file to write to
     41:     MappedFile unlockMappedFile = null;
     42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
     43:
     44:     // Acquire the write lock
     45:     lockForPutMessage(); //spin...
     46:     try {
     47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
     48:         this.beginTimeInLock = beginLockTimestamp;
     49:
     50:         // Here settings are stored timestamp, in order to ensure an orderly
     51:         // global
     52:         msg.setStoreTimestamp(beginLockTimestamp);
     53:
     54:         // Create a mapped file when none exists or the last one is full
     55:         if (null == mappedFile || mappedFile.isFull()) {
     56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
     57:         }
     58:         if (null == mappedFile) {
     59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
     60:             beginTimeInLock = 0;
     61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
     62:         }
     63:
     64:         // Store the message
     65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
     66:         switch (result.getStatus()) {
     67:             case PUT_OK:
     68:                 break;
     69:             case END_OF_FILE: // At end of file: get a new mapped file and append again
     70:                 unlockMappedFile = mappedFile;
     71:                 // Create a new file, re-write the message
     72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
     73:                 if (null == mappedFile) {
     74:                     // XXX: warn and notify me
     75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
     76:                     beginTimeInLock = 0;
     77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
     78:                 }
     79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
     80:                 break;
     81:             case MESSAGE_SIZE_EXCEEDED:
     82:             case PROPERTIES_SIZE_EXCEEDED:
     83:                 beginTimeInLock = 0;
     84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
     85:             case UNKNOWN_ERROR:
     86:                 beginTimeInLock = 0;
     87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
     88:             default:
     89:                 beginTimeInLock = 0;
     90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
     91:         }
     92:
     93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
     94:         beginTimeInLock = 0;
     95:     } finally {
     96:         // Release the write lock
     97:         releasePutMessageLock();
     98:     }
     99:
    100:     if (eclipseTimeInLock > 500) {
    101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    102:     }
    103:
    104:     //
    105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    107:     }
    108:
    109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    110:
    111:     // Statistics
    112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    114:
    115:     // Perform a synchronous or asynchronous flush or commit
    116:     GroupCommitRequest request = null;
    117:     // Synchronization flush
    118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    120:         if (msg.isWaitStoreMsgOK()) {
    121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    122:             service.putRequest(request);
    123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    124:             if (!flushOK) {
    125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
    126:                     + " client address: " + msg.getBornHostString());
    127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    128:             }
    129:         } else {
    130:             service.wakeup();
    131:         }
    132:     }
    133:     // Asynchronous flush
    134:     else {
    135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    136:             flushCommitLogService.wakeup(); // important: wake up the CommitLog flush thread to flush
    137:         } else {
    138:             commitLogService.wakeup();
    139:         }
    140:     }
    141:
    142:     // Synchronous write double: on a SYNC_MASTER, replicate to the slave. TODO: data replication still to be read
    143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
    144:         HAService service = this.defaultMessageStore.getHaService();
    145:         if (msg.isWaitStoreMsgOK()) {
    146:             // Determine whether to wait
    147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
    148:                 if (null == request) {
    149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    150:                 }
    151:                 service.putRequest(request);
    152:
    153:                 service.getWaitNotifyObject().wakeupAll();
    154:
    155:                 boolean flushOK =
    156:                     // TODO
    157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    158:                 if (!flushOK) {
    159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
    160:                         + msg.getTags() + " client address: " + msg.getBornHostString());
    161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
    162:                 }
    163:             }
    164:             // Slave problem
    165:             else {
    166:                 // Tell the producer, slave not available
    167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
    168:             }
    169:         }
    170:     }
    171:
    172:     return putMessageResult;
    173: }

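- Description: stores the message and returns the storage result.
- Lines 2 to 6: set the store timestamp and the body CRC.
- Lines 16 to 36: transaction-message related; not yet studied. Within this range, a non-transactional or committed message that carries a delay level has its topic and queue ID replaced by the schedule topic and the delay-level queue, with the real values backed up in the message properties.
- Lines 45 & 97: acquire and release the write lock.
- Line 52: sets the store timestamp again. The store timestamp is currently set in several places.
- Lines 55 to 62: get the `MappedFile`; if none exists or the last one is full, create one. For details, see MappedFileQueue#getLastMappedFile(...).
- Line 65: append the message to the `MappedFile`. For details, see MappedFile#appendMessage(...).
- Lines 69 to 80: the `MappedFile` is full, so a new one is created and the message is appended again.
- Lines 116 to 140: flush the message to disk, that is, persist it to the file. The append above has not actually written the message to disk. What happens here depends on the configured flush strategy. For details, see FlushCommitLogService.
- Lines 143 to 170: `Broker` master-slave synchronization. A later article will analyze this in detail.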
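
The synchronous flush branch (lines 118 to 132) hands a request to the flush service and then blocks until the flush completes or the timeout expires. The sketch below models that hand-off with a `CountDownLatch`. It is an assumption-laden illustration: `FlushRequestSketch` is a hypothetical class, and the actual `GroupCommitRequest` may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model (not RocketMQ code) of a request that waits for a flush with a timeout. */
final class FlushRequestSketch {
    private final long nextOffset;                     // flush must reach at least this offset
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    /** Called by the flush thread once it has tried to flush up to nextOffset. */
    void wakeup(boolean flushOK) {
        this.flushOK = flushOK;
        latch.countDown();
    }

    /** Called by the writer thread; returns false on timeout, failure, or interruption. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOK; // still false if the latch timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushRequestSketch request = new FlushRequestSketch(1024L);
        // Stand-in for the flush service thread.
        new Thread(() -> request.wakeup(true)).start();
        System.out.println("flushed in time: " + request.waitForFlush(5000));
    }
}
```

A `false` return corresponds to the branch at line 124 of the listing, where the result status is downgraded to FLUSH_DISK_TIMEOUT even though the message has already been appended to the `MappedFile`.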