目錄
1. 多線程安全
1.1. 生產(chǎn)者是多線程安全的么?
1.1. 消費(fèi)者是多線程安全的么?
2. 消費(fèi)者規(guī)避多線程安全方案
2.1. 每個(gè)線程維護(hù)一個(gè)kafkaConsumer
2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程
2.3.方案優(yōu)缺點(diǎn)對(duì)比
1. 多線程安全
1.1. 生產(chǎn)者是多線程安全的么?
????????Kafka生產(chǎn)者是線程安全的,可以在多個(gè)線程中共享一個(gè)Kafka生產(chǎn)者實(shí)例。這是因?yàn)镵afka生產(chǎn)者實(shí)例內(nèi)部使用了一些同步機(jī)制來(lái)保證線程安全,例如使用了線程安全的隊(duì)列來(lái)緩存消息,使用了同步鎖來(lái)保護(hù)共享資源的訪問(wèn)等。
????????同時(shí),Kafka生產(chǎn)者的send()方法是非阻塞的,可以在多個(gè)線程中并發(fā)調(diào)用,不會(huì)阻塞線程。Kafka生產(chǎn)者還提供了異步發(fā)送和同步發(fā)送兩種發(fā)送方式,可以根據(jù)實(shí)際需求選擇不同的發(fā)送方式。
然而,如果多個(gè)線程共享同一個(gè)Kafka生產(chǎn)者實(shí)例,需要注意以下幾點(diǎn):
-
同一個(gè)線程中不要同時(shí)調(diào)用send()方法和flush()方法,可能會(huì)導(dǎo)致消息發(fā)送順序不一致。
-
不同線程中調(diào)用send()方法時(shí),需要注意消息的順序,可以使用Kafka的分區(qū)機(jī)制來(lái)保證消息的順序。
-
如果多個(gè)線程發(fā)送的消息都是針對(duì)同一個(gè)主題或分區(qū),可能會(huì)導(dǎo)致消息的重復(fù)發(fā)送或丟失。因此,建議在多線程情況下,使用Kafka的分區(qū)機(jī)制,將消息發(fā)送到不同的分區(qū)中,以避免消息的重復(fù)和丟失。
????????綜上所述,Kafka生產(chǎn)者是線程安全的,可以在多個(gè)線程中共享一個(gè)Kafka生產(chǎn)者實(shí)例,但需要注意消息的順序和分區(qū)的使用,以保證消息的可靠性和順序性。
1.1. 消費(fèi)者是多線程安全的么?
????????Kafka消費(fèi)者是非線程安全的,主要原因是因?yàn)椋篕afka消費(fèi)者使用了內(nèi)部的狀態(tài)來(lái)跟蹤消費(fèi)進(jìn)度和偏移量。這種狀態(tài)包括消費(fèi)者的位置,消費(fèi)者的偏移量,以及消費(fèi)者的訂閱主題和分區(qū)等信息。如果多個(gè)線程同時(shí)訪問(wèn)同一個(gè)Kafka消費(fèi)者實(shí)例,就會(huì)導(dǎo)致這些狀態(tài)信息的不一致,從而導(dǎo)致消費(fèi)進(jìn)度出現(xiàn)錯(cuò)誤。
2. 消費(fèi)者規(guī)避多線程安全方案
背景:Kafka消費(fèi)者中具有poll()方法,該方法是一個(gè)阻塞方法,如果在同一個(gè)線程中多次調(diào)用poll()方法,會(huì)導(dǎo)致消費(fèi)者阻塞在poll()方法中,無(wú)法及時(shí)消費(fèi)新到達(dá)的消息。因此,為了提高Kafka消費(fèi)者的并發(fā)性能,通常使用多個(gè)消費(fèi)者線程來(lái)消費(fèi)+處理消息。為了保證Kafka消費(fèi)者的線程安全性,通常采用以下2種方案。
方案的本質(zhì):一個(gè)線程只能綁定一個(gè)消費(fèi)者實(shí)例(不能多個(gè)線程共用一個(gè)消費(fèi)者實(shí)例)
2.1. 每個(gè)線程創(chuàng)建一個(gè)Consumer
?1. 創(chuàng)建多個(gè)線程,去消費(fèi)topic
2. 每個(gè)線程綁定固定數(shù)量的分區(qū)(最好的情況是一個(gè)消費(fèi)者綁定一個(gè)分區(qū))
????????代碼比較簡(jiǎn)單,唯一需要關(guān)注的點(diǎn):創(chuàng)建的消費(fèi)者線程和分區(qū)的數(shù)量可能不相等,此時(shí),可以采用「分區(qū)副本自動(dòng)分配策略」,該策略會(huì)將消費(fèi)者線程和分區(qū)綁定在一起。
場(chǎng)景討論:
????????按照現(xiàn)在的代碼,假設(shè)消費(fèi)者線程獲取了分區(qū)A 100個(gè)msg,然后處理,若還未處理完,此時(shí),offset未提交。因?yàn)榇藭r(shí)offset還未提交,消費(fèi)者線程還會(huì)去分區(qū)A調(diào)用poll,獲取到的msg依然會(huì)是剛才的100個(gè)msg。存在消費(fèi)重復(fù)的場(chǎng)景,因此,消費(fèi)者需要做好冪等處理。
? ? ? ? 上面的代碼,不會(huì)出現(xiàn)這個(gè)場(chǎng)景:消費(fèi)者獲取了消息1,2,3,3處理完成了,執(zhí)行了提交偏移,但是消息1還未提交的場(chǎng)景。
代碼示例:
package com.bie.kafka.kafkaThrea;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
*
* 1、KafkaConsumer是非線程安全的,KafkaProducer是線程安全的。
* 2、該案例是創(chuàng)建多個(gè)線程,每個(gè)線程維護(hù)一個(gè)KafkaConsumer實(shí)例
* 用戶創(chuàng)建多個(gè)線程消費(fèi)topic數(shù)據(jù),使用分區(qū)副本自動(dòng)分配策略,將消費(fèi)者線程與分區(qū)進(jìn)行綁定
* 3、ConsumerRunnable,消費(fèi)線程類,執(zhí)行真正的消費(fèi)任務(wù)
*/
public class ConsumerRunnable implements Runnable {
// 每個(gè)線程維護(hù)私有的kafkaConsumer實(shí)例
private final KafkaConsumer<String, String> consumer;
/**
* 默認(rèn)每個(gè)消費(fèi)者的配置參數(shù)初始化
*
* @param brokerList
* @param groupId
* @param topic
*/
public ConsumerRunnable(String brokerList, String groupId, String topic) {
// 帶參數(shù)的構(gòu)造方法
Properties props = new Properties();
// kafka的列表
props.put("bootstrap.servers", brokerList);
// 消費(fèi)者組編號(hào)
props.put("group.id", groupId);
// 自動(dòng)提交
props.put("enable.auto.commit", true);
// 提交提交每個(gè)一秒鐘
props.put("auto.commit.interval.ms", "1000");
// 反序列化key
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 反序列化value
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 將配置信息進(jìn)行初始化操作
this.consumer = new KafkaConsumer<>(props);
// 定義響應(yīng)的主題信息topic:使用分區(qū)副本自動(dòng)分配策略
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
// 消費(fèi)者保持一直消費(fèi)的狀態(tài)
while (true) {
// 將獲取到消費(fèi)的信息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
// 遍歷出每個(gè)消費(fèi)的消息
for (ConsumerRecord<String, String> record : records) {
// 處理消息:deal msg
}
// 提交offset
}
}
}
package com.bie.kafka.kafkaThrea;
import java.util.ArrayList;
import java.util.List;
/**
* 1、消費(fèi)線程管理類,創(chuàng)建多個(gè)線程類執(zhí)行消費(fèi)任務(wù)
*/
public class ConsumerGroup {
// 消費(fèi)者群組,多消費(fèi)者。
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
// 初始化消費(fèi)者組
consumers = new ArrayList<>(consumerNum);
// 初始化消費(fèi)者,創(chuàng)建多少個(gè)消費(fèi)者
for (int i = 0; i < consumerNum; i++) {
// 根據(jù)消費(fèi)者構(gòu)造方法,創(chuàng)建消費(fèi)者實(shí)例
ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
// 將創(chuàng)建的消費(fèi)者實(shí)例添加到消費(fèi)者組中
consumers.add(consumerRunnable);
}
}
public void execute() {
// 將消費(fèi)者組里面的消費(fèi)者遍歷出來(lái)
for (ConsumerRunnable task : consumers) {
// 創(chuàng)建一個(gè)消費(fèi)者線程,并且啟動(dòng)該線程
new Thread(task).start();
}
}
}
package com.bie.kafka.kafkaThrea;
public class ConsumerMain {
public static void main(String[] args) {
// kafka即broker列表
String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
// group組名稱
String groupId = "group1";
// topic主題名稱
String topic = "topic1";
// 消費(fèi)者的數(shù)量
int consumerNum = 3;
// 通過(guò)構(gòu)造器創(chuàng)建出一個(gè)對(duì)象
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
// 執(zhí)行execute的方法,創(chuàng)建出ConsumerRunnable消費(fèi)者實(shí)例。多線程多消費(fèi)者實(shí)例
consumerGroup.execute();
}
}
上面代碼,在實(shí)際工程應(yīng)用中存在問(wèn)題,描述見(jiàn)下:
- 每個(gè)消費(fèi)者線程,先執(zhí)行poll拉取一批batchsize命令后
- 批量處理這批消息
- 提交offset
????????步驟2,處理batchsize的消息,可能中間的某一個(gè)失敗了,但是步驟3提交了整體的offset,會(huì)導(dǎo)致失敗的消息丟失了。
解決方案:處理每個(gè)消息的流程中,增加重試機(jī)制(本地消息表),保證該消息能執(zhí)行成功
2.2. 消費(fèi)者組?+ worker線程池
?與2.1方案一的區(qū)別在于,將「消息的獲取」與「消息的處理」解耦開(kāi)
1. 消息的獲?。壕S護(hù)一個(gè)or多個(gè)kafkaConsumer實(shí)例,獲取消息,獲取到消息后,將消息丟到消息處理線程池中
2. 消息的處理:創(chuàng)建一個(gè)線程池,里面存放了worker線程,每個(gè)worker線程處理獲取到的消息
ConsumerWorker類
????????worker線程:執(zhí)行msg的處理,更新offsets信息
package huxi.test.consumer.multithreaded;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Map;
public class ConsumerWorker<K, V> implements Runnable {
private final ConsumerRecords<K, V> records;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = record;
this.offsets = offsets;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
// 獲取到分區(qū)的消息記錄
List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
// 遍歷獲取到的消息記錄
for (ConsumerRecord<K, V> record : partitionRecords) {
// 消息處理邏輯
}
/* 下面操作,是更新各個(gè)分區(qū)的offset信息到offsets變量,并沒(méi)有真正的提交位移 */
/* 真正的更新位移操作,不是在worker線程,而是在消費(fèi)者線程 */
// 待更新的位移
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 同步鎖,鎖住offsets位移
synchronized (offsets) {
// 如果offsets位移不包含partition這個(gè)key信息,就將位移信息設(shè)置到map集合里面
if (!offsets.containsKey(partition)) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
} else {
// 否則,offsets位移包含partition這個(gè)key信息,獲取到offsets的位置信息
long curr = offsets.get(partition).offset();
if (curr <= lastOffset + 1) { // 如果獲取到的位置信息小于等于上一次位移信息大小,將這個(gè)partition的位置信息設(shè)置到map集合中。并保存到broker中。
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
}
}
}
}
}
?ConsumerThreadHandler類
????????一個(gè)主線程,定時(shí)去poll消息,然后將消息投遞到worker線程中,最后,當(dāng)offsets信息發(fā)生變更后,提交offset
package huxi.test.consumer.multithreaded;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumerThreadHandler<K, V> {
// KafkaConsumer實(shí)例
private final KafkaConsumer<K, V> consumer;
// ExecutorService實(shí)例
private ExecutorService executors;
// 位移信息offsets
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
// 構(gòu)造kafkaConsumer配置
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false"); // 非自動(dòng)提交位移信息
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// 創(chuàng)建kafkaConsumer,賦值給成員變量consumer
consumer = new KafkaConsumer<>(props);
// 消費(fèi)者訂閱消息,并實(shí)現(xiàn)重平衡rebalance
// rebalance監(jiān)聽(tīng)器,創(chuàng)建一個(gè)匿名內(nèi)部類。使用rebalance監(jiān)聽(tīng)器前提是使用消費(fèi)者組(consumer group)。
// 監(jiān)聽(tīng)器最常見(jiàn)用法就是手動(dòng)提交位移到第三方存儲(chǔ)以及在rebalance前后執(zhí)行一些必要的審計(jì)操作。
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
// 在coordinator開(kāi)啟新一輪rebalance前onPartitionsRevoked方法會(huì)被調(diào)用。
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets); // 提交位移
}
// rebalance完成后會(huì)調(diào)用onPartitionsAssigned方法。
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
offsets.clear(); // 清除位移信息
}
});
}
/**
* 消費(fèi)主方法
* @param threadNumber 線程池中線程數(shù)
*/
public void consume(int threadNumber) {
// 創(chuàng)建一個(gè)worker線程池,線程數(shù)量為threadNumber個(gè)
executors = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
// 只有一個(gè)消費(fèi)者線程
try {
// 消費(fèi)者一直處于等待狀態(tài),等待消息消費(fèi)
while (true) {
// 從主題中獲取消息
ConsumerRecords<K, V> records = consumer.poll(1000L);
// 如果獲取到的消息不為空
if (!records.isEmpty()) {
// submit: 將一批msg交給worker線程處理,消息處理完后,更新offsets信息
executors.submit(new ConsumerWorker<>(records, offsets));
}
// 調(diào)用提交位移信息
commitOffsets();
}
} catch (WakeupException e) {
// swallow this exception
} finally {
commitOffsets(); // 調(diào)用提交位移信息
consumer.close(); // 關(guān)閉consumer
}
}
private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
// 保證線程安全、同步鎖,鎖住offsets
synchronized (offsets) {
// 判斷如果offsets位移信息為空,直接返回,節(jié)省同步鎖對(duì)offsets的鎖定的時(shí)間
if (offsets.isEmpty()) {
return;
}
// 如果offsets位移信息不為空,將位移信息offsets放到集合中,方便同步
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
// 清除位移信息offsets
offsets.clear();
}
// 將封裝好的位移信息unmodfiedMap集合進(jìn)行同步提交
// 手動(dòng)提交位移信息
consumer.commitSync(unmodfiedMap);
}
public void close() {
consumer.wakeup();
// 關(guān)閉ExecutorService實(shí)例
executors.shutdown();
}
}
Main類
????????包裝了所有的工具類,啟動(dòng)整個(gè)程序
package huxi.test.consumer.multithreaded;
public class Main {
public static void main(String[] args) {
// broker列表、topic、group id
String brokerList = "localhost:9092";
String topic = "test-topic";
String groupID = "test-group";
// 1. 根據(jù)ConsumerThreadHandler構(gòu)造方法,創(chuàng)建出消費(fèi)者h(yuǎn)andler
final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
final int cpuCount = Runtime.getRuntime().availableProcessors();
// 創(chuàng)建線程的匿名內(nèi)部類
Runnable runnable = new Runnable() {
@Override
public void run() {
// 執(zhí)行consume,在此線程中執(zhí)行消費(fèi)者消費(fèi)消息。
handler.consume(cpuCount);
}
};
new Thread(runnable).start(); // 3. 該函數(shù)會(huì)調(diào)用上面的run()方法
try {
// 20秒后自動(dòng)停止該測(cè)試程序
Thread.sleep(20000L);
} catch (InterruptedException e) {
// swallow this exception
}
System.out.println("Starting to close the consumer...");
handler.close();
}
}
2.3.方案優(yōu)缺點(diǎn)對(duì)比
優(yōu)點(diǎn) | 缺點(diǎn) | |
方案1 | 實(shí)現(xiàn)簡(jiǎn)單 速度較快,因?yàn)闊o(wú)線程交互開(kāi)銷 方便位移管理 易于維護(hù)分區(qū)之間的消息消費(fèi)順序 |
socket連接開(kāi)銷大 consumer受限于topic的分區(qū)數(shù),擴(kuò)展性差 broker端處理負(fù)載高(因?yàn)榘l(fā)往broker的請(qǐng)求較多) reblance可能性增大 |
方案2 | 消息獲取和處理解耦 可獨(dú)立擴(kuò)展consumer和worker數(shù),伸縮性好 |
實(shí)現(xiàn)負(fù)載 難于維護(hù)分區(qū)內(nèi)的消息順序 處理鏈路變長(zhǎng),導(dǎo)致位移管理困難文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-426212.html worker線程異??赡軐?dǎo)致消費(fèi)數(shù)據(jù)丟失文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-426212.html |
到了這里,關(guān)于探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!