最近項目組的kafka集群,老是由于應用端寫入kafka topic的消息太多,導致所在的broker節(jié)點占滿,導致其他的組件接連宕機。
這里和應用端溝通可以刪除1天之前的消息來清理磁盤,并且可以調整topic的消息存活時間。
一、調整Topic的消息存活時長刪除消息
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config retention.ms=86400000
如上調整topic的消息存活時長為為1天,當執(zhí)行完之后執(zhí)行查詢topic詳細信息,可以看到已經發(fā)生了修改,并且過一會過期的消息會被刪除。
kafka-topics --bootstrap-server localhost:9092 --describe --topic topicName
二、不修改Topic消息存活時長刪除消息
1.登錄到相應的機器上。
2.找到寫滿的磁盤,刪除掉不需要的業(yè)務數據。數據清理原則:
- 不可直接刪除Kafka的數據目錄,避免造成不必要的數據丟失。
- 找到占用空間較多或者明確不需要的Topic,選擇其中某些Partition,從最早的日志數據開始刪除。刪除segment及相應地index和timeindex文件。不要清理內置的Topic,例如__consumer_offsets和_schema等。
3.重啟磁盤被寫滿的相應的Broker節(jié)點,使日志目錄online。
參考:Kafka磁盤寫滿時如何運維操作_開源大數據平臺E-MapReduce-阿里云幫助中心 (aliyun.com)
怎么刪除kafka中的數據-火山引擎 (volcengine.com)
三、Kafka消息清理策略
在Kafka中,存在數據過期的機制,稱為data expire。如何處理過期數據是根據指定的policy(策略)決定的,而處理過期數據的行為,即為log cleanup。
在Kafka中有以下幾種處理過期數據的策略:
log.cleanup.policy=delete(Kafka中所有用戶創(chuàng)建的topics,默認均為此策略)
- 根據數據已保存的時間,進行刪除(默認為1周)
- 根據log的max size,進行刪除(默認為-1,也就是無限制)
log.cleanup.policy=compact(topic __consumer_offsets?默認為此策略)
- 根據messages中的key,進行刪除操作
- 在active segment?被commit?后,會刪除掉old duplicate keys
- 無限制的時間與空間的日志保留
自動清理Kafka中的數據可以控制磁盤上數據的大小、刪除不需要的數據,同時也減少了對Kafka集群的維護成本。
那Log cleanup?在什么時候發(fā)生呢?
- 首先值得注意的是:log cleanup?在partition segment?上發(fā)生
- 更小/更多的segment,也就意味著log cleanup?發(fā)生的頻率會上升
- Log cleanup?不應該頻繁發(fā)生=>?因為它會消耗CPU與內存資源
- Cleaner的檢查會在每15秒進行一次,由log.cleaner.backoff.ms?控制
log.cleanup.policy=delete
log.cleanup.policy=delete?的策略,根據數據保留的時間、以及l(fā)og的max size,對數據進行cleanup??刂茢祿A魰r間以及l(fā)og max size的參數分別為:
log.retention.hours:指定數據保留的時常(默認為一周,168)
- 將參數調整到更高的值,也就意味著會占據更多的磁盤空間
- 更小值意味著保存的數據量會更少(假如consumer?宕機超過一周,則數據便會再未處理前即丟失)
log.retention.bytes:每個partition中保存的最大數據量大?。J為-1,也就是無限大)
- 再控制log的大小不超過一個閾值時,會比較有用
在到達log cleanup?的條件后,cleaner會自動根據時間或是空間的規(guī)則進行刪除,新數據仍寫入active segment:
針對于這個參數,一般有以下兩種使用場景,分別為:
log保留周期為一周,根據log保留期進行l(wèi)og cleanup:
- log.retention.hours=168?以及?log.retention.bytes=-1
log保留期為無限制,根據log大小進行進行l(wèi)og cleanup:
- log.retention.hours=17520以及?log.retention.bytes=524288000
其中第一個場景會更常見。
Log Compaction
Log compaction用于確保:在一個partition中,對任意一個key,它所對應的value都是最新的。
這里舉個例子:我們有個topic名為employee-salary,我們希望維護每個employee當前最新的工資情況。
左邊的是compaction前,segments中的數據,右邊為compaction?后,segments中的數據,其中有部分key對應的value有更新:
??
可以看到在log compaction后,相對于更新后的key-value message,舊的message被刪除。
Log Compaction?有如下特點:
- messages的順序仍然是保留的,log compaction?僅移除一些messages,但不會重新對它們進行排序
- 一條message的offset是無法改變的(immutable),如果一條message缺失,則offset會直接被跳過
- 被刪除的records在一段時間內仍然可以被consumers訪問到,這段時間由參數delete.retention.ms(默認為24小時)控制
需要注意的是:Kafka?本身是不會組織用戶發(fā)送duplicate data的。這些重復數據也僅會在一個segment在被commit?的時候做重復數據刪除,所以consumer仍會讀取到這部分重復數據(如果客戶端有發(fā)的話)。
Log Compaction也會有時失敗,compaction thread?可能會crash,所以需要確保給Kafka server?足夠的內存用于做這些操作。如果log compaction異常,則需要重啟Kafka(此為一個已知的bug)。
Log Compaction也無法通過API手動觸發(fā)(至少到現在為止是這樣),只能server端自動觸發(fā)。
下面是一個?Log Compaction過程的示意圖:
正在寫入的records仍會被寫入Active Segment,已經committed segments會自動做compaction。此過程會遍歷所有segments中的records,并移除掉所有需要被移除的messages。文章來源:http://www.zghlxwxcb.cn/news/detail-760503.html
Log compaction由上文提到的log.cleanup.policy=compact進行配置,其中:文章來源地址http://www.zghlxwxcb.cn/news/detail-760503.html
- Segment.ms(默認為7天):在關閉一個active segment前,所需等待的最長時間
- Segment.bytes(默認為1G):一個segment的最大大小
- Min.compaction .lag.ms(默認為0):在一個message可以被compact前,所需等待的時間
- Delete.retention.ms(默認為24小時):在一條message被加上刪除標記后,在實際刪除前等待的時間
- Min.Cleanable.dirty.ratio(默認為0.5):若是設置的更高,則會有更高效的清理,但是更少的清理操作觸發(fā)。若是設置的更低,則清理的效率稍低,但是會有更多的清理操作被觸發(fā)
到了這里,關于Kafka磁盤寫滿日志清理操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!