這篇文章主要介紹“怎么用c#寫(xiě)開(kāi)源分布式消息隊(duì)列equeue”,在日常操作中,相信很多人在怎么用c#寫(xiě)開(kāi)源分布式消息隊(duì)列equeue問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”怎么用c#寫(xiě)開(kāi)源分布式消息隊(duì)列equeue”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
創(chuàng)新互聯(lián)公司是一家集成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、網(wǎng)站頁(yè)面設(shè)計(jì)、網(wǎng)站優(yōu)化SEO優(yōu)化為一體的專業(yè)的建站公司,已為成都等多地近百家企業(yè)提供網(wǎng)站建設(shè)服務(wù)。追求良好的瀏覽體驗(yàn),以探求精品塑造與理念升華,設(shè)計(jì)最適合用戶的網(wǎng)站頁(yè)面。 合作只是第一步,服務(wù)才是根本,我們始終堅(jiān)持講誠(chéng)信,負(fù)責(zé)任的原則,為您進(jìn)行細(xì)心、貼心、認(rèn)真的服務(wù),與眾多客戶在蓬勃發(fā)展的市場(chǎng)環(huán)境中,互促共生。
Topic
一個(gè)topic就是一個(gè)主題。一個(gè)系統(tǒng)中,我們可以對(duì)消息劃分為一些topic,這樣我們就能通過(guò)topic,將消息發(fā)送到不同的queue。
Queue
一個(gè)topic下,我們可以設(shè)置多個(gè)queue,每個(gè)queue就是我們平時(shí)所說(shuō)的消息隊(duì)列;因?yàn)閝ueue是完全從屬于某個(gè)特定的topic的,所以當(dāng)我們要發(fā)送消息時(shí),總是要指定該消息所屬的topic是什么。然后equeue就能知道該topic下有幾個(gè)queue了。但是到底發(fā)送到哪個(gè)queue呢?比如一個(gè)topic下有4個(gè)queue,那對(duì)于這個(gè)topic下的消息,發(fā)送時(shí),到底該發(fā)送到哪個(gè)queue呢?那必定有個(gè)消息被路由的過(guò)程。目前equeue的做法是在發(fā)送一個(gè)消息時(shí),需要用戶指定這個(gè)消息對(duì)應(yīng)的topic以及一個(gè)用來(lái)路由的一個(gè)object類型的參數(shù)。equeue會(huì)根據(jù)topic得到所有的queue,然后根據(jù)該object參數(shù)通過(guò)hash code然后取模queue的個(gè)數(shù)***得到要發(fā)送的queue的編號(hào),從而知道該發(fā)送到哪個(gè)queue。這個(gè)路由消息的過(guò)程是在發(fā)送消息的這一方做的,也就是下面要說(shuō)的producer。之所以不在消息服務(wù)器上做是因?yàn)檫@樣可以讓用戶自己決定該如何路由消息,具有更大的靈活性。
Producer
就是消息隊(duì)列的生產(chǎn)者。我們知道,消息隊(duì)列的本質(zhì)就是實(shí)現(xiàn)了publish-subscribe的模式,即生產(chǎn)者-消費(fèi)者模式。生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息。所以這里的Producer就是用來(lái)生產(chǎn)和發(fā)送消息的。
Consumer
就是消息隊(duì)列的消費(fèi)者,一個(gè)消息可以有多個(gè)消費(fèi)者。
Consumer Group
消費(fèi)者分組,這可能對(duì)大家來(lái)說(shuō)是一個(gè)新概念。之所以要搞出一個(gè)消費(fèi)者分組,是為了實(shí)現(xiàn)下面要說(shuō)的集群消費(fèi)。一個(gè)消費(fèi)者分組中包含了一些消費(fèi)者,如果這些消費(fèi)者是要集群消費(fèi),那這些消費(fèi)者會(huì)平均消費(fèi)該分組中的消息。
Broker
equeue中的broker負(fù)責(zé)消息的中轉(zhuǎn),即接收producer發(fā)送過(guò)來(lái)的消息,然后持久化消息到磁盤(pán),然后接收consumer發(fā)送過(guò)來(lái)的拉取消息的請(qǐng)求,然后根據(jù)請(qǐng)求拉取相應(yīng)的消息給consumer。所以,broker可以理解為消息隊(duì)列服務(wù)器,提供消息的接收、存儲(chǔ)、拉取服務(wù)??梢?jiàn),broker對(duì)于equeue來(lái)說(shuō)是核心,它絕對(duì)不能掛,一旦掛了,那producer,consumer就無(wú)法實(shí)現(xiàn)publish-subscribe了。
集群消費(fèi)
集群消費(fèi)是指,一個(gè)consumer group下的consumer,平均消費(fèi)topic下的queue。具體如何平均可以看一下下面的架構(gòu)圖,這里先用文字簡(jiǎn)單描述一下。假如一個(gè)topic下有4個(gè)queue,然后當(dāng)前有一個(gè)consumer group,該分組下有4個(gè)consumer,那每個(gè)consumer就被分配到該topic下的一個(gè)queue,這樣就達(dá)到了平均消費(fèi)topic下的queue的目的。如果consumer group下只有兩個(gè)consumer,那每個(gè)consumer就消費(fèi)2個(gè)queue。如果有3個(gè)consumer,則***個(gè)消費(fèi)2個(gè)queue,后面兩個(gè)每個(gè)消費(fèi)一個(gè)queue,從而達(dá)到盡量平均消費(fèi)。所以,可以看出,我們應(yīng)該盡量讓consumer group下的consumer的數(shù)目和topic的queue的數(shù)目一致或成倍數(shù)關(guān)系。這樣每個(gè)consumer消費(fèi)的queue的數(shù)量總是一樣的,這樣每個(gè)consumer服務(wù)器的壓力才會(huì)差不多。當(dāng)前前提是這個(gè)topic下的每個(gè)queue里的消息的數(shù)量總是差不多多的。這點(diǎn)我們可以對(duì)消息根據(jù)某個(gè)用戶自己定義的key來(lái)進(jìn)行hash路由來(lái)保證。
廣播消費(fèi)
廣播消費(fèi)是指一個(gè)consumer只要訂閱了某個(gè)topic的消息,那它就會(huì)收到該topic下的所有queue里的消息,而不管這個(gè)consumer的group是什么。所以對(duì)于廣播消費(fèi)來(lái)說(shuō),consumer group沒(méi)什么實(shí)際意義。consumer可以在實(shí)例化時(shí),我們可以指定是集群消費(fèi)還是廣播消費(fèi)。
消費(fèi)進(jìn)度(offset)
消費(fèi)進(jìn)度是指,當(dāng)一個(gè)consumer group里的consumer在消費(fèi)某個(gè)queue里的消息時(shí),equeue是通過(guò)記錄消費(fèi)位置(offset)來(lái)知道當(dāng)前消費(fèi)到哪里了。以便該consumer重啟后繼續(xù)從該位置開(kāi)始消費(fèi)。比如一個(gè)topic有4個(gè)queue,一個(gè)consumer group有4個(gè)consumer,則每個(gè)consumer分配到一個(gè)queue,然后每個(gè)consumer分別消費(fèi)自己的queue里的消息。equeue會(huì)分別記錄每個(gè)consumer對(duì)其queue的消費(fèi)進(jìn)度,從而保證每個(gè)consumer重啟后知道下次從哪里開(kāi)始繼續(xù)消費(fèi)。實(shí)際上,也許下次重啟后不是由該consumer消費(fèi)該queue了,而是由group里的其他consumer消費(fèi)了,這樣也沒(méi)關(guān)系,因?yàn)槲覀円呀?jīng)記錄了這個(gè)queue的消費(fèi)位置了。所以可以看出,消費(fèi)位置和consumer其實(shí)無(wú)關(guān),消費(fèi)位置完全是queue的一個(gè)屬性,用來(lái)記錄當(dāng)前被消費(fèi)到哪里了。另外一點(diǎn)很重要的是,一個(gè)topic可以被多個(gè)consumer group里的consumer訂閱。不同consumer group里的consumer即便是消費(fèi)同一個(gè)topic下的同一個(gè)queue,那消費(fèi)進(jìn)度也是分開(kāi)存儲(chǔ)的。也就是說(shuō),不同的consumer group內(nèi)的consumer的消費(fèi)完全隔離,彼此不受影響。還有一點(diǎn)就是,對(duì)于集群消費(fèi)和廣播消費(fèi),消費(fèi)進(jìn)度持久化的地方是不同的,集群消費(fèi)的消費(fèi)進(jìn)度是放在broker,也就是消息隊(duì)列服務(wù)器上的,而廣播消費(fèi)的消費(fèi)進(jìn)度是存儲(chǔ)在consumer本地磁盤(pán)上的。之所以這樣設(shè)計(jì)是因?yàn)?,?duì)于集群消費(fèi),由于一個(gè)queue的消費(fèi)者可能會(huì)更換,因?yàn)閏onsumer group下的consumer數(shù)量可能會(huì)增加或減少,然后就會(huì)重新計(jì)算每個(gè)consumer該消費(fèi)的queue是哪些,這個(gè)能理解的把?所以,當(dāng)出現(xiàn)一個(gè)queue的consumer變動(dòng)的時(shí)候,新的consumer如何知道該從哪里開(kāi)始消費(fèi)這個(gè)queue呢?如果這個(gè)queue的消費(fèi)進(jìn)度是存儲(chǔ)在前一個(gè)consumer服務(wù)器上的,那就很難拿到這個(gè)消費(fèi)進(jìn)度了,因?yàn)橛锌赡苣莻€(gè)服務(wù)器已經(jīng)掛了,或者下架了,都有可能。而因?yàn)閎roker對(duì)于所有的consumer總是在服務(wù)的,所以,在集群消費(fèi)的情況下,被訂閱的topic的queue的消費(fèi)位置是存儲(chǔ)在broker上的,存儲(chǔ)的時(shí)候按照不同的consumer group做隔離,以確保不同的consumer group下的consumer的消費(fèi)進(jìn)度互補(bǔ)影響。然后,對(duì)于廣播消費(fèi),由于不會(huì)出現(xiàn)一個(gè)queue的consumer會(huì)變動(dòng)的情況,所以我們沒(méi)必要讓broker來(lái)保存消費(fèi)位置,所以是保存在consumer自己的服務(wù)器上。
equeue是什么?
通過(guò)上圖,我們能直觀的理解equeue。這個(gè)圖是從rocketmq的設(shè)計(jì)文檔中拿來(lái)的,呵呵。由于equeue的設(shè)計(jì)思想完全和rocketmq一致,所以我就拿過(guò)來(lái)用了。每個(gè)producer可以向某個(gè)topic發(fā)消息,發(fā)送的時(shí)候根據(jù)某種路由策略(producer可自定義)將消息發(fā)送到某個(gè)特定的queue。然后consumer可以消費(fèi)特定topic下的queue里的消息。上圖中,TOPIC_A有兩個(gè)消費(fèi)者,這兩個(gè)消費(fèi)者是在一個(gè)group里,所以應(yīng)該平均消費(fèi)TOPIC_A下的queue但由于有三個(gè)queue,所以***個(gè)consumer分到了2個(gè)queue,第二個(gè)consumer分到了1個(gè)。對(duì)于TOPIC_B,由于只有一個(gè)消費(fèi)者,那TOPIC_B下的所有queue都由它消費(fèi)。所有的topic信息、queue信息、還有消息本身,都存儲(chǔ)在broker服務(wù)器上。這點(diǎn)上圖中沒(méi)有體現(xiàn)出來(lái)。上圖主要關(guān)注producer,consumer,topic,queue這四個(gè)東西之間的關(guān)系,并不關(guān)注物理服務(wù)器的部署架構(gòu)。
關(guān)鍵問(wèn)題的思考
1.producer,broker,consumer三者之間如何通信
由于是用c#實(shí)現(xiàn),且因?yàn)橐话闶窃诰钟蚓W(wǎng)內(nèi)部署,為了實(shí)現(xiàn)高性能通信,我們可以利用異步socket來(lái)通信。.net本身提供了很好的異步socket通信的支持;我們也可以用zeromq來(lái)實(shí)現(xiàn)高性能的socket通信。本來(lái)想直接使用zeromq來(lái)實(shí)現(xiàn)通信模塊就好了,但后來(lái)自己學(xué)習(xí)了一下.net自帶的socket通信相關(guān)知識(shí),發(fā)現(xiàn)也不難,所以就自己實(shí)現(xiàn)了一個(gè),呵呵。自己實(shí)現(xiàn)的好處是我可以自己定義消息的協(xié)議,目前這部分實(shí)現(xiàn)代碼在ecommon基礎(chǔ)類庫(kù)中,是一個(gè)獨(dú)立的可服用的與業(yè)務(wù)場(chǎng)景無(wú)關(guān)的基礎(chǔ)類庫(kù)。有興趣的可以去下載下來(lái)看看代碼。經(jīng)過(guò)了自己的一些性能測(cè)試,發(fā)現(xiàn)通信模塊的性能還是不錯(cuò)的。一臺(tái)broker,四臺(tái)producer同時(shí)向這個(gè)broker發(fā)送消息,每秒能發(fā)送的消息4W沒(méi)有問(wèn)題,更多的producer還沒(méi)測(cè)試。
2.消息如何持久化
消息持久化方面主要考慮的是性能問(wèn)題,還有就是消息如何快速的讀取。
1. 首先,一臺(tái)broker上的消息不需要一直保存在該broker服務(wù)器上,因?yàn)檫@些消息總會(huì)被消費(fèi)掉。根據(jù)阿里rocketmq的設(shè)計(jì),默認(rèn)會(huì)1天刪除一次已經(jīng)被消費(fèi)過(guò)的消息。所以,我們可以理解,broker上的消息應(yīng)該不會(huì)無(wú)限制增長(zhǎng),因?yàn)闀?huì)被定期刪除。所以不必考慮一臺(tái)broker上消息放不下的問(wèn)題。
2. 如何快速的持久化消息?一般來(lái)說(shuō),我覺(jué)得有兩種方式:1)順序?qū)懘疟P(pán)文件;2)用現(xiàn)成的key,value的NOSQL產(chǎn)品來(lái)存儲(chǔ);rocketmq目前用的是自己寫(xiě)文件的方式,這種方式的難點(diǎn)是寫(xiě)文件比較復(fù)雜,因?yàn)樗邢⒍际琼樞騛ppend到文件末尾,雖然性能非常高,但復(fù)雜度也很高;比如所有消息不能全寫(xiě)在一個(gè)文件里,一個(gè)文件到達(dá)一定大小后需要拆分,一旦拆分就會(huì)產(chǎn)生很多問(wèn)題,呵呵。拆分后如何讀取也是比較復(fù)雜的問(wèn)題。還有由于是順序?qū)懭胛募?,那我們還需要把每一個(gè)消息在文件中的起始位置和長(zhǎng)度需要記錄下來(lái),這樣consumer在消費(fèi)消息時(shí),才能根據(jù)offset從文件中拿到該消息??傊枰紤]的問(wèn)題很多。如果是用nosql來(lái)持久化消息,那可以省去我們寫(xiě)文件時(shí)遇到的各種問(wèn)題,我們只需要關(guān)心如何把消息的key和該消息在queue中的offset對(duì)應(yīng)起來(lái)即可。另外一點(diǎn)疑問(wèn)是,queue里的信息要持久化嗎?先要想清楚queue里放的是什么東西。當(dāng)broker接收到一個(gè)消息后,首先肯定是要先持久化,完成后需要把消息放入queue里。但由于內(nèi)存很有限,我們不可能把這個(gè)消息直接放入queue里,我們其實(shí)要放的只需要時(shí)該消息在nosql里的key即可,或者如果是用文件來(lái)持久化,那放的是該消息在文件中的偏移量offset,即存儲(chǔ)在文件的那個(gè)位置(比如是哪個(gè)行號(hào))。所以,實(shí)際上,queue只是一個(gè)消息的索引。那有必要持久化queue嗎?可以持久化,這樣畢竟在broker重啟的時(shí)候,恢復(fù)queue的時(shí)間可以縮短。那需要和持久化消息同步持久化嗎?顯然不需要,我們可以異步定時(shí)持久化每個(gè)queue,然后恢復(fù)queue的時(shí)候,可以先從持久化的部分恢復(fù),然后再把剩下的部分通過(guò)持久化的消息來(lái)補(bǔ)充以達(dá)到queue因?yàn)楫惒匠志没牟糠挚梢宰菲?。所以,?jīng)過(guò)上面的分析,消息本身都是放在nosql中,queue全部在內(nèi)存中。
那消息如何持久化呢?我覺(jué)得***的辦法是讓每個(gè)消息有一個(gè)全局的順序號(hào),一旦消息被寫(xiě)入nosql后,該消息的全局順序號(hào)就確定了,然后我們?cè)诟聦?duì)應(yīng)的queue的信息時(shí),把該消息的全局順序號(hào)傳給queue,這樣queue就能把queue自己對(duì)該消息的本地順序號(hào)和該消息的全局順序號(hào)建立映射關(guān)系。相關(guān)代碼如下:
public MessageStoreResult StoreMessage(Message message, int queueId) { var queues = GetQueues(message.Topic); var queueCount = queues.Count; if (queueId >= queueCount || queueId < 0) { throw new InvalidQueueIdException(message.Topic, queueCount, queueId); } var queue = queues[queueId]; var queueOffset = queue.IncrementCurrentOffset(); var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset); queue.SetMessageOffset(queueOffset, storeResult.MessageOffset); return storeResult; }
沒(méi)什么比代碼更能說(shuō)明問(wèn)題了,呵呵。上的代碼的思路是,接收一個(gè)消息對(duì)象和一個(gè)queueId,queueId表示當(dāng)前消息要放到第幾個(gè)queue里。然后內(nèi)部邏輯是,先獲取該消息的topic的所有queue,由于queue和topic都在內(nèi)存,所以這里沒(méi)性能問(wèn)題。然后檢查一下當(dāng)前傳遞進(jìn)來(lái)的queueId是否合法。如果合法,那就定位到該queue,然后通過(guò)IncrementCurrentOffset方法,將queue的內(nèi)部序號(hào)加1并返回,然后持久化消息,持久化的時(shí)候把queueId以及queueOffset一起持久化,完成后返回一個(gè)消息的全局序列號(hào)。由于messageStore內(nèi)部會(huì)把消息內(nèi)容、queueId、queueOffset,以及消息的全局順序號(hào)一起作為一個(gè)整體保存到nosql中,key就是消息的全局序列號(hào),value就是前面說(shuō)的整體(被序列化為二進(jìn)制)。然后,在調(diào)用queue的SetMessageOffset方法,把queueOffset和message的全局offset建立映射關(guān)系即可。***返回一個(gè)結(jié)果。messageStore.StoreMessage的內(nèi)存實(shí)現(xiàn)大致如下:
public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset) { var offset = GetNextOffset(); _queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now); return new MessageStoreResult(offset, queueId, queueOffset); }
GetNextOffset就是獲取下一個(gè)全局的消息序列號(hào),QueueMessage就是上面所說(shuō)的“整體”,因?yàn)槭莾?nèi)存實(shí)現(xiàn),所以就用了一個(gè)ConcurrentDictionary來(lái)保存一下queueMessage對(duì)象。如果是用nosql來(lái)實(shí)現(xiàn)messageStore,則這里需要寫(xiě)入nosql,key就是消息的全局序列號(hào),value就是queueMessage的二進(jìn)制序列化數(shù)據(jù)。通過(guò)上面的分析我們可以知道我們會(huì)將消息的全局序列號(hào)+queueId+queueOffset一起整體作為一條記錄持久化起來(lái)。這樣做有兩個(gè)非常好的特性:1)實(shí)現(xiàn)了消息持久化和消息在queue中的位置的持久化的原子事務(wù);2)我們總是可以根據(jù)這些持久化的queueMessage還原出所有的queue的信息,因?yàn)閝ueueMessage里包含了消息和消息在queue的中的位置信息;
基于這樣的消息存儲(chǔ),當(dāng)某個(gè)consumer要消費(fèi)某個(gè)位置的消息時(shí),我們可以通過(guò)先通過(guò)queueId找到queue,然后通過(guò)消息在queueOffset(由consumer傳遞過(guò)來(lái)的)獲取消息的全局offset,然后根據(jù)該全局的offset作為key從nosql拿到消息。實(shí)際上現(xiàn)在的equeue是批量拉取消息的,也就是一次socket請(qǐng)求不是拉一個(gè)消息,而是拉一批,默認(rèn)是32個(gè)消息。這樣consumer可以用更少的網(wǎng)絡(luò)請(qǐng)求拿到更多的消息,可以加快消息消費(fèi)的速度。
3.producer發(fā)送消息時(shí)的消息路由的細(xì)節(jié)
producer在發(fā)送消息時(shí),如何知道當(dāng)前topic下有多少個(gè)queue呢?每次發(fā)送消息時(shí)都要去broker上查一下嗎?顯然不行,這樣發(fā)送消息的性能就上不去了。那怎么辦呢?就是異步,呵呵。producer可以定時(shí)向broker發(fā)送請(qǐng)求,獲取topic下的queue數(shù)量,然后保存起來(lái)。這樣每次producer在發(fā)送消息時(shí),就只要從本地緩存里拿即可。因?yàn)閎roker上topic的queue的數(shù)量一般不會(huì)變化,所以這樣的緩存很有意義。那還有一個(gè)問(wèn)題,當(dāng)前producer***次對(duì)某個(gè)topic發(fā)送消息時(shí),queue哪里來(lái)呢?因?yàn)槎〞r(shí)線程不知道要向broker拿哪個(gè)topic下的queue數(shù)量,因?yàn)榇藭r(shí)producer端還沒(méi)有一個(gè)topic呢,因?yàn)橐粋€(gè)消息都還沒(méi)發(fā)送過(guò)。那就是需要判斷一下,如果當(dāng)前topic沒(méi)有queue的count信息,則直接從broker上獲取queue的count信息。然后再緩存起來(lái),在發(fā)送當(dāng)前消息。然后第二次發(fā)送時(shí),因?yàn)榫彺胬镆呀?jīng)有了該消息,所以就不必再?gòu)腷roker拿了,且后續(xù)定時(shí)線程也會(huì)自動(dòng)去更新該topic下的queue的count了。好,producer有了topic的queue的count,那用戶在發(fā)送消息時(shí),框架就能把這個(gè)topic的queueCount傳遞給用戶,然后用戶就能根據(jù)自己的需要將消息路由到第幾個(gè)queue了。
4.consumer負(fù)載均衡如何實(shí)現(xiàn)
consumer負(fù)載均衡的意思是指,在消費(fèi)者集群消費(fèi)的情況下,如何讓同一個(gè)consumer group里的消費(fèi)者平均消費(fèi)同一個(gè)topic下的queue。所以這個(gè)負(fù)載均衡本質(zhì)上是一個(gè)將queue平均分配給consumer的過(guò)程。那么怎么實(shí)現(xiàn)呢?通過(guò)上面負(fù)載均衡的定義,我們只要,要做負(fù)載均衡,必須要確定consumer group和topic;然后拿到consumer group下的所有consumer,以及topic下的所有queue;然后對(duì)于當(dāng)前的consumer,就能計(jì)算出來(lái)當(dāng)前consumer應(yīng)該被分配到哪些queue了。我們可以通過(guò)如下的函數(shù)來(lái)得到當(dāng)前的consumer應(yīng)該被分配到哪幾個(gè)queue。
public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy { public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds) { var result = new List<MessageQueue>(); if (!totalConsumerIds.Contains(currentConsumerId)) { return result; } var index = totalConsumerIds.IndexOf(currentConsumerId); var totalMessageQueueCount = totalMessageQueues.Count; var totalConsumerCount = totalConsumerIds.Count; var mod = totalMessageQueues.Count() % totalConsumerCount; var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount; var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size; var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; var range = Math.Min(averageSize, totalMessageQueueCount - startIndex); for (var i = 0; i < range; i++) { result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]); } return result; } }
函數(shù)里的實(shí)現(xiàn)就不多分析了。這個(gè)函數(shù)的目的就是根據(jù)給定的輸入,返回當(dāng)前consumer該分配到的queue。分配的原則就是平均分配。好了,有了這個(gè)函數(shù),我們就能很方便的實(shí)現(xiàn)負(fù)載均衡了。我們可以對(duì)每一個(gè)正在運(yùn)行的consumer內(nèi)部開(kāi)一個(gè)定時(shí)job,該job每隔一段時(shí)間進(jìn)行一次負(fù)載均衡,也就是執(zhí)行一次上面的函數(shù),得到當(dāng)前consumer該綁定的***queue。因?yàn)槊總€(gè)consumer都有一個(gè)groupName屬性,用于表示當(dāng)前consumer屬于哪個(gè)group。所以,我們就可以在負(fù)載均衡時(shí)到broker獲取當(dāng)前group下的所有consumer;另一方面,因?yàn)槊總€(gè)consumer都知道它自己訂閱了哪些topic,所以有了topic信息,就能獲取topic下的所有queue的信息了,有了這兩樣信息,每個(gè)consumer就能自己做負(fù)載均衡了。先看一下下面的代碼:
_scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval); _scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval); _scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval);
每個(gè)consumer內(nèi)部都會(huì)啟動(dòng)三個(gè)定時(shí)的task,***個(gè)task表示要定時(shí)做一次負(fù)載均衡;第二個(gè)task表示要定時(shí)更新當(dāng)前consumer訂閱的所有topic的queueCount信息,并把***的queueCount信息都保存在本地;第三個(gè)task表示當(dāng)前consumer會(huì)向broker定時(shí)發(fā)送心跳,這樣broker就能通過(guò)心跳知道某個(gè)consumer是否還活著,broker上維護(hù)了所有的consumer信息。一旦有新增或者發(fā)現(xiàn)沒(méi)有及時(shí)發(fā)送心跳過(guò)來(lái)的consumer,就會(huì)認(rèn)為有新增或者死掉的consumer。因?yàn)閎roker上維護(hù)了所有的consumer信息,所以他就能提供查詢服務(wù),比如根據(jù)某個(gè)consumer group查詢?cè)揼roup下的consumer。
通過(guò)這三個(gè)定時(shí)任務(wù),就能完成消費(fèi)者的負(fù)載均衡了。先看一下Rebalance方法:
private void Rebalance() { foreach (var subscriptionTopic in _subscriptionTopics) { try { RebalanceClustering(subscriptionTopic); } catch (Exception ex) { _logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex); } } }
代碼很簡(jiǎn)單,就是對(duì)每個(gè)訂閱的topic做負(fù)載均衡處理。再看一下RebalanceClustering方法:
上面的代碼不多分析了,就是先根據(jù)consumer group和topic獲取所有的consumer,然后對(duì)consumer做排序處理。之所以要做排序處理是為了確保負(fù)載均衡時(shí)對(duì)已有的分配情況盡量不發(fā)生改變。接下來(lái)就是從本地獲取topic下的所有queue,同樣根據(jù)queueId做一下排序。然后就是調(diào)用上面的分配算法計(jì)算出當(dāng)前consumer應(yīng)該分配到哪些queue。***調(diào)用UpdatePullRequestDict方法,用來(lái)對(duì)新增或刪除的queue做處理。對(duì)于新增的queue,要?jiǎng)?chuàng)建一個(gè)獨(dú)立的worker線程,開(kāi)始從broker拉取消息;對(duì)于刪除的queue,要停止其對(duì)應(yīng)的work,停止拉取消息。
通過(guò)上面的介紹和分析,我們大家知道了equeue是如何實(shí)現(xiàn)消費(fèi)者的負(fù)載均衡的。我們可以看出,因?yàn)槊總€(gè)topic下的queue的更新是異步的定時(shí)的,且負(fù)載均衡本身也是定時(shí)的,且broker上維護(hù)的consumer的信息也不是事實(shí)的,因?yàn)槊總€(gè)consumer發(fā)送心跳到broker不是實(shí)時(shí)發(fā)送的,而是比如每隔5s發(fā)送一次。所有這些因?yàn)槎际钱惒降脑O(shè)計(jì),所以可能會(huì)導(dǎo)致在負(fù)載均衡的過(guò)程中,同一個(gè)queue可能會(huì)被兩個(gè)消費(fèi)者同時(shí)消費(fèi)。這個(gè)就是所謂的,我們只能做到一個(gè)消息至少被消費(fèi)一次,但equeue層面做不到一個(gè)消息只會(huì)被消費(fèi)一次。實(shí)際上像rocketmq這種也是這樣的思路,放棄一個(gè)消息只會(huì)被消費(fèi)一次的實(shí)現(xiàn)(因?yàn)榇鷥r(jià)太大,且過(guò)于復(fù)雜,實(shí)際上對(duì)于分布式的環(huán)境,不太可能做到一個(gè)消息只會(huì)被消費(fèi)一次),而是采用確保一個(gè)消息至少會(huì)被消費(fèi)一次(即at least once).所以使用equeue,應(yīng)用方要自己做好對(duì)每個(gè)消息的冪等處理。
5.如何實(shí)現(xiàn)實(shí)時(shí)消息推送
消息的實(shí)時(shí)推送,一般有兩種做法:推模式(push)和拉模式(pull)。push的方式是指broker主動(dòng)對(duì)所有訂閱了該topic的消費(fèi)者推送消息;pull的方式是指消費(fèi)者主動(dòng)到broker上拉取消息;對(duì)于推模式,***的好處就是實(shí)時(shí),因?yàn)橐挥行碌南?,就?huì)立即推送給消費(fèi)者。但是有一個(gè)缺點(diǎn)就是如果消費(fèi)者來(lái)不及消費(fèi),它也會(huì)給消費(fèi)者推消息,這樣就會(huì)導(dǎo)致消費(fèi)者端的消息會(huì)堵塞。而通過(guò)拉的方式,有兩種實(shí)現(xiàn):1)輪訓(xùn)的方式拉,比如每隔5s輪訓(xùn)一下是否有新消息,這種方式的缺點(diǎn)是消息不實(shí)時(shí),但是消費(fèi)進(jìn)度完全由消費(fèi)者自己把控了;2)開(kāi)長(zhǎng)連接的方式來(lái)拉,就是不輪訓(xùn),消費(fèi)者和broker之間一直保持的連接通道,然后broker一有新消息,就會(huì)利用這個(gè)通道把消息發(fā)送給消費(fèi)者。
equeue中目前采用的是通過(guò)長(zhǎng)連接拉取消息的方式。長(zhǎng)連接通過(guò)socket長(zhǎng)連接實(shí)現(xiàn)。但是雖然叫長(zhǎng)連接,也不是一直不斷開(kāi),而是也會(huì)設(shè)計(jì)一個(gè)超時(shí)的限制,比如一個(gè)長(zhǎng)連接***不超過(guò)15s,超過(guò)15s,則broker發(fā)送回復(fù)給consumer,告訴consumer當(dāng)前沒(méi)有新消息;然后consumer接受到這個(gè)回復(fù)后,就知道要繼續(xù)發(fā)起下一個(gè)長(zhǎng)連接來(lái)拉取。然后假如在這15s中之內(nèi),broker上有新消息了,則broker就能立即主動(dòng)利用這個(gè)長(zhǎng)連接通知相應(yīng)的消費(fèi)者,把消息傳給消費(fèi)者。所以,可以看出,broker上在處理消費(fèi)者的拉取消息的請(qǐng)求時(shí),如果當(dāng)前沒(méi)有新消息,則會(huì)hold住這個(gè)socket連接,最多hold 15s,超過(guò)15s,則發(fā)送返回信息,告訴消費(fèi)者當(dāng)前無(wú)消息,然后消費(fèi)者再次發(fā)送pull message request過(guò)來(lái)。通過(guò)這樣的基于長(zhǎng)連接的拉取模式,我們可以實(shí)現(xiàn)兩個(gè)好處:1)消息實(shí)時(shí)推送;2)由消費(fèi)者控制消息消費(fèi)進(jìn)度;
另外,equeue里還實(shí)現(xiàn)了消費(fèi)者自身的自動(dòng)限流功能。就是假如當(dāng)前broker上消息很多,即生產(chǎn)者生產(chǎn)消息的速度大于消費(fèi)者消費(fèi)消息的速度,那broker上就會(huì)有消息被堆積。那此時(shí)消費(fèi)者在拉取消息時(shí),總是會(huì)有新消息拉取到,但是消費(fèi)者又來(lái)不及處理這么多消息。所以equeue框架內(nèi)置了一個(gè)限流(流控,流量控制)的設(shè)計(jì),就是可以允許用于配制一個(gè)消費(fèi)者端堆積的消息的上限,比如3000,超過(guò)這個(gè)數(shù)目(可配置),則equeue會(huì)讓消費(fèi)者以慢一點(diǎn)的頻率拉取消息。比如延遲個(gè)多少毫秒(延遲時(shí)間可配置)再拉取。這樣就簡(jiǎn)單的實(shí)現(xiàn)了流控的目的。
6.如何處理消息消費(fèi)失敗的情況
作為一個(gè)消息隊(duì)列,消費(fèi)者總是可能會(huì)在消費(fèi)消息時(shí)拋出異常,在equeue中這種情況就是消息消費(fèi)失敗的情況。通過(guò)上面的消費(fèi)進(jìn)度的介紹,大家知道了每個(gè)queue對(duì)某個(gè)特定的consumer group,都有一個(gè)唯一的消費(fèi)進(jìn)度。實(shí)際上,消息被拉取到consumer本地后,可能會(huì)被以兩種方式消費(fèi),一種是并行消費(fèi),一種是線性消費(fèi)。
并行消費(fèi)的意思是,假如當(dāng)前一次性拉取過(guò)來(lái)32個(gè)消息,那equeue會(huì)通過(guò)啟動(dòng)task(即開(kāi)多線程)的方式并行消費(fèi)每個(gè)消息;
線性消費(fèi)的意思是,消息是在一個(gè)獨(dú)立的單線程中順序消費(fèi),消費(fèi)順序和拉取過(guò)來(lái)的順序相同。
對(duì)于線性消費(fèi),假如前一個(gè)消息消費(fèi)的時(shí)候失敗了,也就是拋異常了,那該怎么辦呢?可能想到的辦法是重試個(gè)3次,但是要是重試后還是失敗呢?總不能因?yàn)檫@個(gè)消息而導(dǎo)致后面的消息無(wú)法把消費(fèi)吧?呵呵!對(duì)于這種情況,先說(shuō)一下rocketmq里的處理方式吧:它的做法是,當(dāng)遇到消費(fèi)失敗的情況,沒(méi)有立馬重試,而是直接把這個(gè)消息發(fā)送到broker上的某個(gè)重試隊(duì)列,發(fā)送成功后,就可以往下消費(fèi)下一個(gè)消息了。因?yàn)橐坏┌l(fā)送到重試隊(duì)列,那意味著這個(gè)消息就***總是會(huì)被消費(fèi)了,因?yàn)樵撓⒉粫?huì)丟了。但是要是發(fā)送到broker的重試隊(duì)列也不成功呢?這個(gè)?!其實(shí)這種情況不大應(yīng)該出現(xiàn),如果出現(xiàn),那基本就是broker掛了,呵呵。
rocketmq中,對(duì)于這種情況,那會(huì)把這個(gè)失敗的消息放入本地內(nèi)存隊(duì)列,慢慢消費(fèi)它。然后繼續(xù)往后消費(fèi)后面的消息?,F(xiàn)在你一定很關(guān)心queue的offset是如何更新的?這里涉及到一個(gè)滑動(dòng)門(mén)的概念。當(dāng)一批消息從broker拉取到消費(fèi)者本地后,并不是馬上消費(fèi)的,而是先放入一個(gè)本地的SortedDictionary,key就是消息在queue里的位置,value就是消息本身。因?yàn)槭且粋€(gè)排序的dictionary,所以key最小的消息意味著是最前面的消息,***的消息就是***面的消息。然后不管是并行消費(fèi)還是線性消費(fèi),只要某個(gè)消息被消費(fèi)了,那就從這個(gè)SortedDictionary里移除掉。每次被移除一個(gè)消息時(shí),總是會(huì)返回當(dāng)前這個(gè)SortedDictionary里的最小的key,然后我們就能判斷這個(gè)key是否和上次比是否前移了,如果是,則更新queue的這個(gè)***的offset。因?yàn)槊看我瞥粋€(gè)消息的時(shí)候,總是返回當(dāng)前SortedDictionary里的最小的key,所以,假如當(dāng)前offset是3,然后offset為4的這個(gè)消息一直消費(fèi)失敗,所以不會(huì)被移除,但是offset為5,6,7,8的這些消息雖然都消費(fèi)成功了,但是只要offset為4的這個(gè)消息沒(méi)有被移除,那最小的key就不會(huì)往前移動(dòng)。這個(gè)就是所謂的滑動(dòng)門(mén)的概念了。就好比是在鐵軌上一輛在跑的動(dòng)車(chē),offset的往前移動(dòng)就好比是動(dòng)車(chē)在不斷往前移動(dòng)。因?yàn)槲覀兿M鹢ffset總是會(huì)不斷往前移動(dòng),所以不希望前面的某個(gè)消費(fèi)失敗的消息讓這個(gè)滑動(dòng)門(mén)停止移動(dòng)(即我們總是希望這個(gè)最小的key能不斷變大),所以我們會(huì)想方設(shè)法讓消費(fèi)失敗的消息能不阻礙滑動(dòng)門(mén)的往前移動(dòng)。所以才把消費(fèi)失敗的消息放入重試隊(duì)列。
另外一點(diǎn)需要注意一下:并不是每次成功消費(fèi)完一個(gè)消息,就會(huì)立馬告訴broker更新offset,因?yàn)檫@樣那性能肯定很低,broker也會(huì)忙死,更好的辦法是先只是在本地內(nèi)存更新queue的offset,然后定時(shí)比如5s一次,將***的offset更新到broker。所以,因?yàn)檫@個(gè)異步的存在,同樣也會(huì)導(dǎo)致某個(gè)消息被重復(fù)消費(fèi)的可能性,因?yàn)閎roker上的offset肯定比實(shí)際的消費(fèi)進(jìn)度要慢,有5s的時(shí)間差。所以,再次強(qiáng)調(diào),應(yīng)用方必須要處理好對(duì)消息的冪等處理!比如enode框架中,對(duì)每個(gè)command消息,框架內(nèi)部都做了command的冪等處理。所以使用enode框架的應(yīng)用,自身無(wú)需對(duì)command做冪等處理方面的考慮。
上面提到了并行消費(fèi)和線性消費(fèi),其實(shí)對(duì)于offset的更新來(lái)說(shuō)是一樣的,因?yàn)椴⑿邢M(fèi)無(wú)非是多線程同時(shí)從SortedDictionary中移除消費(fèi)成功的消息,而單線程只是單個(gè)線程去移除SortedDictionary中的消息。所以我們要通過(guò)鎖的機(jī)制,保證對(duì)SortedDictionary的操作是線程安全的。目前用了ReaderWriterLockSlim來(lái)實(shí)現(xiàn)對(duì)方法調(diào)用的線層安全。有興趣的朋友可以去看一下代碼。
***,也是重點(diǎn),呵呵。equeue目前還沒(méi)有實(shí)現(xiàn)將失敗的消息發(fā)回到broker的重試隊(duì)列。這個(gè)功能以后會(huì)考慮加進(jìn)去。
7.如何解決broker的單點(diǎn)問(wèn)題
這個(gè)問(wèn)題比較復(fù)雜,目前equeue不支持broker的master-salve或master-master,而是單點(diǎn)的。我覺(jué)得一個(gè)成熟的消息隊(duì)列,為了確保在一個(gè)broker掛了的時(shí)候,要盡量能確保有其他broker可以接替它,這樣才能讓消息隊(duì)列服務(wù)器的可靠性。但是這個(gè)問(wèn)題實(shí)在太復(fù)雜。rocketmq目前實(shí)現(xiàn)的也只是master-slave的方式。也就是只要主的master掛了,那producer就無(wú)法向broker發(fā)送消息了,因?yàn)閟lave的broker是只讀的,不能直接接受新消息,slave的broker只能允許被consumer拉取消息。
到此,關(guān)于“怎么用c#寫(xiě)開(kāi)源分布式消息隊(duì)列equeue”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
網(wǎng)站欄目:怎么用c#寫(xiě)開(kāi)源分布式消息隊(duì)列equeue
文章URL:http://www.rwnh.cn/article30/pgsipo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、虛擬主機(jī)、定制開(kāi)發(fā)、移動(dòng)網(wǎng)站建設(shè)、手機(jī)網(wǎng)站建設(shè)、營(yíng)銷(xiāo)型網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)