Kafka 是一款開源的消息引擎系統(tǒng),用來實現(xiàn)解耦的異步式數(shù)據(jù)傳遞。即系統(tǒng) A 發(fā)消息給到 消息引擎系統(tǒng),系統(tǒng) B 通過消息引擎系統(tǒng)讀取 A 發(fā)送的消息,在大數(shù)據(jù)場景下,能達到削峰填谷的效果。
2 Kafka 術(shù)語Kafka 中的分區(qū)機制指的是將每個主題(Topic)劃分成多個分區(qū)(Partition),每個分區(qū)是一組有序的消息日志。生產(chǎn)者生產(chǎn)的每條消息只會被發(fā)送到一個分區(qū)中,也就是說如果向一個雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中。Kafka 的分區(qū)編號是從 0 開始的,如果 Topic 有 100 個分區(qū),那么它們的分區(qū)號就是從 0 到 99。每個分區(qū)下可以配置若干個副本,其中只能有 1 個領(lǐng)導者副本和 N-1 個追隨者副本。
Kafka 的三層消息架構(gòu):
1)主題層,每個主題可以配置 M 個分區(qū),而每個分區(qū)又可以配置 N 個副本。
2)分區(qū)層,每個分區(qū)的 N 個副本中只能有一個充當領(lǐng)導者角色,對外提供服務(wù);其他 N-1 個副本是追隨者副本,只是提供數(shù)據(jù)冗余之用。
3)消息層,分區(qū)中包含若干條消息,每條消息的位移從 0 開始,依次遞增。最后,客戶端程序只能與分區(qū)的領(lǐng)導者副本進行交互。
Broker 如何持久化數(shù)據(jù)?
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現(xiàn) Kafka 高吞吐量特性的一個重要手段。如果不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又進一步細分成多個日志段,消息被追加寫到當前最新的日志段中,當寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實現(xiàn)回收磁盤空間的目的。
3 生產(chǎn)者 3.1 消息發(fā)送Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設(shè)置為守護線程;
生產(chǎn)消息時,內(nèi)部是異步流程。生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建);
批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達到 batch.size 或者 linger.ms 達到上限,哪個先達到就算哪個;
批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了 retrires 參數(shù)大于 0 并且失敗原因允許重試,那么客戶端內(nèi)部會對該消息進行重試;
落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者;
元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調(diào)返回。
主題是承載真實數(shù)據(jù)的邏輯容器,而在主題之下還分為若干個分區(qū),主題下的每條消息只會保存在某一個分區(qū)中,而不會在多個分區(qū)中被保存多份。
為什么使用分區(qū)的概念而不是直接使用多個主題呢?
對數(shù)據(jù)進行分區(qū)的主要原因是為了實現(xiàn)系統(tǒng)的高伸縮性(Scalability)。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的讀寫請求處理。并且,還可以通過添加新的節(jié)點機器來增加整體系統(tǒng)的吞吐量。
3.3.1 分區(qū)策略所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個分區(qū)的算法。
輪詢策略
順序分配。比如一個主題下有 3 個分區(qū),那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1,第三條被發(fā)送到分區(qū) 2,以此類推。當生產(chǎn)第 4 條消息時又會重新開始,即將其分配到分區(qū) 0。
輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn),它總是能保證消息大限度地被平均分配到所有分區(qū)上,故默認情況下它是最合理的分區(qū)策略,也是最常用的分區(qū)策略之一。?
隨機策略
隨意地將消息放置到任意一個分區(qū)上
Key-ordering 策略
Kafka 允許為每條消息定義消息鍵,簡稱為 Key。這個 Key 可以是一個有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號或是業(yè)務(wù) ID 等。一旦消息被定義了 Key,就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的。
假設(shè)有一個服務(wù)需要監(jiān)聽某個公眾號用戶關(guān)注取關(guān)的事件,發(fā)送的消息必須要保證有序性,不然會導致結(jié)果混亂。如果給 Kafka 主題只設(shè)置 1 個分區(qū),這樣所有的消息都只在這一個分區(qū)內(nèi)讀寫,因此保證了全局的順序性。
這樣做雖然實現(xiàn)了因果關(guān)系的順序性,但也喪失了 Kafka 多分區(qū)帶來的高吞吐量和負載均衡的優(yōu)勢。
可以在消息體中封裝了固定的標志位,并對此標志位設(shè)定專門的分區(qū)策略,保證同一標志位的所有消息都發(fā)送到同一分區(qū),這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的性能紅利。
4 消費者 4.1?消費組(Consumer Group)消費組是 kafka 提供的可擴展且具有容錯性的消費者機制,是 Kafka 實現(xiàn)單播和廣播兩種消息模型的手段。
多個從同一個主題消費的消費者可以加入到一個消費組中,消費組中的消費者共享 Group Id。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū),每個分區(qū)只能由同一個消費者組內(nèi)的一個 Consumer 實例來消費。
4.2?消費消息Consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù),可以自主控制消費方式,逐條消費或批量消費。
4.2.1 位移提交Consumer 需要向 Kafka 記錄自己的位移數(shù)據(jù),這個匯報過程稱為 提交位移(Committing Offsets)。這個過程非常靈活,可以提交任何位移值,但也會由此產(chǎn)生系列不好的結(jié)果。假設(shè)?Consumer 消費了 10 條消息,提交的位移值卻是 20,那么位移介于 11~19 之間的消息是有可能丟失的;相反地,如果提交的位移值是 5,那么位移介于 5~9 之間的消息就有可能被重復消費。
自動提交
1)開啟自動提交: enable.auto.commit=true,默認為 true
2)配置自動提交間隔: auto.commit.interval.ms ,默認 5s
自動提交會導致消息被重復消費
雖然能通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復消費的時間窗口,不可能完全消除它。
手動同步提交
使用 KafkaConsumer#commitSync(),會提交 KafkaConsumer#poll() 返回的最新 offset。
該方法為同步操作,等待直到 offset 被成功提交才返回。
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息
try {
// Consumer 程序會處于阻塞狀態(tài),直到遠端的 Broker 返回提交結(jié)果
consumer.commitSync();
} catch (CommitFailedException e) {
// 處理提交失敗異常
handle(e);
}
}
手動異步提交
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1));
// 處理消息
process(records);
// 會立即返回結(jié)果,不會阻塞
consumer.commitAsync((offsets, exception) ->{
if (exception != null)
handle(exception);
});
}
但?commitAsync 不能替代?commitSync,因為出現(xiàn)問題時它不會自動重試。由于是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經(jīng)“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。
手動同步提交與異步提交結(jié)合
try {
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1));
// 處理消息
process(records);
// 使用異步提交規(guī)避阻塞
commitAysnc();
}
} catch (Exception e) {
// 處理異常
handle(e);
} finally {
try {
// Consumer 要關(guān)閉前使用同步阻塞式提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)
consumer.commitSync();
} finally {
consumer.close();
}
}
4.2.2 位移管理Kafka默認定期自動提交位移( enable.auto.commit = true ),也手動提交位移。另外kafka會定期把group消費情況保存起來,做成一個offset map?
位移管理機制將 Consumer 的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 主題中。
5 異常處理如何保證消息不被重復消費?
如何保證消息消費的冪等性?
如何防止消息丟失?
如何處理消息積壓?
消費慢了怎么處理?
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
名稱欄目:學到羊之Kafka-創(chuàng)新互聯(lián)
分享URL:http://www.rwnh.cn/article10/dscego.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、品牌網(wǎng)站建設(shè)、小程序開發(fā)、網(wǎng)站維護、手機網(wǎng)站建設(shè)、面包屑導航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)