RocketMQ 是一款開源的消息中間件,采用Java實(shí)現(xiàn),設(shè)計(jì)思想來自于Kafka(Scala實(shí)現(xiàn)),在具體設(shè)計(jì)時(shí)體現(xiàn)了自己的選擇和需求,具體差別可以看RocketMQ與Kafka對(duì)比。接下來是自己閱讀源碼的一些探索。
RocketMQ的整體架構(gòu)如下,可以看到各個(gè)組件充當(dāng)?shù)慕巧琋ame Server 負(fù)責(zé)維護(hù)一些全局的路由信息:當(dāng)前有哪些broker,每個(gè)Topic在哪個(gè)broker上; Broker具體處理消息的存儲(chǔ)和服務(wù);生產(chǎn)者和消費(fèi)者是消息的源頭和歸宿。
一、Producer 發(fā)送消息
Producer發(fā)送消息是如何得知發(fā)到哪個(gè)broker的 ? 每個(gè)應(yīng)用在收發(fā)消息之前,一般會(huì)調(diào)用一次producer.start()/consumer.start()做一些初始化工作,其中包括:創(chuàng)建需要的實(shí)例對(duì)象,如MQClientInstance;設(shè)置定時(shí)任務(wù),如從Nameserver中定時(shí)更新本地的Topic route info,發(fā)送心跳信息到所有的 broker,動(dòng)態(tài)調(diào)整線程池的大小,把當(dāng)前producer加入到指定的組中等等。客戶端會(huì)緩存路由信息TopicPublishInfo, 同時(shí)定期從NameServer取Topic路由信息,每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)注冊(cè)Topic信息到所有的NameServer。Producer在發(fā)送消息的時(shí)候會(huì)去查詢本地的topicPublishInfoTable(一個(gè)ConcurrentHashMap),如果沒有命中的話就會(huì)詢問NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也沒有查詢到(表示該主題的消息第一次發(fā)送),那么將會(huì)發(fā)送一個(gè)default的topic進(jìn)行路由查詢。
具體過程如下圖所示:
Producer 在得到了具體的通信地址后,發(fā)送過程就顯而易見了。通過代碼可以看到在選擇消息隊(duì)列進(jìn)行發(fā)送時(shí)采用隨機(jī)方式,同時(shí)和上一次發(fā)送的broker保持不同,防止熱點(diǎn)。
二、Broker處理來自Producer的消息
每個(gè)producer在發(fā)送消息的時(shí)候都和對(duì)應(yīng)的Broker建立了長(zhǎng)連接,此時(shí)broker已經(jīng)準(zhǔn)備好接收Message,Broker的SendMessageProcessor.sendMessage處理消息的存儲(chǔ),具體過程如下。接收到消息后,會(huì)先寫入Commit Log文件(順序?qū)懀瑢憹M了會(huì)新建一個(gè)新的文件),然后更新Consume queue文件(存儲(chǔ)如何由topic定位到具體的消息)。
三、RocketMQ 存儲(chǔ)特點(diǎn)
RocketMQ的消息采用順序?qū)懙絚ommitlog文件,然后利用consume queue文件作為索引,如圖。RocketMQ采用零拷貝mmap+write的方式來回應(yīng)Consumer的請(qǐng)求,RocketMQ宣稱大部分請(qǐng)求都會(huì)在Page Cache層得到滿足,所以消息過多不會(huì)因?yàn)榇疟P讀使得性能下降,這里自己的理解是,在64bit機(jī)器下,虛存地址空間(vm_area_struct)不是問題,所以相關(guān)的文件都會(huì)被映射到內(nèi)存中(有定期刪除文件的操作),即使此刻不在內(nèi)存,操作系統(tǒng)也會(huì)因?yàn)槿表摦惓_M(jìn)行換入,雖然地址空間不是問題,但是一個(gè)進(jìn)程映射文件的個(gè)數(shù)(/proc/sys/vm/max_map_count)是有限的,所以可能在這里發(fā)生OOM。
通過Broker中的存儲(chǔ)目錄(默認(rèn)路徑是 $HOME/store)也能看到存儲(chǔ)的邏輯視圖:
四、順序消息是如何保證的?
需要業(yè)務(wù)層自己決定哪些消息應(yīng)該順序到達(dá),然后發(fā)送的時(shí)候通過規(guī)則(hash)映射到同一個(gè)隊(duì)列,因?yàn)闆]有誰比業(yè)務(wù)自己更加知道關(guān)于消息順序的特點(diǎn)。這樣的順序是相對(duì)順序,局部順序,因?yàn)榘l(fā)送方只保證把這些消息順序的發(fā)送到broker上的同一隊(duì)列,但是不保證其他Producer也會(huì)發(fā)送消息到那個(gè)隊(duì)列,所以需要Consumer在拉到消息后做一些過濾。
五、RocketMQ 刷盤實(shí)現(xiàn)
Broker 在消息的存取時(shí)直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機(jī)器掉電時(shí)數(shù)據(jù)丟失,所以需要持久化到磁盤中。刷盤的最終實(shí)現(xiàn)都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數(shù)據(jù)寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會(huì)等待寫入完成。異步而言,只是喚醒對(duì)應(yīng)的線程,不保證執(zhí)行的時(shí)機(jī),流程如圖所示,更多細(xì)節(jié)可以參考。
六、消息過濾
類似于重復(fù)數(shù)據(jù)刪除技術(shù)(Data Deduplication),可以在源端做,也可以在目的端實(shí)現(xiàn),就是網(wǎng)絡(luò)和存儲(chǔ)的權(quán)衡,如果在Broker端做消息過濾就需要逐一比對(duì)consume queue 的 tagsCode 字段(hashcode),如果符合則傳輸給消費(fèi)者,因?yàn)槭?hashcode,所以存在誤判,需要在 Consumer 接收到消息后進(jìn)行字符串級(jí)別的過濾,確保準(zhǔn)確性。
小結(jié)
這次代碼閱讀主要著眼于消息的發(fā)送過程和Broker上的存儲(chǔ),其他方面的細(xì)節(jié)有待深入。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
本文標(biāo)題:RocketMQ源碼閱讀-創(chuàng)新互聯(lián)
分享路徑:http://www.rwnh.cn/article46/cecdhg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、商城網(wǎng)站、移動(dòng)網(wǎng)站建設(shè)、小程序開發(fā)、網(wǎng)站設(shè)計(jì)、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容