目錄
1、核心概念
消息和批次
Topic和Partition
Replicas
Offset
broker和集群
生產(chǎn)者和消費者
2、開發(fā)實戰(zhàn)
2.1、消息發(fā)送
介紹
代碼實現(xiàn)
2.2、消息消費
介紹
代碼實現(xiàn)
2.3、SpringBoot Kafka
pom
application.yaml
KafkaConfig
producer
consumer
1、核心概念
消息和批次
????????kafka的基本數(shù)據(jù)單元,由字節(jié)數(shù)組組成??梢岳斫獬蓴?shù)據(jù)庫的一條數(shù)據(jù)。
????????批次就是一組消息,把同一個主題和分區(qū)的消息分批次寫入kafka,可以減少網(wǎng)絡(luò)開銷,提高效率;批次越大,單位時間內(nèi)處理的消息就越多,單個消息的傳輸時間就越長。
Topic和Partition
? ? ? ? topic主題,kafka通過主題進行分類。主題可以理解成數(shù)據(jù)庫的表或者文件系統(tǒng)里的文件夾。
? ? ? ? partition分區(qū)可以理解成一個FIFO的消息隊列。(同一個分區(qū)的消息保證順序消費)
????????主題可以被分為若干分區(qū),一個主題通過分區(qū)將消息存儲在kafka集群中,提供橫向擴展的能力。消息以追加的方式寫入分區(qū),每個分區(qū)保證先入先出的順序讀取。在需要嚴格保證消息順序消費的場景下,可以將partition設(shè)置為1,即主題只有一個分區(qū)。
? ? ? ? 主題的分區(qū)策略有如下幾種:
- 直接指定分區(qū);
- 根據(jù)消息的key散列取模得出分區(qū);
- 輪詢指定分區(qū)。
Replicas
- 副本,每個分區(qū)都有多個副本。其中包含一個首領(lǐng)副本和多個跟隨者副本。
- 首領(lǐng)副本用于響應(yīng)生產(chǎn)者的消息寫入請求與消費者的消息讀取請求;
- 跟隨者副本用于同步首領(lǐng)副本的數(shù)據(jù),保持與首領(lǐng)副本一致的狀態(tài),有數(shù)據(jù)備份的功能。
- 一旦首領(lǐng)副本所在的服務(wù)器宕機,就會從跟隨者中選出一個升級為首領(lǐng)副本。
Offset
? ? ? ? 偏移量。
? ? ? ? 生產(chǎn)者offset:每個分區(qū)都有一個offset,叫做生產(chǎn)者的offset,可以理解為當(dāng)前這個分區(qū)隊列的最大值,下一個消息來的時候,就會將消息寫入到offset這個位置。
? ? ? ? 消費者offset:每個消費者消費分區(qū)中的消息時,會記錄消費的位置(offset),下一次消費時就會從這個位置開始消費。
broker和集群
broker為一個獨立的kafka服務(wù)器;一個kafka集群里有多個broker。
? ? ? ? broker接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并將消息保存到磁盤。同時,broker為消費者提供服務(wù),對讀取分區(qū)的請求做出響應(yīng),返回已經(jīng)保存到磁盤上的消息。(單個broker可以輕松處理數(shù)千個分區(qū)以及每秒百萬級的消息量)。
? ? ? ? 集群中同一個主題的同一個分區(qū),會在多個broker上存在;其中一個broker上的分區(qū)被稱為首領(lǐng)分區(qū),用于與生產(chǎn)者和消費者交互,其余broker上的分區(qū)叫做副本分區(qū),用于備份分區(qū)數(shù)據(jù),防止broker宕機導(dǎo)致消息丟失。
? ? ? ? 每個集群都有一個broker是集群控制器,作用如下:
- 將分區(qū)分配給首領(lǐng)分區(qū)的broker;
- 監(jiān)控broker,首領(lǐng)分區(qū)切換
生產(chǎn)者和消費者
????????生產(chǎn)者生產(chǎn)消息,消息被發(fā)布到一個特定的主題上。默認情況下,kafka會將消息均勻地分布到主題的所有分區(qū)上。分區(qū)策略有如下幾種:
- 直接指定分區(qū);
- 根據(jù)消息的key散列取模得出分區(qū);
- 輪詢指定分區(qū)。
? ? ? ? 消費者通過偏移量來區(qū)分已經(jīng)讀過的消息,從而消費消息。消費者是消費組的一部分,消費組可以保證每個分區(qū)只能被一個消費者使用,避免重復(fù)消費。
2、開發(fā)實戰(zhàn)
2.1、消息發(fā)送
介紹
- 生產(chǎn)者主要有KafkaProducer和ProducerRecord兩個對象:KafkaProducer用于發(fā)送消息,ProducerRecord用于封裝kafka消息。
- 生產(chǎn)者生產(chǎn)消息后,需要broker的確認,可以選擇同步或者異步確認:同步確認效率低;異步確認效率高,但需要設(shè)置回調(diào)對象。? ? ? ??
代碼實現(xiàn)
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接?到的服務(wù)器地址
// 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
configs.put("bootstrap.servers", "node1:9092");
// 設(shè)置key和value的序列化器
configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("acks", "1");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
// 用于封裝Producer的消息
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_1", // 主題名稱
0, // 分區(qū)編號,現(xiàn)在只有?個分區(qū),所以是0
0, // 數(shù)字作為key
"message 0" // 字符串作為value
);
// 發(fā)送消息,同步等待消息的確認
// producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
// 使用回調(diào)異步等待消息的確認
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(
"主題:" + metadata.topic() + "\n"
+ "分區(qū):" + metadata.partition() + "\n"
+ "偏移量:" + metadata.offset() + "\n"
+ "序列化的key字節(jié):" + metadata.serializedKeySize() + "\n"
+ "序列化的value字節(jié):" + metadata.serializedValueSize() + "\n"
+ "時間戳:" + metadata.timestamp()
);
} else {
System.out.println("有異常:" + exception.getMessage());
}
}
});
// 關(guān)閉連接
producer.close();
}
2.2、消息消費
介紹
????????消費者主要有KafkaConsumer對象,用于消費消息。Kafka不支持消息的推送,我們可以通過消息拉取(poll)方式實現(xiàn)消息的消費。KafkaConsumer主要參數(shù)如下:
代碼實現(xiàn)
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 指定bootstrap.servers屬性作為初始化連接Kafka的服務(wù)器。
// 如果是集群,則會基于此初始化連接發(fā)現(xiàn)集群中的其他服務(wù)器。
configs.put("bootstrap.servers", "node1:9092");
// key和value的反序列化器
configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("group.id", "consumer.demo");
// 創(chuàng)建消費者對象
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
final Pattern pattern = Pattern.compile("topic_[0-9]");
// 消費者訂閱主題或分區(qū)
// consumer.subscribe(pattern);
// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
final List<String> topics = Arrays.asList("topic_1");
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println("剝奪的分區(qū):" + tp.partition());
});
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println(tp.partition());
});
}
});
// 拉取訂閱主題的消息
final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
// 獲取topic_1主題的消息
final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
// 遍歷topic_1主題的消息
topic1Iterable.forEach(record -> {
System.out.println("========================================");
System.out.println("消息頭字段:" + Arrays.toString(record.headers().toArray()));
System.out.println("消息的key:" + record.key());
System.out.println("消息的值:" + record.value());
System.out.println("消息的主題:" + record.topic());
System.out.println("消息的分區(qū)號:" + record.partition());
System.out.println("消息的偏移量:" + record.offset());
});
// 關(guān)閉消費者
consumer.close();
}
2.3、SpringBoot Kafka
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
application.yaml
spring:
kafka:
bootstrap-servers: node1:9092 # 用于建立初始連接的broker地址
producer:
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 16384 # 默認的批處理記錄數(shù)
buffer-memory: 33554432 # 32MB的總發(fā)送緩存
consumer:
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: spring-kafka-02-consumer # consumer的消費組id
enable-auto-commit: true # 是否自動提交消費者偏移量
auto-commit-interval: 100 # 每隔100ms向broker提交一次偏移量
auto-offset-reset: earliest # 如果該消費者的偏移量不存在,則自動設(shè)置為最早的偏移量
KafkaConfig
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic1() {
return new NewTopic("ntp-01", 5, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("ntp-02", 3, (short) 1);
}
}
producer
@RestController
public class KafkaSyncProducerController {
@Autowired
private KafkaTemplate template;
@RequestMapping("send/sync/{message}")
public String sendSync(@PathVariable String message) {
ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
try {
// 同步等待broker的響應(yīng)
Object o = future.get();
SendResult<Integer, String> result = (SendResult<Integer, String>) o;
System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}
}
@RestController
public class KafkaAsyncProducerController {
@Autowired
private KafkaTemplate<Integer, String> template;
@RequestMapping("send/async/{message}")
public String asyncSend(@PathVariable String message) {
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
// 添加回調(diào),異步等待響應(yīng)
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){
@Override
public void onFailure(Throwable throwable) {
System.out.println("發(fā)送失敗: " + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("發(fā)送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
}
});
return "success";
}
}
consumer
@Component
public class MyConsumer {
@KafkaListener(topics = "topic-spring-02")
public void onMessage(ConsumerRecord<Integer, String> record) {
Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
if (optional.isPresent()) {
System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
}
}
}
以上內(nèi)容為個人學(xué)習(xí)理解,如有問題,歡迎在評論區(qū)指出。文章來源:http://www.zghlxwxcb.cn/news/detail-704385.html
部分內(nèi)容截取自網(wǎng)絡(luò),如有侵權(quán),聯(lián)系作者刪除。文章來源地址http://www.zghlxwxcb.cn/news/detail-704385.html
到了這里,關(guān)于kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!