這篇文章主要為大家展示了“RocketMQ中broker消息存儲之如何添加消息”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學習一下“RocketMQ中broker消息存儲之如何添加消息”這篇文章吧。
創(chuàng)新互聯(lián)始終致力于在企業(yè)網(wǎng)站建設(shè)領(lǐng)域發(fā)展。秉承“創(chuàng)新、求實、誠信、拼搏”的企業(yè)精神,致力為企業(yè)提供全面的網(wǎng)絡(luò)宣傳與技術(shù)應(yīng)用整體策劃方案,為企業(yè)提供包括“網(wǎng)站建設(shè)、響應(yīng)式網(wǎng)站、手機網(wǎng)站建設(shè)、微信網(wǎng)站建設(shè)、微信小程序定制開發(fā)、商城網(wǎng)站建設(shè)、平臺網(wǎng)站建設(shè)秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
broker存儲子系統(tǒng)追加消息的入口是DefaultMessageStore.asyncPutMessage,該方法會調(diào)用CommitLog.asyncPutMessage,主要實現(xiàn)如下:
// CommitLog.java public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // msg準備... // 1. 獲取最后一個block,準備寫入 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { //.... if (null == mappedFile || mappedFile.isFull()) { // 加鎖后重試一次,嘗試新建一個block mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { // 新建block失敗,返回 log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } // 2. append到block末尾 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: // 3. block剩余空間不足,新建下一個block,再次嘗試寫入 unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; // 發(fā)送錯誤,失敗返回... } // ... } finally { putMessageLock.unlock(); } // 4. 提交刷盤請求 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg); // 5. 提交同步請求 CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); }
MappedFile在執(zhí)行數(shù)據(jù)追加時實際調(diào)用的是MappedFile.appendMessagesInner方法:
// MappedFile public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; // 調(diào)用callback執(zhí)行實際的數(shù)據(jù)寫入,callback實現(xiàn)決定了數(shù)據(jù)的存儲格式 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
msg實際是由CommitLog.DefaultAppendMessageCallback執(zhí)行寫入的,DefaultAppendMessageCallback決定了msg在文件中的存儲格式:
// CommitLog.DefaultAppendMessageCallback public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, // 數(shù)據(jù)準備,字段超長則直接返回... // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // 一. block剩余空間不足,則寫入一個特殊的msg,標示已達block末尾 this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // 二、按約定格式寫入msg // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); //... return result; }
數(shù)據(jù)追加的整體流程如下圖
以上是“RocketMQ中broker消息存儲之如何添加消息”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當前標題:RocketMQ中broker消息存儲之如何添加消息
網(wǎng)站路徑:http://www.rwnh.cn/article46/jcjjeg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供域名注冊、軟件開發(fā)、品牌網(wǎng)站設(shè)計、網(wǎng)站營銷、企業(yè)建站、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)