内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)

小編給大家分享一下RocketMQ如何獲取指定消息,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)建站專注于新余網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供新余營(yíng)銷型網(wǎng)站建設(shè),新余網(wǎng)站制作、新余網(wǎng)頁(yè)設(shè)計(jì)、新余網(wǎng)站官網(wǎng)定制、成都小程序開(kāi)發(fā)服務(wù),打造新余網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供新余網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

概要

消息查詢是什么?

消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息

RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?

問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?

猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)

猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容

實(shí)際:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。

問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?

實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。

源碼閱讀

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

這個(gè)了解下就可以了

public class MessageId {
 private SocketAddress address;
 private long offset;

 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }

 //get-set
}

//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

 MessageId messageId = null;
 try {
  //從msgId字符串中解析出address和offset
  //address = ip:port
  //offset為消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的區(qū)別
 //如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);

 return new MessageId(address, offset);
}

本文標(biāo)題:RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)
本文URL:http://www.rwnh.cn/article44/dsccee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷、搜索引擎優(yōu)化、商城網(wǎng)站、云服務(wù)器手機(jī)網(wǎng)站建設(shè)、做網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司
含山县| 正阳县| 兴宁市| 武汉市| 明水县| 登封市| 云南省| 迁安市| 襄垣县| 延庆县| 龙海市| 陇西县| 牟定县| 娄烦县| 正蓝旗| 都昌县| 静宁县| 兴隆县| 三门峡市| 沧州市| 阿城市| 房产| 卢湾区| 繁昌县| 获嘉县| 蓝山县| 论坛| 枞阳县| 绥江县| 辽阳市| 沙坪坝区| 河津市| 日土县| 嵩明县| 吉安县| 崇文区| 通海县| 博罗县| 体育| 龙门县| 汝南县|