中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

如何使用RabbitMQ實(shí)現(xiàn)RPC-創(chuàng)新互聯(lián)

這篇文章給大家分享的是有關(guān)如何使用RabbitMQ實(shí)現(xiàn)RPC的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),周口企業(yè)網(wǎng)站建設(shè),周口品牌網(wǎng)站建設(shè),網(wǎng)站定制,周口網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,周口網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

背景知識(shí)

RabbitMQ

RabbitMQ 是基于 AMQP 協(xié)議實(shí)現(xiàn)的一個(gè)消息隊(duì)列(Message Queue),Message Queue 是一個(gè)典型的生產(chǎn)者/消費(fèi)者模式。生產(chǎn)者發(fā)布消息,消費(fèi)者消費(fèi)消息,生產(chǎn)者和消費(fèi)者之間是解耦的,互相不知道對(duì)方的存在。

如何使用RabbitMQ實(shí)現(xiàn)RPC

RPC

Remote Procedure Call:遠(yuǎn)程過程調(diào)用,一次遠(yuǎn)程過程調(diào)用的流程即客戶端發(fā)送一個(gè)請(qǐng)求到服務(wù)端,服務(wù)端根據(jù)請(qǐng)求信息進(jìn)行處理后返回響應(yīng)信息,客戶端收到響應(yīng)信息后結(jié)束。

如何使用RabbitMQ實(shí)現(xiàn)RPC

如何使用 RabbitMQ 實(shí)現(xiàn) RPC?

使用 RabbitMQ 實(shí)現(xiàn) RPC,相應(yīng)的角色是由生產(chǎn)者來作為客戶端,消費(fèi)者作為服務(wù)端。

但 RPC 調(diào)用一般是同步的,客戶端和服務(wù)器也是緊密耦合的。即客戶端通過 IP/域名和端口鏈接到服務(wù)器,向服務(wù)器發(fā)送請(qǐng)求后等待服務(wù)器返回響應(yīng)信息。

但 MQ 的生產(chǎn)者和消費(fèi)者是完全解耦的,那么如何用 MQ 實(shí)現(xiàn) RPC 呢?很明顯就是把 MQ 當(dāng)作中間件實(shí)現(xiàn)一次雙向的消息傳遞:

如何使用RabbitMQ實(shí)現(xiàn)RPC

客戶端和服務(wù)端即是生產(chǎn)者也是消費(fèi)者??蛻舳税l(fā)布請(qǐng)求,消費(fèi)響應(yīng);服務(wù)端消費(fèi)請(qǐng)求,發(fā)布響應(yīng)。

具體實(shí)現(xiàn)

MQ部分的定義

請(qǐng)求信息的隊(duì)列

我們需要一個(gè)隊(duì)列來存放請(qǐng)求信息,客戶端向這個(gè)隊(duì)列發(fā)布請(qǐng)求信息,服務(wù)端消費(fèi)該隊(duì)列處理請(qǐng)求。該隊(duì)列不需要復(fù)雜的路由規(guī)則,直接使用 RabbitMQ 默認(rèn)的 direct exchange 來路由消息即可。

響應(yīng)信息的隊(duì)列

存放響應(yīng)信息的隊(duì)列不應(yīng)只有一個(gè)。如果存在多個(gè)客戶端,不能保證響應(yīng)信息被發(fā)布請(qǐng)求的那個(gè)客戶端消費(fèi)到。所以應(yīng)為每一個(gè)客戶端創(chuàng)建一個(gè)響應(yīng)隊(duì)列,這個(gè)隊(duì)列應(yīng)該由客戶端來創(chuàng)建且只能由這個(gè)客戶端使用并在使用完畢后刪除,這里可以使用 RabbitMQ 提供的排他隊(duì)列(Exclusive Queue):

channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())

并且要保證隊(duì)列名唯一,聲明隊(duì)列時(shí)名稱設(shè)為空 RabbitMQ 會(huì)生成一個(gè)唯一的隊(duì)列名。

exclusive 設(shè)為 true 表示聲明一個(gè)排他隊(duì)列,排他隊(duì)列的特點(diǎn)是只能被當(dāng)前的連接使用,并且在連接關(guān)閉后被刪除。

一個(gè)簡(jiǎn)單的 demo(使用 pull 機(jī)制)

我們使用一個(gè)簡(jiǎn)單的 demo 來了解客戶端和服務(wù)端的處理流程。

發(fā)布請(qǐng)求

  • 編寫代碼前的一個(gè)小問題

我們?cè)诼暶麝?duì)列時(shí)為每一個(gè)客戶端聲明了獨(dú)有的響應(yīng)隊(duì)列,那服務(wù)器在發(fā)布響應(yīng)時(shí)如何知道發(fā)布到哪個(gè)隊(duì)列呢?其實(shí)就是客戶端需要告訴服務(wù)端將響應(yīng)發(fā)布到哪個(gè)隊(duì)列,RabbitMQ 提供了這個(gè)支持,消息體的 Properties 中有一個(gè)屬性 reply_to 就是用來標(biāo)記回調(diào)隊(duì)列的名稱,服務(wù)器需要將響應(yīng)發(fā)布到 reply_to 指定的回調(diào)隊(duì)列中。

解決了這個(gè)問題之后我們就可以編寫客戶端發(fā)布請(qǐng)求的代碼了:

// 定義響應(yīng)回調(diào)隊(duì)列
String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue();
// 設(shè)置回調(diào)隊(duì)列到 Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
 .replyTo(replyQueueName)
 .build();
String request = "request";
// 發(fā)布請(qǐng)求
channel.basicPublish("", "rpc_queue", properties, request.getBytes());

Direct reply-to:

RabbitMQ 提供了一種更便捷的機(jī)制來實(shí)現(xiàn) RPC,不需要客戶端每次都定義回調(diào)隊(duì)列,客戶端發(fā)布請(qǐng)求時(shí)將 replyTo 設(shè)為 amq.rabbitmq.reply-to ,消費(fèi)響應(yīng)時(shí)也指定消費(fèi) amq.rabbitmq.reply-to ,RabbitMQ 會(huì)為客戶端創(chuàng)建一個(gè)內(nèi)部隊(duì)列

消費(fèi)請(qǐng)求

接下來是服務(wù)端處理請(qǐng)求的部分,接收到請(qǐng)求后經(jīng)過處理將響應(yīng)信息發(fā)布到 reply_to 指定的回調(diào)隊(duì)列:

// 服務(wù)端 Consumer 的定義
public class RpcServer extends DefaultConsumer {
 public RpcServer(Channel channel) {
 super(channel);
 }
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 String msg = new String(body);
 String response = (msg + " Received");
 // 獲取回調(diào)隊(duì)列名
 String replyTo = properties.getReplyTo();
 // 發(fā)布響應(yīng)消息到回調(diào)隊(duì)列
 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes());
 }
}
...
// 啟動(dòng)服務(wù)端 Consumer
channel.basicConsume("rpc_queue", true, new RpcServer(channel));

接收響應(yīng)

客戶端如何接收服務(wù)器的響應(yīng)呢?有兩種方式:1.輪詢的去 pull 回調(diào)隊(duì)列中的消息,2.異步的消費(fèi)回調(diào)隊(duì)列中的消息。我們?cè)谶@里簡(jiǎn)單實(shí)現(xiàn)第一種方案。

GetResponse getResponse = null;
while (getResponse == null) {
 getResponse = channel.basicGet(replyQueueName, true);
}
String response = new String(getResponse.getBody());

一個(gè)簡(jiǎn)單的基于 RabbitMQ 的 RPC 模型已經(jīng)實(shí)現(xiàn)了,但這個(gè) demo 并不實(shí)用,因?yàn)榭蛻舳嗣看伟l(fā)送完請(qǐng)求都要同步的輪詢等待響應(yīng)消息,只能每次處理一個(gè)請(qǐng)求。RabbitMQ 的 pull 模式效率也比較低。

實(shí)現(xiàn)一個(gè)完備可用的 RPC 模式需要做的工作還有很多,要處理的關(guān)鍵點(diǎn)也比較復(fù)雜,有句話叫不要重復(fù)造輪子,spring 已經(jīng)實(shí)現(xiàn)了一個(gè)完備可用的 RPC 模式的庫(kù),接下來我們來了解一下。順便在此給大家推薦一個(gè)Java架構(gòu)方面的交流學(xué)習(xí)群:698581634,進(jìn)群即可獲取Java架構(gòu)師資料:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化這些成為架構(gòu)師必備的知識(shí)體系,群里一定有你需要的資料,大家趕緊加群吧。

Spring Rabbit 中的實(shí)現(xiàn)

和上面 demo 的 pull 模式一次只能處理一個(gè)請(qǐng)求相對(duì)應(yīng)的:如何異步的接收響應(yīng)并處理多個(gè)請(qǐng)求呢?關(guān)鍵點(diǎn)就在于我們需要記錄請(qǐng)求和響應(yīng)并將它們關(guān)聯(lián)起來,RabbitMQ 也提供了支持,Properties 中的另一個(gè)屬性 correlation_id 用來標(biāo)識(shí)一個(gè)消息的唯一 id。

參考 spring-rabbit 中的 convertSendAndReceive 方法的實(shí)現(xiàn),為每一次請(qǐng)求生成一個(gè)唯一的 correlation_id :

private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);

并使用一個(gè) ConcurrentHashMap 來維護(hù) correlation_id 和響應(yīng)信息的映射:

private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();
this.replyHolder.put(correlationId, pendingReply);

PendingReply 中有一個(gè) BlockingQueue 存放響應(yīng)信息,在發(fā)送完請(qǐng)求信息后調(diào)用 BlockingQueue 的 pull 方法并設(shè)置超時(shí)時(shí)間來獲取響應(yīng):

private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);

public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply

);

}

在獲取響應(yīng)后不論結(jié)果如何,都會(huì)將 PendingReply 從 replyHolder 中移除,防止 replyHolder 中積壓超時(shí)的響應(yīng)消息:

try {
 reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);
} finally {
 this.replyHolder.remove(messageTag);
 ...
}

響應(yīng)信息是何時(shí)如何被放到這個(gè) BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:

public void onMessage(Message message) {
String messageTag;
 if (this.correlationKey == null) { // using standard correlationId property
 messageTag = message.getMessageProperties().getCorrelationId();
 } else {
 messageTag = (String) message.getMessageProperties()
 .getHeaders().get(this.correlationKey);
 }
 // 存在 correlation_id 才認(rèn)為是RPC的響應(yīng)信息,不存在時(shí)不處理
 if (messageTag == null) {
 logger.error("No correlation header in reply");
 return;
 }
 // 從 replyHolder 中取出 correlation_id 對(duì)應(yīng)的 PendingReply
 PendingReply pendingReply = this.replyHolder.get(messageTag);
 if (pendingReply == null) {
 if (logger.isWarnEnabled()) {
 logger.warn("Reply received after timeout for " + messageTag);
 }
 throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
 }
 else {
 restoreProperties(message, pendingReply);
 // 將響應(yīng)信息 add 到 BlockingQueue 中
 pendingReply.reply(message);
 }
}

以上的 spring 代碼隱去了很多額外部分的處理和細(xì)節(jié),只關(guān)注關(guān)鍵的部分。

至此一個(gè)完整可用的由 RabbitMQ 作為中間件實(shí)現(xiàn)的 RPC 模式就完成了。

總結(jié)

服務(wù)端

服務(wù)端的實(shí)現(xiàn)比較簡(jiǎn)單,和一般的 Consumer 的區(qū)別只在于需要將請(qǐng)求回復(fù)到 replyTo 指定的 queue 中并帶上消息標(biāo)識(shí) correlation_id 即可

服務(wù)端的一點(diǎn)小優(yōu)化:

超時(shí)的處理是由客戶端來實(shí)現(xiàn)的,那服務(wù)端有沒有可以優(yōu)化的地方呢?

答案是有的:如果我們的服務(wù)端處理比較耗時(shí),如何判斷客戶端是否還在等待響應(yīng)呢?

我們可以使用 passive 參數(shù)去檢查 replyTo 的 queue 是否存在,因?yàn)榭蛻舳寺暶鞯氖莾?nèi)部隊(duì)列,客戶端如果斷掉鏈接了這個(gè) queue 就不存在了,這時(shí)服務(wù)端就無需處理這個(gè)消息了。

客戶端

客戶端承擔(dān)了更多的工作量,包括:

  • 聲明 replyTo 隊(duì)列(使用 amq.rabbitmq.reply-to 會(huì)簡(jiǎn)單很多)

  • 維護(hù)請(qǐng)求和響應(yīng)消息(使用唯一的 correlation_id 來關(guān)聯(lián))

  • 消費(fèi)服務(wù)端的返回

  • 處理超時(shí)等異常情況(使用BlockingQueue來阻塞獲?。?/p>

好在 spring 已經(jīng)實(shí)現(xiàn)了一套完備可靠的代碼,我們?cè)谇宄肆鞒毯完P(guān)鍵點(diǎn)之后,可以直接使用 spring 提供的 RabbitTemplate ,無需自己實(shí)現(xiàn)。

使用 MQ 實(shí)現(xiàn) RPC 的意義

通過 MQ 實(shí)現(xiàn) RPC 看起來比客戶端和服務(wù)器直接通訊要復(fù)雜一些,那我們?yōu)槭裁匆@樣做呢?或者說這樣做有什么好處:

  1. 將客戶端和服務(wù)器解耦:客戶端只是發(fā)布一個(gè)請(qǐng)求到 MQ 并消費(fèi)這個(gè)請(qǐng)求的響應(yīng)。并不關(guān)心具體由誰來處理這個(gè)請(qǐng)求,MQ 另一端的請(qǐng)求的消費(fèi)者可以隨意替換成任何可以處理請(qǐng)求的服務(wù)器,并不影響到客戶端。

  2. 減輕服務(wù)器的壓力:傳統(tǒng)的 RPC 模式中如果客戶端和請(qǐng)求過多,服務(wù)器的壓力會(huì)過大。由 MQ 作為中間件的話,過多的請(qǐng)求而是被 MQ 消化掉,服務(wù)器可以控制消費(fèi)請(qǐng)求的頻次,并不會(huì)影響到服務(wù)器。

  3. 服務(wù)器的橫向擴(kuò)展更加容易:如果服務(wù)器的處理能力不能滿足請(qǐng)求的頻次,只需要增加服務(wù)器來消費(fèi) MQ 的消息即可,MQ會(huì)幫我們實(shí)現(xiàn)消息消費(fèi)的負(fù)載均衡。

  4. 可以看出 RabbitMQ 對(duì)于 RPC 模式的支持也是比較友好地,

  5. amq.rabbitmq.reply-to , reply_to , correlation_id 這些特性都說明了這一點(diǎn),再加上 spring-rabbit 的實(shí)現(xiàn),可以讓我們很簡(jiǎn)單的使用消息隊(duì)列模式的 RPC 調(diào)用。

感謝各位的閱讀!關(guān)于“如何使用RabbitMQ實(shí)現(xiàn)RPC”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

標(biāo)題名稱:如何使用RabbitMQ實(shí)現(xiàn)RPC-創(chuàng)新互聯(lián)
文章起源:http://www.rwnh.cn/article10/cehogo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供虛擬主機(jī)、網(wǎng)站建設(shè)、做網(wǎng)站定制開發(fā)網(wǎng)站營(yíng)銷、面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計(jì)
房产| 友谊县| 寿宁县| 萍乡市| 青河县| 巴彦淖尔市| 兴仁县| 涞源县| 尼木县| 白河县| 洞口县| 渑池县| 博湖县| 八宿县| 喀喇沁旗| 章丘市| 武定县| 萍乡市| 会同县| 南江县| 淄博市| 梓潼县| 噶尔县| 左云县| 河南省| 开鲁县| 石台县| 腾冲县| 巩义市| 凤山市| 潞城市| 云南省| 津南区| 昌黎县| 黄梅县| 山东| 长兴县| 万年县| 华阴市| 永和县| 岚皋县|