1、kafka生產(chǎn)者
1.1 生產(chǎn)者消息發(fā)送流程
1.1.1 發(fā)送原理
在消息發(fā)生的過程中,設(shè)計(jì)到了兩個(gè)線程——main線程和Sender線程。在main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator。main線程將消息發(fā)給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka Broker。
- batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會(huì)發(fā)送數(shù)據(jù)。默認(rèn)16k
- linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值數(shù)0ms,表示沒有延遲。
應(yīng)答acks:
- 0:生產(chǎn)者發(fā)生過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。
- 1:生產(chǎn)者發(fā)生過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答
- -1(all):生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。-1和all等價(jià)。
1.1.2 生產(chǎn)者重要參數(shù)列表
參數(shù)名稱 | 描述 |
---|---|
bootstrap.servers | 生產(chǎn)者連接集群所需的Broker地址清單。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以設(shè)置1個(gè)或多個(gè),中間用逗號(hào)隔開。注意這里并非需要所有broker地址,因?yàn)樯a(chǎn)者從給定的broker里查到其他broker信息 |
key.serializer和value.serializer | 指定發(fā)生信息的key和value的序列化類型。一定要寫全類名 |
buffer.memory | RecordAccumulator緩沖區(qū)總大小,默認(rèn)32MB |
batch.size | 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16K。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加 |
linger.ms | 如果數(shù)據(jù)遲遲未到batch.size,sender等待linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小5-100ms之間 |
acks | 0:生產(chǎn)者發(fā)生過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。1: 生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答。-1(all):生產(chǎn)者發(fā)給過來的數(shù)據(jù),Leader和isr隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1和all是等價(jià)的 |
max.in.flight.requests.per.connection | 允許最多沒有返回ack的次數(shù),默認(rèn)為5,開啟冪等性包保證該值是1-5的數(shù)字 |
retries | 當(dāng)消息發(fā)給出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試的次數(shù)。默認(rèn)是int的最大值,2147483647.如果設(shè)置了重試,還想抱著消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了 |
retry.backoff.ms | 兩次重試之間的時(shí)間間隔,默認(rèn)是 100ms。 |
enable.idempotence | 是否開啟冪等性,默認(rèn) true,開啟冪等性。 |
compression.type | 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 |
1.2 異步發(fā)送API
1.2.1 普通異步發(fā)送
1、需求:創(chuàng)建 Kafka 生產(chǎn)者,采用異步的方式發(fā)送到 Kafka Broker
2、代碼編寫
(1)創(chuàng)建工程(KafkaDemo)
(2)導(dǎo)入依賴
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
(3)創(chuàng)建包名org.zhm.producer
(4)編寫不帶回調(diào)函數(shù)的API代碼
package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @ClassName CustomProducer
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 18:35
* @Version 1.0
*/
public class CustomProducer {
public static void main(String[] args) {
//1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties=new Properties();
//2、給kafka配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//key,value序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//3、創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
//4、調(diào)用send()方法,發(fā)生消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","zhm"+i));
}
//5、關(guān)閉資源
kafkaProducer.close();
}
}
(5)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
1.2.2 帶回調(diào)函數(shù)的異步發(fā)送
回調(diào)函數(shù)會(huì)在Producer收到ack時(shí)調(diào)用,為異步調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息·(Exception),如果Exception為null,說明消息發(fā)生成功,如果Exception不為null,說明消息發(fā)送失敗。
注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。
package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @ClassName CustoProducerCallback
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 18:44
* @Version 1.0
*/
public class CustoProducerCallback {
public static void main(String[] args) throws InterruptedException {
//1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties=new Properties();
//2、給kafka配置對(duì)象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//key、value序列化(必須)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//3、創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
//4、調(diào)用send()方法 發(fā)送信息
for (int i = 0; i < 6; i++) {
//添加回調(diào)
producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
//該方法在Producer收到ack時(shí)調(diào)用,為異步調(diào)用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
//沒有異常,輸出信息到控制臺(tái)
System.out.println("主題:"+recordMetadata.topic()+"->"+"分區(qū):"
+recordMetadata.partition());
}
else {
//出現(xiàn)異常打印
e.printStackTrace();
}
}
});
//延遲一會(huì)會(huì)看到數(shù)據(jù)發(fā)往不同分區(qū)
Thread.sleep(20);
}
//5、關(guān)閉資源
producer.close();
}
}
1、測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
1.3 同步發(fā)送API
只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。
package org.zhm.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ClassName CustomProducerSync
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 18:58
* @Version 1.0
*/
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties=new Properties();
//2、給kafka配置對(duì)象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//key、value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//3、創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String,String> producer=new KafkaProducer<>(properties);
//4、調(diào)用send方法,發(fā)送信息
for (int i = 0; i < 10; i++) {
//異步發(fā)送 默認(rèn)
// producer.send(new ProducerRecord<>("first","zhm"+i));
//同步發(fā)送
producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get();
}
//5、關(guān)閉資源
producer.close();
}
}
1、測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
1.4 生產(chǎn)者分區(qū)
1.4.1 分區(qū)好處
1、便于合理使用儲(chǔ)存資源,每個(gè)Partition在一個(gè)Broker上儲(chǔ)存,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)儲(chǔ)存在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
1.4.2 生產(chǎn)者發(fā)生消息的分區(qū)
1、默認(rèn)分區(qū)器DefaultPartitioner
(1)指明partition的情況下,直接將指明的值作為partition值;例如partition=0,所有數(shù)據(jù)寫入分區(qū)0。
(2)沒有指明partition值但有key的情況下,將key的hash值與topic的partition數(shù)進(jìn)行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition數(shù)=2,那么key1 對(duì)應(yīng)的value1寫入1號(hào)分區(qū),key2對(duì)應(yīng)的value2寫入0號(hào)分區(qū)。
(3)既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區(qū)器),會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直
使用該分區(qū),待該分區(qū)的batch已滿或者已完成,Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)行使用(和上一次的分區(qū)不同)。
例如:第一次隨機(jī)選擇0號(hào)分區(qū),等0號(hào)分區(qū)當(dāng)前批次滿了(默認(rèn)16k)或者linger.ms設(shè)置的時(shí)間到, Kafka再隨機(jī)一個(gè)分區(qū)進(jìn)
行使用(如果還是0會(huì)繼續(xù)隨機(jī))。
2、案例一
將數(shù)據(jù)發(fā)往指定 partition 的情況
package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @ClassName CustomProducerCallbackPartitions
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 19:10
* @Version 1.0
*/
public class CustomProducerCallbackPartitions {
public static void main(String[] args) {
//1、創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties=new Properties();
//2、給kafka配置對(duì)象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//鍵值序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//3、創(chuàng)建生產(chǎn)者對(duì)象
KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties);
//4、調(diào)用send方法,發(fā)送信息
for (int i = 0; i < 5; i++) {
//指定數(shù)據(jù)發(fā)送到1號(hào)分區(qū),key1為空
producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("主題:"+recordMetadata.topic()+"->"+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
//5、關(guān)閉資源
producer.close();
}
}
(1)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
3、案例二
沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值。
package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @ClassName CustomProducerCallback1
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 19:21
* @Version 1.0
*/
public class CustomProducerCallback1 {
public static void main(String[] args) {
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> kafkaProducer=new KafkaProducer(properties);
for (int i = 0; i < 5; i++) {
//依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "a", "zhm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("當(dāng)key為a時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 5; i++) {
//依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "b", "zhm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("當(dāng)key為b時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 5; i++) {
//依次指定key值為a、b、f,數(shù)據(jù)key的hash值與3分別發(fā)往1、2、0
kafkaProducer.send(new ProducerRecord<>("first", "f", "zhm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("當(dāng)key為f時(shí):"+"主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
(1)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 中執(zhí)行代碼,觀察 hadoop102 控制臺(tái)中是否接收到消息。
1.4.3 自定義分區(qū)器
如果研發(fā)人員可以根據(jù)企業(yè)需求,自己重新實(shí)現(xiàn)分區(qū)器
1、例如我們實(shí)現(xiàn)一個(gè)分區(qū)器實(shí)現(xiàn),發(fā)送過來的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)。
2、案例實(shí)現(xiàn)
(1)定義類實(shí)現(xiàn) Partitioner 接口。
(2)重寫 partition()方法。
package org.zhm.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @ClassName Mypartitioner
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 19:28
* @Version 1.0
*/
/**
1、實(shí)現(xiàn)接口Partitioner
2、實(shí)現(xiàn)三個(gè)方法:Partition、close、configure
3、編寫Partition方法,返回分區(qū)號(hào)
*/
public class MyPartitioner implements Partitioner {
/*
*
* @description:返回信息對(duì)應(yīng)的分區(qū)
* @author: zouhuiming
* @date: 2023/6/12 19:30
* @param: [s, o, bytes, o1, bytes1, cluster]
* [主題、消息的key、消息的key序列化后的字節(jié)數(shù)組、消息的value、消息的value序列哈后字節(jié)數(shù)組、集群元數(shù)據(jù)可以查看的分區(qū)信息]
* @return: int
**/
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//獲取信息
String msyValue = o1.toString();
//創(chuàng)建partition
int partition;
//判斷信息是否包含zhm
if (msyValue.contains("zhm")){
partition=0;
}
else {
partition=1;
}
//返回分區(qū)號(hào)
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
(3)使用分區(qū)器的方法,在生產(chǎn)者的配置中添加分區(qū)器參數(shù)。
package org.zhm.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @ClassName CustomProducerCallbackPartitionsMine
* @Description TODO
* @Author Zouhuiming
* @Date 2023/6/12 19:35
* @Version 1.0
*/
public class CustomProducerCallbackPartitionsMine {
public static void main(String[] args) {
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.zhm.producer.MyPartitioner");
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("主題:"+recordMetadata.topic()+"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
(4)測(cè)試
①在 hadoop102 上開啟 Kafka 消費(fèi)者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②在 IDEA 控制臺(tái)觀察回調(diào)信息。
1.5 生產(chǎn)經(jīng)驗(yàn)——生產(chǎn)者如何提高吞吐量
- batch.size:批次大小,默認(rèn)16k
- linger.ms:等待時(shí)間,修改為5-100ms
- compression.type:壓縮snappy
- RecordAccumulator:緩存區(qū)大小,修改1為64MB
1.6 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)可靠性
1、ack應(yīng)答原理
可靠性總結(jié):
- acks=0,生產(chǎn)者發(fā)送過來數(shù)據(jù)就不管了,可靠性差,效率高;
- acks=1,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader應(yīng)答,可靠性中等,效率中等;
- acks=-1(all),,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader和ISR隊(duì)列里面所有Follwer應(yīng)答,可靠性高,效率低;
在生產(chǎn)環(huán)境中,acks=0很少使用;acks=1,一般用于傳輸普通日志,允許丟個(gè)別數(shù)據(jù);acks=-1,一般用于傳輸和錢相關(guān)的數(shù)據(jù),對(duì)可靠性要求比較高的場(chǎng)景。
數(shù)據(jù)重復(fù)分析
1.7 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)去重
1.7.1 數(shù)據(jù)傳遞語(yǔ)義
- 至少一次(At Least Once) =ACK級(jí)別設(shè)置為-1+分區(qū)副本數(shù)大于等于2+ISR里應(yīng)答的最小副本數(shù)量大于等于2
- 最多一次(At Most Once)=ACK級(jí)別設(shè)置為0
- 總結(jié)
- At Least Once可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù);
- At Most Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。
- 精確一次(Exactly Once):對(duì)于一些非常重要的信息,比如和錢相關(guān)的數(shù)據(jù),要求數(shù)據(jù)既不能重復(fù)也不丟失。Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性和事務(wù)。
1.7.2 冪等性
冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。
精確一次(Exactly Once) = 冪等性 + 至少一次( ack=-1 + 分區(qū)副本數(shù)>=2 + ISR最小副本數(shù)量>=2) 。
重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber>相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。其中PID是Kafka每次重啟都會(huì)分配一個(gè)新的;Partition表示分區(qū)號(hào);Sequence Number是單調(diào)自增的。
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
如何啟用冪等性
開啟參數(shù) enable.idempotence 默認(rèn)為 true,false 關(guān)閉
1.7.3 生產(chǎn)者事務(wù)
1、Kafka事務(wù)原理
注意:開啟事務(wù),必須開啟冪等性
2、Kafka 的事務(wù)一共有如下 5 個(gè) API
// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
1.8 生產(chǎn)經(jīng)驗(yàn)——數(shù)據(jù)有序
文章來源:http://www.zghlxwxcb.cn/news/detail-488080.html
1.8 生產(chǎn)檢驗(yàn)——數(shù)據(jù)亂序
1、kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下:
max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)。
2、kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
(1)未開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置為1。
(2)開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置小于等于5。
原因說明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)端會(huì)緩存producer發(fā)來的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。文章來源地址http://www.zghlxwxcb.cn/news/detail-488080.html
到了這里,關(guān)于Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!