中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)

小編給大家分享一下RocketMQ如何獲取指定消息,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)建站專注于新余網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供新余營(yíng)銷型網(wǎng)站建設(shè),新余網(wǎng)站制作、新余網(wǎng)頁(yè)設(shè)計(jì)、新余網(wǎng)站官網(wǎng)定制、成都小程序開(kāi)發(fā)服務(wù),打造新余網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供新余網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

概要

消息查詢是什么?

消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息

RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?

問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?

猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)

猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容

實(shí)際:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。

問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?

實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。

源碼閱讀

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

這個(gè)了解下就可以了

public class MessageId {
 private SocketAddress address;
 private long offset;

 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }

 //get-set
}

//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

 MessageId messageId = null;
 try {
  //從msgId字符串中解析出address和offset
  //address = ip:port
  //offset為消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的區(qū)別
 //如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);

 return new MessageId(address, offset);
}

本文標(biāo)題:RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)
本文URL:http://www.rwnh.cn/article44/dsccee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷、搜索引擎優(yōu)化、商城網(wǎng)站、云服務(wù)器手機(jī)網(wǎng)站建設(shè)、做網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司
腾冲县| 桦甸市| 石家庄市| 广平县| 永川市| 双鸭山市| 启东市| 太湖县| 阳泉市| 岳阳市| 四平市| 宁强县| 安远县| 康马县| 长宁区| 灯塔市| 鸡泽县| 浦县| 永定县| 新乡县| 屯昌县| 济南市| 西林县| 常宁市| 溆浦县| 桂平市| 龙山县| 托克逊县| 桂东县| 竹山县| 万宁市| 南木林县| 连平县| 郧西县| 广德县| 方城县| 天水市| 湖州市| 广饶县| 沙田区| 长武县|