這篇文章給大家分享的是有關(guān)如何使用RabbitMQ實(shí)現(xiàn)RPC的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。
創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),周口企業(yè)網(wǎng)站建設(shè),周口品牌網(wǎng)站建設(shè),網(wǎng)站定制,周口網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,周口網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。背景知識(shí)
RabbitMQ
RabbitMQ 是基于 AMQP 協(xié)議實(shí)現(xiàn)的一個(gè)消息隊(duì)列(Message Queue),Message Queue 是一個(gè)典型的生產(chǎn)者/消費(fèi)者模式。生產(chǎn)者發(fā)布消息,消費(fèi)者消費(fèi)消息,生產(chǎn)者和消費(fèi)者之間是解耦的,互相不知道對(duì)方的存在。
RPC
Remote Procedure Call:遠(yuǎn)程過程調(diào)用,一次遠(yuǎn)程過程調(diào)用的流程即客戶端發(fā)送一個(gè)請(qǐng)求到服務(wù)端,服務(wù)端根據(jù)請(qǐng)求信息進(jìn)行處理后返回響應(yīng)信息,客戶端收到響應(yīng)信息后結(jié)束。
如何使用 RabbitMQ 實(shí)現(xiàn) RPC?
使用 RabbitMQ 實(shí)現(xiàn) RPC,相應(yīng)的角色是由生產(chǎn)者來作為客戶端,消費(fèi)者作為服務(wù)端。
但 RPC 調(diào)用一般是同步的,客戶端和服務(wù)器也是緊密耦合的。即客戶端通過 IP/域名和端口鏈接到服務(wù)器,向服務(wù)器發(fā)送請(qǐng)求后等待服務(wù)器返回響應(yīng)信息。
但 MQ 的生產(chǎn)者和消費(fèi)者是完全解耦的,那么如何用 MQ 實(shí)現(xiàn) RPC 呢?很明顯就是把 MQ 當(dāng)作中間件實(shí)現(xiàn)一次雙向的消息傳遞:
客戶端和服務(wù)端即是生產(chǎn)者也是消費(fèi)者??蛻舳税l(fā)布請(qǐng)求,消費(fèi)響應(yīng);服務(wù)端消費(fèi)請(qǐng)求,發(fā)布響應(yīng)。
具體實(shí)現(xiàn)
MQ部分的定義
請(qǐng)求信息的隊(duì)列
我們需要一個(gè)隊(duì)列來存放請(qǐng)求信息,客戶端向這個(gè)隊(duì)列發(fā)布請(qǐng)求信息,服務(wù)端消費(fèi)該隊(duì)列處理請(qǐng)求。該隊(duì)列不需要復(fù)雜的路由規(guī)則,直接使用 RabbitMQ 默認(rèn)的 direct exchange 來路由消息即可。
響應(yīng)信息的隊(duì)列
存放響應(yīng)信息的隊(duì)列不應(yīng)只有一個(gè)。如果存在多個(gè)客戶端,不能保證響應(yīng)信息被發(fā)布請(qǐng)求的那個(gè)客戶端消費(fèi)到。所以應(yīng)為每一個(gè)客戶端創(chuàng)建一個(gè)響應(yīng)隊(duì)列,這個(gè)隊(duì)列應(yīng)該由客戶端來創(chuàng)建且只能由這個(gè)客戶端使用并在使用完畢后刪除,這里可以使用 RabbitMQ 提供的排他隊(duì)列(Exclusive Queue):
channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())
并且要保證隊(duì)列名唯一,聲明隊(duì)列時(shí)名稱設(shè)為空 RabbitMQ 會(huì)生成一個(gè)唯一的隊(duì)列名。
exclusive 設(shè)為 true 表示聲明一個(gè)排他隊(duì)列,排他隊(duì)列的特點(diǎn)是只能被當(dāng)前的連接使用,并且在連接關(guān)閉后被刪除。
一個(gè)簡(jiǎn)單的 demo(使用 pull 機(jī)制)
我們使用一個(gè)簡(jiǎn)單的 demo 來了解客戶端和服務(wù)端的處理流程。
發(fā)布請(qǐng)求
編寫代碼前的一個(gè)小問題
我們?cè)诼暶麝?duì)列時(shí)為每一個(gè)客戶端聲明了獨(dú)有的響應(yīng)隊(duì)列,那服務(wù)器在發(fā)布響應(yīng)時(shí)如何知道發(fā)布到哪個(gè)隊(duì)列呢?其實(shí)就是客戶端需要告訴服務(wù)端將響應(yīng)發(fā)布到哪個(gè)隊(duì)列,RabbitMQ 提供了這個(gè)支持,消息體的 Properties 中有一個(gè)屬性 reply_to 就是用來標(biāo)記回調(diào)隊(duì)列的名稱,服務(wù)器需要將響應(yīng)發(fā)布到 reply_to 指定的回調(diào)隊(duì)列中。
解決了這個(gè)問題之后我們就可以編寫客戶端發(fā)布請(qǐng)求的代碼了:
// 定義響應(yīng)回調(diào)隊(duì)列 String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue(); // 設(shè)置回調(diào)隊(duì)列到 Properties AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName) .build(); String request = "request"; // 發(fā)布請(qǐng)求 channel.basicPublish("", "rpc_queue", properties, request.getBytes());
Direct reply-to:
RabbitMQ 提供了一種更便捷的機(jī)制來實(shí)現(xiàn) RPC,不需要客戶端每次都定義回調(diào)隊(duì)列,客戶端發(fā)布請(qǐng)求時(shí)將 replyTo 設(shè)為 amq.rabbitmq.reply-to ,消費(fèi)響應(yīng)時(shí)也指定消費(fèi) amq.rabbitmq.reply-to ,RabbitMQ 會(huì)為客戶端創(chuàng)建一個(gè)內(nèi)部隊(duì)列
消費(fèi)請(qǐng)求
接下來是服務(wù)端處理請(qǐng)求的部分,接收到請(qǐng)求后經(jīng)過處理將響應(yīng)信息發(fā)布到 reply_to 指定的回調(diào)隊(duì)列:
// 服務(wù)端 Consumer 的定義 public class RpcServer extends DefaultConsumer { public RpcServer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); String response = (msg + " Received"); // 獲取回調(diào)隊(duì)列名 String replyTo = properties.getReplyTo(); // 發(fā)布響應(yīng)消息到回調(diào)隊(duì)列 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes()); } } ... // 啟動(dòng)服務(wù)端 Consumer channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收響應(yīng)
客戶端如何接收服務(wù)器的響應(yīng)呢?有兩種方式:1.輪詢的去 pull 回調(diào)隊(duì)列中的消息,2.異步的消費(fèi)回調(diào)隊(duì)列中的消息。我們?cè)谶@里簡(jiǎn)單實(shí)現(xiàn)第一種方案。
GetResponse getResponse = null; while (getResponse == null) { getResponse = channel.basicGet(replyQueueName, true); } String response = new String(getResponse.getBody());
一個(gè)簡(jiǎn)單的基于 RabbitMQ 的 RPC 模型已經(jīng)實(shí)現(xiàn)了,但這個(gè) demo 并不實(shí)用,因?yàn)榭蛻舳嗣看伟l(fā)送完請(qǐng)求都要同步的輪詢等待響應(yīng)消息,只能每次處理一個(gè)請(qǐng)求。RabbitMQ 的 pull 模式效率也比較低。
實(shí)現(xiàn)一個(gè)完備可用的 RPC 模式需要做的工作還有很多,要處理的關(guān)鍵點(diǎn)也比較復(fù)雜,有句話叫不要重復(fù)造輪子,spring 已經(jīng)實(shí)現(xiàn)了一個(gè)完備可用的 RPC 模式的庫(kù),接下來我們來了解一下。順便在此給大家推薦一個(gè)Java架構(gòu)方面的交流學(xué)習(xí)群:698581634,進(jìn)群即可獲取Java架構(gòu)師資料:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化這些成為架構(gòu)師必備的知識(shí)體系,群里一定有你需要的資料,大家趕緊加群吧。
Spring Rabbit 中的實(shí)現(xiàn)
和上面 demo 的 pull 模式一次只能處理一個(gè)請(qǐng)求相對(duì)應(yīng)的:如何異步的接收響應(yīng)并處理多個(gè)請(qǐng)求呢?關(guān)鍵點(diǎn)就在于我們需要記錄請(qǐng)求和響應(yīng)并將它們關(guān)聯(lián)起來,RabbitMQ 也提供了支持,Properties 中的另一個(gè)屬性 correlation_id 用來標(biāo)識(shí)一個(gè)消息的唯一 id。
參考 spring-rabbit 中的 convertSendAndReceive 方法的實(shí)現(xiàn),為每一次請(qǐng)求生成一個(gè)唯一的 correlation_id :
private final AtomicInteger messageTagProvider = new AtomicInteger(); ... String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); ... message.getMessageProperties().setCorrelationId(messageTag);
并使用一個(gè) ConcurrentHashMap 來維護(hù) correlation_id 和響應(yīng)信息的映射:
private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>(); ... final PendingReply pendingReply = new PendingReply(); this.replyHolder.put(correlationId, pendingReply);
PendingReply 中有一個(gè) BlockingQueue 存放響應(yīng)信息,在發(fā)送完請(qǐng)求信息后調(diào)用 BlockingQueue 的 pull 方法并設(shè)置超時(shí)時(shí)間來獲取響應(yīng):
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply
);
}
在獲取響應(yīng)后不論結(jié)果如何,都會(huì)將 PendingReply 從 replyHolder 中移除,防止 replyHolder 中積壓超時(shí)的響應(yīng)消息:
try { reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag); } finally { this.replyHolder.remove(messageTag); ... }
響應(yīng)信息是何時(shí)如何被放到這個(gè) BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:
public void onMessage(Message message) { String messageTag; if (this.correlationKey == null) { // using standard correlationId property messageTag = message.getMessageProperties().getCorrelationId(); } else { messageTag = (String) message.getMessageProperties() .getHeaders().get(this.correlationKey); } // 存在 correlation_id 才認(rèn)為是RPC的響應(yīng)信息,不存在時(shí)不處理 if (messageTag == null) { logger.error("No correlation header in reply"); return; } // 從 replyHolder 中取出 correlation_id 對(duì)應(yīng)的 PendingReply PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply == null) { if (logger.isWarnEnabled()) { logger.warn("Reply received after timeout for " + messageTag); } throw new AmqpRejectAndDontRequeueException("Reply received after timeout"); } else { restoreProperties(message, pendingReply); // 將響應(yīng)信息 add 到 BlockingQueue 中 pendingReply.reply(message); } }
以上的 spring 代碼隱去了很多額外部分的處理和細(xì)節(jié),只關(guān)注關(guān)鍵的部分。
至此一個(gè)完整可用的由 RabbitMQ 作為中間件實(shí)現(xiàn)的 RPC 模式就完成了。
總結(jié)
服務(wù)端
服務(wù)端的實(shí)現(xiàn)比較簡(jiǎn)單,和一般的 Consumer 的區(qū)別只在于需要將請(qǐng)求回復(fù)到 replyTo 指定的 queue 中并帶上消息標(biāo)識(shí) correlation_id 即可
服務(wù)端的一點(diǎn)小優(yōu)化:
超時(shí)的處理是由客戶端來實(shí)現(xiàn)的,那服務(wù)端有沒有可以優(yōu)化的地方呢?
答案是有的:如果我們的服務(wù)端處理比較耗時(shí),如何判斷客戶端是否還在等待響應(yīng)呢?
我們可以使用 passive 參數(shù)去檢查 replyTo 的 queue 是否存在,因?yàn)榭蛻舳寺暶鞯氖莾?nèi)部隊(duì)列,客戶端如果斷掉鏈接了這個(gè) queue 就不存在了,這時(shí)服務(wù)端就無需處理這個(gè)消息了。
客戶端
客戶端承擔(dān)了更多的工作量,包括:
聲明 replyTo 隊(duì)列(使用 amq.rabbitmq.reply-to 會(huì)簡(jiǎn)單很多)
維護(hù)請(qǐng)求和響應(yīng)消息(使用唯一的 correlation_id 來關(guān)聯(lián))
消費(fèi)服務(wù)端的返回
處理超時(shí)等異常情況(使用BlockingQueue來阻塞獲?。?/p>
好在 spring 已經(jīng)實(shí)現(xiàn)了一套完備可靠的代碼,我們?cè)谇宄肆鞒毯完P(guān)鍵點(diǎn)之后,可以直接使用 spring 提供的 RabbitTemplate ,無需自己實(shí)現(xiàn)。
使用 MQ 實(shí)現(xiàn) RPC 的意義
通過 MQ 實(shí)現(xiàn) RPC 看起來比客戶端和服務(wù)器直接通訊要復(fù)雜一些,那我們?yōu)槭裁匆@樣做呢?或者說這樣做有什么好處:
將客戶端和服務(wù)器解耦:客戶端只是發(fā)布一個(gè)請(qǐng)求到 MQ 并消費(fèi)這個(gè)請(qǐng)求的響應(yīng)。并不關(guān)心具體由誰來處理這個(gè)請(qǐng)求,MQ 另一端的請(qǐng)求的消費(fèi)者可以隨意替換成任何可以處理請(qǐng)求的服務(wù)器,并不影響到客戶端。
減輕服務(wù)器的壓力:傳統(tǒng)的 RPC 模式中如果客戶端和請(qǐng)求過多,服務(wù)器的壓力會(huì)過大。由 MQ 作為中間件的話,過多的請(qǐng)求而是被 MQ 消化掉,服務(wù)器可以控制消費(fèi)請(qǐng)求的頻次,并不會(huì)影響到服務(wù)器。
服務(wù)器的橫向擴(kuò)展更加容易:如果服務(wù)器的處理能力不能滿足請(qǐng)求的頻次,只需要增加服務(wù)器來消費(fèi) MQ 的消息即可,MQ會(huì)幫我們實(shí)現(xiàn)消息消費(fèi)的負(fù)載均衡。
可以看出 RabbitMQ 對(duì)于 RPC 模式的支持也是比較友好地,
amq.rabbitmq.reply-to , reply_to , correlation_id 這些特性都說明了這一點(diǎn),再加上 spring-rabbit 的實(shí)現(xiàn),可以讓我們很簡(jiǎn)單的使用消息隊(duì)列模式的 RPC 調(diào)用。
感謝各位的閱讀!關(guān)于“如何使用RabbitMQ實(shí)現(xiàn)RPC”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!
標(biāo)題名稱:如何使用RabbitMQ實(shí)現(xiàn)RPC-創(chuàng)新互聯(lián)
文章起源:http://www.rwnh.cn/article10/cehogo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供虛擬主機(jī)、網(wǎng)站建設(shè)、做網(wǎng)站、定制開發(fā)、網(wǎng)站營(yíng)銷、面包屑導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容