亚洲精品中文免费|亚洲日韩中文字幕制服|久久精品亚洲免费|一本之道久久免费

      
      

            <dl id="hur0q"><div id="hur0q"></div></dl>

                五張圖帶你理解 RocketMQ 延時(shí)消息機(jī)制

                五張圖帶你理解 RocketMQ 延時(shí)消息機(jī)制

                今天來(lái)聊一聊 RocketMQ 的延時(shí)消息是怎么實(shí)現(xiàn)的。

                延時(shí)消息是指發(fā)送到 RocketMQ 后不會(huì)馬上被消費(fèi)者拉取到,而是等待固定的時(shí)間,才能被消費(fèi)者拉取到。

                延時(shí)消息的使用場(chǎng)景很多,比如電商場(chǎng)景下關(guān)閉超時(shí)未支付的訂單,某些場(chǎng)景下需要在固定時(shí)間后發(fā)送提示消息。

                1.生產(chǎn)者

                首先看一個(gè)生產(chǎn)者發(fā)送延時(shí)消息的官方示例代碼

                public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer(“ExampleProducerGroup”); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown();}

                從上面的代碼可以看到,跟普通消息不一樣的是,消息設(shè)置 setDelayTimeLevel 屬性值,這里設(shè)置為 3,這里最終將 3 這個(gè)延時(shí)級(jí)別復(fù)制給了 DELAY 屬性。

                關(guān)于延時(shí)級(jí)別,可以看下面這個(gè)定義:

                //MessageStoreConfig類(lèi)private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

                這里延時(shí)級(jí)別有 18 個(gè),上面的示例代碼中延遲級(jí)別是 3,消息會(huì)延遲 10s 后消費(fèi)者才能拉取。

                2.Broker 處理

                2.1 寫(xiě)入消息

                Broker 收到消息后,會(huì)將消息寫(xiě)入 CommitLog。在寫(xiě)入時(shí),會(huì)判斷消息 DELAY 屬性是否大于 0。代碼如下:

                //CommitLog 類(lèi)if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}

                從上面的代碼可以看到,CommitLog 寫(xiě)入時(shí)并沒(méi)有直接寫(xiě)入,而是把 Topic 改為 SCHEDULE_TOPIC_XXXX,把 queueId 改為延時(shí)級(jí)別減 1。因?yàn)檠訒r(shí)級(jí)別有 18 個(gè),所以這里有 18 個(gè)隊(duì)列。如下圖:

                2.2 調(diào)度消息

                延時(shí)消息寫(xiě)入后,會(huì)有一個(gè)調(diào)度任務(wù)不停地拉取這些延時(shí)消息,這個(gè)邏輯在類(lèi) ScheduleMessageService。這個(gè)類(lèi)的初始化代碼如下:

                public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(“ScheduleMessageTimerThread_”)); //省略部分邏輯 for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分邏輯 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的邏輯 }}

                上面的 load() 方法會(huì)加載一個(gè) delayLevelTable(ConcurrentHashMap類(lèi)型),key 保存延時(shí)級(jí)別(從 1 開(kāi)始),value 保存延時(shí)時(shí)間(單位是 ms)。

                load() 方法結(jié)束后,創(chuàng)建了一個(gè)有 18 個(gè)核心線(xiàn)程的定時(shí)線(xiàn)程池,然后遍歷 delayLevelTable,創(chuàng)建 18 個(gè)任務(wù)(DeliverDelayedMessageTimerTask)進(jìn)行每個(gè)延時(shí)級(jí)別的任務(wù)調(diào)度。任務(wù)調(diào)度的代碼邏輯如下:

                public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分邏輯 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); //事務(wù)消息判斷省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error(“ScheduleMessageService, messageTimeup execute error, offset = {}”, nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}

                這段代碼可以參考下面的流程圖來(lái)進(jìn)行理解:

                上面有一個(gè)修正投遞時(shí)間的函數(shù),這個(gè)函數(shù)的意義是如果已經(jīng)過(guò)了投遞時(shí)間,那么立即投遞。代碼如下:

                private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; } return result;}

                注意:消息從 CommitLog 轉(zhuǎn)發(fā)到 ConsumeQueue 時(shí),會(huì)判斷是否是延時(shí)消息(Topic = SCHEDULE_TOPIC_XXXX 并且延時(shí)級(jí)別大于 0),如果是延時(shí)消息,就會(huì)修改 tagsCode 值為消息投遞的時(shí)間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:

                //CommitLog類(lèi)checkMessageAndReturnSize方法if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);}

                如下圖:

                而 ScheduleMessageService 調(diào)度線(xiàn)程將消息從 ConsumeQueue 重新投遞到原始隊(duì)列中時(shí),會(huì)把 tagsCode 再次修改為 tag 的 HashCode,代碼如下:

                //類(lèi)MessageExtBrokerInner,這個(gè)方法被 messageTimeup 方法調(diào)用。public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode();}

                如下圖:

                2.3 一個(gè)問(wèn)題

                如果有一個(gè)業(yè)務(wù)場(chǎng)景,要求延時(shí)消息 3 小時(shí)才能消費(fèi),而 RocketMQ 的延時(shí)消息最大延時(shí)級(jí)別只支持延時(shí) 2 小時(shí),怎么處理?

                這里提供兩個(gè)思路供大家參考:

                在 Broker 上修改 messageDelayLevel 的默認(rèn)配置;

                客戶(hù)端緩存 msgId,先設(shè)置延時(shí)級(jí)別是 18(2h),當(dāng)客戶(hù)端拉取到消息后首先判斷有沒(méi)有緩存,如果有緩存則再次發(fā)送延時(shí)消息,這次延時(shí)級(jí)別是 17(1h),如果沒(méi)有緩存則進(jìn)行消費(fèi)。

                3 總結(jié)

                經(jīng)過(guò)上面的講解,延時(shí)消息的處理流程如下:

                最后,延時(shí)消息的延時(shí)時(shí)間并不精確,這個(gè)時(shí)間是 Broker 調(diào)度線(xiàn)程把消息重新投遞到原始的 MessageQueue 的時(shí)間,如果發(fā)生消息積壓或者 RocketMQ 客戶(hù)端發(fā)生流量管控,客戶(hù)端拉取到消息后進(jìn)行處理的時(shí)間可能會(huì)超出預(yù)設(shè)的延時(shí)時(shí)間

                鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
                用戶(hù)投稿
                上一篇 2022年6月14日 21:18
                下一篇 2022年6月15日 06:02

                相關(guān)推薦

                • 分享4條發(fā)微商朋友圈的方法(微商朋友圈應(yīng)該怎么發(fā))

                  對(duì)于微商朋友來(lái)說(shuō),朋友圈的重要性不言而喻了。 那么微商的朋友圈到底該怎么發(fā)呢? 為什么同樣是經(jīng)營(yíng)一個(gè)朋友圈,有的微商看起來(lái)逼格滿(mǎn)滿(mǎn),實(shí)際效果也不錯(cuò);而有的卻動(dòng)都不動(dòng)就被屏蔽甚至拉黑…

                  2022年11月27日
                • 劉畊宏回應(yīng)梅西輸球后哭了:跳操流汗到眼睛 剛好有點(diǎn)流鼻水

                  11月23日,劉畊宏發(fā)言回應(yīng)自己再梅西輸球后流淚的消息,他寫(xiě)道:“我是有些難過(guò)… 然后…跳操流汗到眼睛,剛好有點(diǎn)流鼻水,阿根廷之后的比賽會(huì)贏的!”據(jù)悉,11月22日的世界杯比賽中,…

                  2022年11月26日
                • EDG粉絲酸了!JDG重磅官宣,頂級(jí)打野Kanavi留在LPL賽區(qū)

                  2022英雄聯(lián)盟職業(yè)聯(lián)賽冬季轉(zhuǎn)會(huì)期已經(jīng)于11月22日拉開(kāi)帷幕,在轉(zhuǎn)會(huì)期首日作為L(zhǎng)PL觀(guān)眾關(guān)注的焦點(diǎn)的JDG戰(zhàn)隊(duì),就官宣了Yagao離隊(duì)以及Homme續(xù)約的消息,這讓人十分意外。畢竟…

                  2022年11月25日
                • 《寶可夢(mèng)朱紫》夢(mèng)特性怎么獲得?隱藏特性獲取方法推薦

                  寶可夢(mèng)朱紫里有很多寶可夢(mèng)都是擁有夢(mèng)特性會(huì)變強(qiáng)的寶可夢(mèng),很多玩家不知道夢(mèng)特性怎么獲得,下面就給大家?guī)?lái)寶可夢(mèng)朱紫隱藏特性獲取方法推薦,感興趣的小伙伴一起來(lái)看看吧,希望能幫助到大家。 …

                  2022年11月25日
                • 《寶可夢(mèng)朱紫》奇魯莉安怎么進(jìn)化?奇魯莉安進(jìn)化方法分享

                  寶可夢(mèng)朱紫中的奇魯莉安要怎么進(jìn)化呢?很多玩家都不知道,下面就給大家?guī)?lái)寶可夢(mèng)朱紫奇魯莉安進(jìn)化方法分享,感興趣的小伙伴一起來(lái)看看吧,希望能幫助到大家。 奇魯莉安進(jìn)化方法分享 奇魯莉安…

                  2022年11月25日
                • 5+3疫情防控從哪天開(kāi)始算(遼寧疫情防控最新政策)

                  最近有關(guān)國(guó)內(nèi)各地的疫情大家也都有在持續(xù)關(guān)注,目前國(guó)內(nèi)各地疫情隔離時(shí)間也根據(jù)二十條防控措施有了新的調(diào)整。那么,5+3疫情防控從哪天開(kāi)始算?對(duì)于密接的5+3隔離時(shí)間計(jì)算大家還是比較關(guān)心…

                  2022年11月25日
                • 藍(lán)碼怎么變綠碼需要幾天(藍(lán)碼怎么變綠碼需要幾天)

                  大家都知道健康碼的顏色有紅碼、綠碼、黃碼,近日湖南健康碼上線(xiàn)“藍(lán)碼”,不少小伙伴發(fā)現(xiàn)自己健康碼變藍(lán)了,都想趕緊恢復(fù)綠碼,那么藍(lán)碼怎么變綠碼需要幾天?下面小編為大家?guī)?lái)藍(lán)碼變綠碼需要…

                  2022年11月25日
                • 拼多多百億補(bǔ)貼預(yù)售一般多久發(fā)貨(拼多多百億補(bǔ)貼預(yù)售)

                  拼多多里面有很多優(yōu)惠活動(dòng),其中百億補(bǔ)貼活動(dòng)非?;鸨恍├锩娴臇|西價(jià)格比別的平臺(tái)便宜,質(zhì)量也有保障,還有預(yù)售的活動(dòng),那么拼多多百億補(bǔ)貼預(yù)售一般多久發(fā)貨?下面小編為大家?guī)?lái)拼多多百億…

                  2022年11月25日
                • 北京疫情多久能解除封控(北京疫情還要多久結(jié)束)

                  最近一段時(shí)間北京疫情形勢(shì)備受關(guān)注,馬上就要到年底了,不少人想要去北京辦事,。都非常關(guān)注當(dāng)?shù)匾咔橄嚓P(guān)政策,那么 北京疫情多久能解除封控?北京疫情什么時(shí)候恢復(fù)正常生活?下面小編為大家?guī)А?/p>

                  2022年11月25日
                • 賈乃亮的消息的最新動(dòng)態(tài)(賈乃亮終于又宣布好消息)

                  本以為賈乃亮與李小璐官宣離婚后的畫(huà)風(fēng),該是“一別兩寬,各生歡喜”。 誰(shuí)知卻是“剪不斷,理還亂”,八卦傳聞比離婚前還多。 最近,就有不少新聞報(bào)道稱(chēng),賈乃亮和李小璐又決定為了女兒復(fù)合。…

                  2022年11月25日

                聯(lián)系我們

                聯(lián)系郵箱:admin#wlmqw.com
                工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息