今天來(lái)聊一聊 RocketMQ 的延時(shí)消息是怎么實(shí)現(xiàn)的。
延時(shí)消息是指發(fā)送到 RocketMQ 后不會(huì)馬上被消費(fèi)者拉取到,而是等待固定的時(shí)間,才能被消費(fèi)者拉取到。
延時(shí)消息的使用場(chǎng)景很多,比如電商場(chǎng)景下關(guān)閉超時(shí)未支付的訂單,某些場(chǎng)景下需要在固定時(shí)間后發(fā)送提示消息。
1.生產(chǎn)者
首先看一個(gè)生產(chǎn)者發(fā)送延時(shí)消息的官方示例代碼:
public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer(“ExampleProducerGroup”); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown();}
從上面的代碼可以看到,跟普通消息不一樣的是,消息設(shè)置 setDelayTimeLevel 屬性值,這里設(shè)置為 3,這里最終將 3 這個(gè)延時(shí)級(jí)別復(fù)制給了 DELAY 屬性。
關(guān)于延時(shí)級(jí)別,可以看下面這個(gè)定義:
//MessageStoreConfig類(lèi)private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
這里延時(shí)級(jí)別有 18 個(gè),上面的示例代碼中延遲級(jí)別是 3,消息會(huì)延遲 10s 后消費(fèi)者才能拉取。
2.Broker 處理
2.1 寫(xiě)入消息
Broker 收到消息后,會(huì)將消息寫(xiě)入 CommitLog。在寫(xiě)入時(shí),會(huì)判斷消息 DELAY 屬性是否大于 0。代碼如下:
//CommitLog 類(lèi)if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}
從上面的代碼可以看到,CommitLog 寫(xiě)入時(shí)并沒(méi)有直接寫(xiě)入,而是把 Topic 改為 SCHEDULE_TOPIC_XXXX,把 queueId 改為延時(shí)級(jí)別減 1。因?yàn)檠訒r(shí)級(jí)別有 18 個(gè),所以這里有 18 個(gè)隊(duì)列。如下圖:
2.2 調(diào)度消息
延時(shí)消息寫(xiě)入后,會(huì)有一個(gè)調(diào)度任務(wù)不停地拉取這些延時(shí)消息,這個(gè)邏輯在類(lèi) ScheduleMessageService。這個(gè)類(lèi)的初始化代碼如下:
public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(“ScheduleMessageTimerThread_”)); //省略部分邏輯 for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分邏輯 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的邏輯 }}
上面的 load() 方法會(huì)加載一個(gè) delayLevelTable(ConcurrentHashMap類(lèi)型),key 保存延時(shí)級(jí)別(從 1 開(kāi)始),value 保存延時(shí)時(shí)間(單位是 ms)。
load() 方法結(jié)束后,創(chuàng)建了一個(gè)有 18 個(gè)核心線(xiàn)程的定時(shí)線(xiàn)程池,然后遍歷 delayLevelTable,創(chuàng)建 18 個(gè)任務(wù)(DeliverDelayedMessageTimerTask)進(jìn)行每個(gè)延時(shí)級(jí)別的任務(wù)調(diào)度。任務(wù)調(diào)度的代碼邏輯如下:
public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分邏輯 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); //事務(wù)消息判斷省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error(“ScheduleMessageService, messageTimeup execute error, offset = {}”, nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}
這段代碼可以參考下面的流程圖來(lái)進(jìn)行理解:
上面有一個(gè)修正投遞時(shí)間的函數(shù),這個(gè)函數(shù)的意義是如果已經(jīng)過(guò)了投遞時(shí)間,那么立即投遞。代碼如下:
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; } return result;}
注意:消息從 CommitLog 轉(zhuǎn)發(fā)到 ConsumeQueue 時(shí),會(huì)判斷是否是延時(shí)消息(Topic = SCHEDULE_TOPIC_XXXX 并且延時(shí)級(jí)別大于 0),如果是延時(shí)消息,就會(huì)修改 tagsCode 值為消息投遞的時(shí)間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:
//CommitLog類(lèi)checkMessageAndReturnSize方法if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);}
如下圖:
而 ScheduleMessageService 調(diào)度線(xiàn)程將消息從 ConsumeQueue 重新投遞到原始隊(duì)列中時(shí),會(huì)把 tagsCode 再次修改為 tag 的 HashCode,代碼如下:
//類(lèi)MessageExtBrokerInner,這個(gè)方法被 messageTimeup 方法調(diào)用。public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode();}
如下圖:
2.3 一個(gè)問(wèn)題
如果有一個(gè)業(yè)務(wù)場(chǎng)景,要求延時(shí)消息 3 小時(shí)才能消費(fèi),而 RocketMQ 的延時(shí)消息最大延時(shí)級(jí)別只支持延時(shí) 2 小時(shí),怎么處理?
這里提供兩個(gè)思路供大家參考:
在 Broker 上修改 messageDelayLevel 的默認(rèn)配置;
在客戶(hù)端緩存 msgId,先設(shè)置延時(shí)級(jí)別是 18(2h),當(dāng)客戶(hù)端拉取到消息后首先判斷有沒(méi)有緩存,如果有緩存則再次發(fā)送延時(shí)消息,這次延時(shí)級(jí)別是 17(1h),如果沒(méi)有緩存則進(jìn)行消費(fèi)。
3 總結(jié)
經(jīng)過(guò)上面的講解,延時(shí)消息的處理流程如下:
最后,延時(shí)消息的延時(shí)時(shí)間并不精確,這個(gè)時(shí)間是 Broker 調(diào)度線(xiàn)程把消息重新投遞到原始的 MessageQueue 的時(shí)間,如果發(fā)生消息積壓或者 RocketMQ 客戶(hù)端發(fā)生流量管控,客戶(hù)端拉取到消息后進(jìn)行處理的時(shí)間可能會(huì)超出預(yù)設(shè)的延時(shí)時(shí)間