定義:
- Kafka 是一個分布式的基于發(fā)布/訂閱默認(rèn)的消息隊列
- 是一個開源的分布式事件流平臺,被常用用于數(shù)據(jù)管道、流分析、數(shù)據(jù)集成、關(guān)鍵任務(wù)應(yīng)用
消費模式:
- 點對點模式 (少用)
消費者主動拉取數(shù)據(jù),消息收到后清除消息![]()
- 發(fā)布/訂閱模式
生產(chǎn)者推送消息到隊列,都消費者訂閱各自所需的消息![]()
基本概念:
- Producer: 消息生產(chǎn)者
- Consumer: 消費者
- Consumer: Group 消費者組,消費者組id相同得消費者為一個消費者組;一個消費者也為一個消費者組去消費
- Broker: kafka服務(wù)器
- Topic :消息主題, 數(shù)據(jù)分類
- Partition :分區(qū),一個Tpoic 有多個分區(qū)組成
- Replica : 副本,每個分區(qū)對應(yīng)多個副本
- Leader:副本里包含leader、follower ;生產(chǎn)以及消費只針對 leader
生產(chǎn)者發(fā)送流程:
- producer -> send(producerRecord) -> interceprots 攔截器 -> Serializer 序列化器 -> Partitioner 分區(qū)器
- 當(dāng)數(shù)據(jù)累積到
batch.size
之后,sender才會發(fā)送數(shù)據(jù);默認(rèn)16k- 如果數(shù)據(jù)遲遲未達(dá)到batch.size , sender等待
linger.ms
設(shè)置的時間,到了之后就會發(fā)送數(shù)據(jù)。單位ms.默認(rèn)值0ms
,標(biāo)識沒有延遲compression.type
數(shù)據(jù)壓縮方式RecordAccumulator
緩沖區(qū)大小,默認(rèn)32m- 應(yīng)答模式
ack
- 0: 生產(chǎn)者發(fā)送數(shù)據(jù)后,不需要等待數(shù)據(jù)應(yīng)答
- 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
- 1:all leader與其它所有節(jié)點收齊數(shù)據(jù)后應(yīng)答
![]()
消費大概邏輯:
消費者組(Consumer Group (CG)):
groupid
相同的消費者形成一個消費者組- 消費者組內(nèi)每個消費者負(fù)責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)的一個消費者消費
- 消費者組之間互不影響
- 當(dāng)消費者組的數(shù)量,大于分區(qū)數(shù),則會有
閑置
coordinator
:輔助實現(xiàn)消費者組的初始化
和分區(qū)的分配
- 每個節(jié)點有個
coordinator
, 通過groupid % 50
,選擇出coordinator
節(jié)點 50為 _consumer_offset 的分區(qū)數(shù)- 1%50 = 1 ,
_consumer_offset
的 號分區(qū)上的coordinator
則為 leadercoordinator
再消費者組中隨機選擇一個 consumer 成為leader,由leader 制定消費計劃,讓后返回給coordinator
,再由coordinator
來把消費技化 分配給其它消費者coordinator
與消費者的心跳
保持時間3秒
,45秒 超時
- 會移除消費者,觸發(fā)再平衡
- 消費者消費時間過長,默認(rèn)
5分鐘
- 會移除消費者觸發(fā)再平衡
消費流程:
- 創(chuàng)建消費者網(wǎng)絡(luò)連接客戶端
ConsumerNetworkClient
,與kafka交互- 消費請求初始化:每批次
最小抓取大小
、數(shù)據(jù)未達(dá)到超時 時間 500ms 、抓取數(shù)據(jù)大小上限- 發(fā)送消費請求 -》onSuccess() 回調(diào), 拉取數(shù)據(jù) -》 按批次放入消息隊列
- 消費者從 消息隊列每批次消費數(shù)據(jù) (500條) -》反序列化 -》攔截器 -》 處理數(shù)據(jù)
消費計劃(分區(qū)分配策略)默認(rèn) Range + CooperativeSticky:
- Range:針對
每一個topic
,對topic分區(qū)排序、消息者排序,通過分區(qū)數(shù) / 消費者數(shù),決定每個消息者消費幾個分區(qū),除不盡的前面的消費者多消費。容易產(chǎn)生數(shù)據(jù)傾斜
![]()
- RoundRobin:輪詢分區(qū)策略,
針對所有topic
,把所有topic的分區(qū)和消費者列出來,按照hashcode進(jìn)行排序,通過輪詢算法
把分區(qū)分配給消費者- Sticky :黏性 (執(zhí)行新的分配的時,盡量靠近上次的分配結(jié)果),首先回盡量的均勻,且隨機分配分區(qū)到消費者
- CooperativeSticky:協(xié)作者黏性,Sticky 的策略相同,但支持合作式再平衡,消費者可以繼續(xù)從沒有被重新分配的分區(qū)消費
offset 位移: 是標(biāo)記消費消費位置
- <0.9 : 是維護(hù)在 zookeeper中
- 0.9 之后:offset 維護(hù)在一個內(nèi)置的 topic :_consumer_offsets 中
- 采用 key - value 方式存儲數(shù)據(jù),key:groupid +topic + 分區(qū)號
- offset
自動提交
:默認(rèn)每5秒自動提交offset ,默認(rèn)
就是 true- offset
手動提交
:消費的時候,手動提交offset
- 同步:等待提交offset成功,再消費下一條
- 異步:不等待,直接消費,失敗后沒有重試機制
- 指定offset 消費:
earliest
: 自動將偏移量重置為最早的偏移量 --from-beginninglatest
(默認(rèn)): 自動將偏離量充值為最新偏移量nono
: 如果未找到消費者組的先前偏移量,則向消費者拋出異常。
//設(shè)置自動提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自動提交時間 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
//offset 手動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer kafkaConsumer = new KafkaConsumer<String,String>(properties);
//定義主題
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
//訂閱
kafkaConsumer.subscribe(topics);
while (true){
ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (CollectionUtil.isNotEmpty(consumerRecords)){
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(record);
}
}
//手動提交offset
kafkaConsumer.commitAsync();
}
指定時間消費:
//查詢對應(yīng)分區(qū)
Set<TopicPartition> partitions = kafkaConsumer.assignment();
//保證分區(qū)分配方案定制完畢
while (partitions.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
partitions=kafkaConsumer.assignment();
}
//把時間轉(zhuǎn)換成對應(yīng)的 offset
Map<TopicPartition,Long> map = new HashMap<>(6);
Map<TopicPartition,Long> offsetmap = kafkaConsumer.offsetsForTimes(map);
for (TopicPartition topicPartition : partitions) {
//一天前
offsetmap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimeMap = kafkaConsumer.offsetsForTimes(offsetmap);
for (TopicPartition partition : partitions) {
OffsetAndTimestamp timestamp = offsetsForTimeMap.get(partition);
kafkaConsumer.seek(partition,timestamp.offset());
}
kafka 文件存儲機制 :
Topic
是邏輯上的概念,partition
是物理上的概念, 每個partition
對應(yīng)一個log文件
。該log文件中存儲的就是 producer生產(chǎn)的數(shù)據(jù)- Producer生產(chǎn)的數(shù)據(jù),會被不斷追加到該log文件末端,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低,kafka采取了分片和索引機制
- 每個partition分為多個
segment
,每個segment包含, .index .log .timeindex .snapshot 文件- 這些文件位于一個文件夾下,該文件夾命名規(guī)則為:topic名稱+分區(qū)序號 first-0
![]()
- 稀疏索引:大約每往log文件寫入 4kb數(shù)據(jù),會往index文件寫入一條索引。
- index文件中保存的 odffset是
相對offset
,這樣能確保 offset得值所占空間不會過大,因此能將offset得值控制在固定大小
文件清除、壓縮策略:
- kafka 默認(rèn)日志保存時間為 7 天
- 壓縮策略:compact,對應(yīng)相同key的value,只保留最新的一個版本。
kafka 高效讀寫:
- Kafka 本身是分布式集群,可以采用分區(qū)技術(shù),并行度高
- 讀數(shù)據(jù)采用
稀疏索引
,可以快速定位要消費得數(shù)據(jù)- 順序?qū)懘疟P,kafka得producer生產(chǎn)數(shù)據(jù),要寫入
log文件
中,寫得過程是一直追加到文件末端,為順序?qū)?/code>
零拷貝
: Kaka的數(shù)據(jù)加工處理操作交由Kaka生產(chǎn)者和Kaka消費者處理。Kaka Broker應(yīng)用層不關(guān)心存儲的數(shù)據(jù),所以就不用走應(yīng)用層,傳輸效率高。- 頁緩存: Kaka重度依賴底層操作系統(tǒng)提供的PageCache功能。當(dāng)上層有寫操作時,操作系統(tǒng)只是將數(shù)據(jù)寫PageCache。當(dāng)讀操作發(fā)生時,先從PageCache中查找,如果找不到,再去磁盤中讀取。實際上PageCache是把盡可能多的空閑內(nèi)存都當(dāng)做了磁盤緩存來使用。
常用腳本命名:
- topic 相關(guān)命令 :
- 查詢topic列表 :
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
- 創(chuàng)建topic (名稱:first 分區(qū):1個 副本 3個)副本數(shù)量不能超過集群數(shù)量
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
- topic 信息
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
- 修改topic 分區(qū)數(shù)(只能增加)
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe --partitions 3
- 生產(chǎn)消息:
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
- 消費消費:
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning
Spring boot 簡單整合:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
server:
port: 8200
spring:
mvc:
pathmatch:
matching-strategy: ant_path_matcher
application:
name: @artifactId@
kafka:
bootstrap-servers:
- 192.168.1.250:32010
# 生產(chǎn)配置
producer:
#序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger.ms: 10 #sender 等待事件
#ssl認(rèn)證配置相關(guān)
# sasl.mechanism: PLAIN
# security.protocol: SASL_PLAINTEXT
# sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
#緩存區(qū)大小 32m
buffer-memory: 33554432
#批次大小 16k
batch-size: 16
# ISR 全部應(yīng)答
#acks: -1
#事務(wù)ID前綴 ,配合 @Transactional ,保證多個消息的原子性
#transaction-id-prefix: "transaction-id-xx"
#消費配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#group-id: xiaoshu-1
enable-auto-commit: false
# 從最早消息開始消費,但是消費后,會記錄offset、相同 group-id不會再次消費
# offset 是針對每個消費者組
auto-offset-reset: earliest
#批量消費,每次最多消費多少條
#max-poll-records: 50
#ssl認(rèn)證配置相關(guān)
# properties:
# sasl.mechanism: PLAIN
# security.protocol: SASL_PLAINTEXT
# sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
listener:
# 手動調(diào)用Acknowledgment.acknowledge()后立即提交
ack-mode: manual
#批量消費,配合 @KafkaListener - batch="true"
#type: batch
生產(chǎn):文章來源:http://www.zghlxwxcb.cn/news/detail-434853.html
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
//@Transactional(rollbackFor = RuntimeException.class),配合 ack配置 實現(xiàn)多條消息發(fā)送,原子性
@ApiOperation(value = "推送消息到kafak")
@GetMapping("/sendMsg")
public String sendMsg(String topic,String msg){
kafkaTemplate.send(topic,msg).addCallback(success -> {
if (success==null){
System.out.println("消息發(fā)送失敗");
return;
}
// 消息發(fā)送到的topic
String topicName = success.getRecordMetadata().topic();
// 消息發(fā)送到的分區(qū)
int partition = success.getRecordMetadata().partition();
// 消息在分區(qū)內(nèi)的offset
long offset = success.getRecordMetadata().offset();
System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("發(fā)送消息失敗:" + failure.getMessage());
});
return "ok";
}
消費:文章來源地址http://www.zghlxwxcb.cn/news/detail-434853.html
@Configuration
public class KafkaConsumer {
private static final String TOPIC_DLT=".DLT";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 每個分區(qū)由消費者組種得一個消費者消費,每個消費者獨立
* 分區(qū) -》 消費 、2分區(qū)2個消費監(jiān)聽
* @param record
* @param consumer
*/
@KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"0"})},batch = "false")
public void consumerTopic1(ConsumerRecord<String, String> record, Consumer consumer){
String value = record.value();
String topic1 = record.topic();
long offset = record.offset();
int partition = record.partition();
try {
log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分區(qū)"+partition);
//TODO 異常,推送到 對應(yīng)死信 ↓
//int i=1/0;
} catch (Exception e) {
System.out.println("commit failed");
kafkaTemplate.send(topic1+TOPIC_DLT,value);
} finally {
consumer.commitAsync();
}
}
@KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"1"})},batch = "false")
public void consumerTopic2(ConsumerRecord<String, String> record, Consumer consumer){
String value = record.value();
String topic1 = record.topic();
long offset = record.offset();
int partition = record.partition();
try {
log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分區(qū)"+partition);
//TODO 異常,推送到 對應(yīng)死信 ↓
//int i=1/0;
} catch (Exception e) {
System.out.println("commit failed");
kafkaTemplate.send(topic1+TOPIC_DLT,value);
} finally {
consumer.commitAsync();
}
}
}
/**
* 監(jiān)聽 topic1 ->轉(zhuǎn)發(fā)到 topic2
*/
@KafkaListener(topics = {"topic1"},groupId = "group-4")
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
return record.value()+"-轉(zhuǎn)發(fā)消息";
}
@KafkaListener(topics = {"topic2"},groupId = "group-5")
public void onMessage8(ConsumerRecord<?, ?> record) {
System.out.println("收到轉(zhuǎn)發(fā)消息"+record.value());
}
到了這里,關(guān)于Kafka 基礎(chǔ)整理、 Springboot 簡單整合的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!