一、前言
數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。
通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。
整理下消息重復(fù)的幾個(gè)場(chǎng)景:
- 生產(chǎn)端: 遇到異常,基本解決措施都是 重試 。
- 場(chǎng)景一:
leader
分區(qū)不可用了,拋LeaderNotAvailableException
異常,等待選出新leader
分區(qū)。 - 場(chǎng)景二:
Controller
所在Broker
掛了,拋NotControllerException
異常,等待Controller
重新選舉。 - 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋
NetworkException
異常,等待網(wǎng)絡(luò)恢復(fù)。
-
消費(fèi)端:
poll
一批數(shù)據(jù),處理完畢還沒(méi)提交offset
,機(jī)子宕機(jī)重啟了,又會(huì)poll
上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。
怎么解決?
先來(lái)了解下消息的三種投遞語(yǔ)義:
-
最多一次(
at most once
): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqtt
中QoS = 0
。 -
至少一次(
at least once
): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqtt
中QoS = 1
-
精確一次(
exactly once
): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqtt
中QoS = 2
。
了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:
-
Kafka
冪等性Producer
: 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話) -
Kafka
事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等Producer
的局限性。 - 消費(fèi)端冪等:保證消費(fèi)端接收消息冪等。蔸底方案。
Kafka 冪等性 Producer
冪等性指 :無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可
Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
props.put("acks", "all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
- 設(shè)置冪等,啟動(dòng)冪等。
- 配置
acks
,注意:一定要設(shè)置acks=all
,否則會(huì)拋異常。 - 配置
max.in.flight.requests.per.connection
需要<= 5
,否則會(huì)拋異常OutOfOrderSequenceException
。
0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
Kafka >= 1.1, max.in.flight.request.per.connection <= 5
為了更好理解,需要了解下Kafka 冪等機(jī)制:
-
Producer
每次啟動(dòng)后,會(huì)向Broker
申請(qǐng)一個(gè)全局唯一的pid
。(重啟后pid
會(huì)變化,這也是弊端之一) -
Sequence Numbe
:針對(duì)每個(gè)<Topic, Partition>
都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的Sequence
,同時(shí)Broker
端會(huì)緩存這個(gè)seq num
- 判斷是否重復(fù): 拿
<pid, seq num>
去Broker
里對(duì)應(yīng)的隊(duì)列ProducerStateEntry.Queue
(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢是否存在
- 如果
nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq
,則接收。 - 如果
nextSeq == 0 && lastSeq == Int.MaxValue
,即剛初始化,也接收。 - 反之,要么重復(fù),要么丟消息,均拒絕。
這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:
-
消息重復(fù): 場(chǎng)景
Broker
保存消息后還沒(méi)發(fā)送ack
就宕機(jī)了,這時(shí)候Producer
就會(huì)重試,這就造成消息重復(fù)。 - 消息亂序: 避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。
那什么時(shí)候該使用冪等:
3. 如果已經(jīng)使用 acks=all
,使用冪等也可以。
4. 如果已經(jīng)使用 acks=0
或者 acks=1
,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。
Kafka 事務(wù)
使用 Kafka 事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。
Tips: 這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。
事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端
Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
props.put("acks", "all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數(shù)
props.put("transactional.id", "my-transactional-id"); // 4. 設(shè)定事務(wù) id
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事務(wù)
producer.initTransactions();
try{
// 開(kāi)始事務(wù)
producer.beginTransaction();
// 發(fā)送數(shù)據(jù)
producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
// 數(shù)據(jù)發(fā)送及 Offset 發(fā)送均成功的情況下,提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 數(shù)據(jù)發(fā)送或者 Offset 發(fā)送出現(xiàn)異常時(shí),終止事務(wù)
producer.abortTransaction();
} finally {
// 關(guān)閉 Producer 和 Consumer
producer.close();
consumer.close();
}
這里消費(fèi)端 Consumer
需要設(shè)置下配置:isolation.level
參數(shù)
-
read_uncommitted:
這是默認(rèn)值,表明Consumer
能夠讀取到Kafka
寫(xiě)入的任何消息,不論事務(wù)型Producer
提交事務(wù)還是終止事務(wù),其寫(xiě)入的消息都可以讀取。如果你用了事務(wù)型Producer
,那么對(duì)應(yīng)的Consumer
就不要使用這個(gè)值。 -
read_committed:
表明Consumer
只會(huì)讀取事務(wù)型Producer
成功提交事務(wù)寫(xiě)入的消息。當(dāng)然了,它也能看到非事務(wù)型Producer
寫(xiě)入的所有消息。
消費(fèi)端冪等
“如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。
只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。
典型的方案是使用:消息表,來(lái)去重:
- 上述例子中,消費(fèi)端拉取到一條消息后,開(kāi)啟事務(wù),將消息
Id
新增到本地消息表中,同時(shí)更新訂單信息。 - 如果消息重復(fù),則新增操作
insert
會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。
二、案例:Kafka 冪等性 Producer 使用
環(huán)境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
準(zhǔn)備工作如下:
-
Zookeeper:
本地使用Docker
啟動(dòng)
$ docker run -d --name zookeeper -p 2181:2181 zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
-
Kafka
:版本2.7.1
,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng)) - 啟動(dòng)生產(chǎn)者:
Kafka
源碼中exmaple
中 - 啟動(dòng)消息者:可以用
Kafka
提供的腳本
# 舉個(gè)栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
創(chuàng)建 topic
: 1副本,2 分區(qū)
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
生產(chǎn)者代碼:
public class KafkaProducerApplication {
private final Producer<String, String> producer;
final String outTopic;
public KafkaProducerApplication(final Producer<String, String> producer,
final String topic) {
this.producer = producer;
outTopic = topic;
}
public void produce(final String message) {
final String[] parts = message.split("-");
final String key, value;
if (parts.length > 1) {
key = parts[0];
value = parts[1];
} else {
key = null;
value = parts[0];
}
final ProducerRecord<String, String> producerRecord
= new ProducerRecord<>(outTopic, key, value);
producer.send(producerRecord,
(recordMetadata, e) -> {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
}
}
);
}
public void shutdown() {
producer.close();
}
public static void main(String[] args) {
final Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final String topic = "myTopic";
final Producer<String, String> producer = new KafkaProducer<>(props);
final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);
String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try {
List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l -> !l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsets and timestamps committed in batch from " + filePath);
} catch (IOException e) {
System.err.printf("Error reading file %s due to %s %n", filePath, e);
} finally {
producerApp.shutdown();
}
}
}
啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:
啟動(dòng)消費(fèi)者:
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
修改配置 acks
啟用冪等的情況下,調(diào)整 acks
配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:
- 修改配置
acks = 1
- 修改配置
acks = 0
會(huì)直接報(bào)錯(cuò):
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
Otherwise we cannot guarantee idempotence.
修改配置 max.in.flight.requests.per.connection
啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:
將 max.in.flight.requests.per.connection > 5
會(huì)怎樣?
當(dāng)然會(huì)報(bào)錯(cuò):文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-818720.html
Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-818720.html
到了這里,關(guān)于一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!