小編給大家分享一下RocketMQ如何獲取指定消息,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)建站專注于新余網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供新余營(yíng)銷型網(wǎng)站建設(shè),新余網(wǎng)站制作、新余網(wǎng)頁(yè)設(shè)計(jì)、新余網(wǎng)站官網(wǎng)定制、成都小程序開(kāi)發(fā)服務(wù),打造新余網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供新余網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。概要
消息查詢是什么?
消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息
RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?
問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?
猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)
猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容
實(shí)際:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。
2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。
問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?
實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。
源碼閱讀
0.使用方式
MessageExt msg = consumer.viewMessage(msgId);
1.消息ID解析
這個(gè)了解下就可以了
public class MessageId { private SocketAddress address; private long offset; public MessageId(SocketAddress address, long offset) { this.address = address; this.offset = offset; } //get-set } //from MQAdminImpl.java public MessageExt viewMessage( String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MessageId messageId = null; try { //從msgId字符串中解析出address和offset //address = ip:port //offset為消息在CommitLog文件中的偏移量 messageId = MessageDecoder.decodeMessageId(msgId); } catch (Exception e) { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); } return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), timeoutMillis); } //from MessageDecoder.java public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { SocketAddress address; long offset; //ipv4和ipv6的區(qū)別 //如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2; byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength)); byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8)); ByteBuffer bb = ByteBuffer.wrap(port); int portInt = bb.getInt(0); address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); // offset byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16)); bb = ByteBuffer.wrap(data); offset = bb.getLong(0); return new MessageId(address, offset); }
本文標(biāo)題:RocketMQ如何獲取指定消息-創(chuàng)新互聯(lián)
本文URL:http://www.rwnh.cn/article44/dsccee.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷、搜索引擎優(yōu)化、商城網(wǎng)站、云服務(wù)器、手機(jī)網(wǎng)站建設(shè)、做網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容