国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka核心原理第二彈——更新中

這篇具有很好參考價值的文章主要介紹了Kafka核心原理第二彈——更新中。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

架構(gòu)原理

一、高吞吐機制:Batch打包、緩沖區(qū)、acks

1. Kafka Producer怎么把消息發(fā)送給Broker集群的?

需要指定把消息發(fā)送到哪個topic去

首先需要選擇一個topic的分區(qū),默認是輪詢來負載均衡,但是如果指定了一個分區(qū)key,那么根據(jù)這個key的hash值來分發(fā)到指定的分區(qū),這樣可以讓相同的key分發(fā)到同一個分區(qū)里去,還可以自定義partitioner來實現(xiàn)分區(qū)策略

producer.send(msg); // 用類似這樣的方式去發(fā)送消息,就會把消息給你均勻的分布到各個分區(qū)上去
producer.send(key, msg); // 訂單id,或者是用戶id,他會根據(jù)這個key的hash值去分發(fā)到某個分區(qū)上去,他可以保證相同的key會路由分發(fā)到同一個分區(qū)上去

知道要發(fā)送到哪個分區(qū)之后,還得找到這個分區(qū)的leader副本所在的機器,然后跟那個機器上的Broker通過Socket建立連接來進行通信,發(fā)送Kafka自定義協(xié)議格式的請求過去,把消息就帶過去了

如果找到了partition的leader所在的broker之后,就可以通過socket跟那臺broker建立連接,接著發(fā)送消息過去

Producer(生產(chǎn)者客戶端),起碼要知道兩個元數(shù)據(jù),每個topic有幾個分區(qū),每個分區(qū)的leader是在哪臺broker上,會自己從broker上拉取kafka集群的元數(shù)據(jù),緩存在自己client本地客戶端上

kafka使用者的層面來考慮一下,我如果要把數(shù)據(jù)寫入kafka集群,應(yīng)該如何來做,怎么把數(shù)據(jù)寫入kafka集群,以及他背后的一些原理還有使用過程中需要設(shè)置的一些參數(shù),到底應(yīng)該怎么來弄

Kafka核心原理第二彈——更新中,kafka,linq,分布式

2. 用一張圖告訴你Producer發(fā)送消息的內(nèi)部實現(xiàn)原理

每次發(fā)送消息都必須先把數(shù)據(jù)封裝成一個ProducerRecord對象,里面包含了要發(fā)送的topic,具體在哪個分區(qū),分區(qū)key,消息內(nèi)容,timestamp時間戳,然后這個對象交給序列化器,變成自定義協(xié)議格式的數(shù)據(jù)

接著把數(shù)據(jù)交給partitioner分區(qū)器,對這個數(shù)據(jù)選擇合適的分區(qū),默認就輪詢所有分區(qū),或者根據(jù)key來hash路由到某個分區(qū),這個topic的分區(qū)信息,都是在客戶端會有緩存的,當然會提前跟broker去獲取

接著這個數(shù)據(jù)會被發(fā)送到producer內(nèi)部的一塊緩沖區(qū)里

然后producer內(nèi)部有一個Sender線程,會從緩沖區(qū)里提取消息封裝成一個一個的batch,然后每個batch發(fā)送給分區(qū)的leader副本所在的broker
Kafka核心原理第二彈——更新中,kafka,linq,分布式

3. 基于Java API寫一個Kafka Producer發(fā)送消息的代碼示例

package com.zhss.demo.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
	
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();

		// 這里可以配置幾臺broker即可,他會自動從broker去拉取元數(shù)據(jù)進行緩存
		props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092");  
		// 這個就是負責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 這個就是負責(zé)把你發(fā)送的實際的message從字符串序列化為字節(jié)數(shù)組
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("acks", "-1");
		props.put("retries", 3);
		props.put("batch.size", 323840);
		props.put("linger.ms", 10);
		props.put("buffer.memory", 33554432);
		props.put("max.block.ms", 3000);
		
		// 創(chuàng)建一個Producer實例:線程資源,跟各個broker建立socket連接資源
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

		ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");
		
		// 這是異步發(fā)送的模式
		producer.send(record, new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if(exception == null) {
					// 消息發(fā)送成功
					System.out.println("消息發(fā)送成功");  
				} else {
					// 消息發(fā)送失敗,需要重新發(fā)送
				}
			}

		});

		Thread.sleep(10 * 1000); 
		
		// 這是同步發(fā)送的模式
//		producer.send(record).get(); 
		// 你要一直等待人家后續(xù)一系列的步驟都做完,發(fā)送消息之后
		// 有了消息的回應(yīng)返回給你,你這個方法才會退出來

		producer.close();
	}
	
}

4. 發(fā)送消息給Broker時遇到的各種異常該如何處理?

之前我們看到不管是異步還是同步,都可能讓你處理異常,常見的異常如下:

LeaderNotAvailableException:這個就是如果某臺機器掛了,此時leader副本不可用,會導(dǎo)致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫入,此時可以重試發(fā)送即可

如果說你平時重啟kafka的broker進程,肯定會導(dǎo)致leader切換,一定會導(dǎo)致你寫入報錯,是LeaderNotAvailableException

NotControllerException:這個也是同理,如果說Controller所在Broker掛了,那么此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可

NetworkException:網(wǎng)絡(luò)異常,重試即可

我們之前配置了一個參數(shù),retries,他會自動重試的,但是如果重試幾次之后還是不行,就會提供Exception給我們來處理了

5. 發(fā)送消息的緩沖區(qū)應(yīng)該如何優(yōu)化來提升發(fā)送的吞吐量?

buffer.memory:設(shè)置發(fā)送消息的緩沖區(qū),默認值是33554432,就是32MB

如果發(fā)送消息出去的速度小于寫入消息進去的速度,就會導(dǎo)致緩沖區(qū)寫滿,此時生產(chǎn)消息就會阻塞住,所以說這里就應(yīng)該多做一些壓測,盡可能保證說這塊緩沖區(qū)不會被寫滿導(dǎo)致生產(chǎn)行為被阻塞住

compression.type,默認是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大producer端的cpu開銷

6. 消息批量發(fā)送的核心參數(shù)batch.size是如何優(yōu)化吞吐量?

batch.size,設(shè)置meigebatch的大小,如果batch太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果batch太大,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會讓內(nèi)存緩沖區(qū)有很大壓力,過多數(shù)據(jù)緩沖在內(nèi)存里

默認值是:16384,就是16kb,也就是一個batch滿了16kb就發(fā)送出去,一般在實際生產(chǎn)環(huán)境,這個batch的值可以增大一些來提升吞吐量,可以自己壓測一下

還有一個參數(shù),linger.ms,這個值默認是0,意思就是消息必須立即被發(fā)送,但是這是不對的,一般設(shè)置一個100毫秒之類的,這樣的話就是說,這個消息被發(fā)送出去后進入一個batch,如果100毫秒內(nèi),這個batch滿了16kb,自然就會發(fā)送出去

但是如果100毫秒內(nèi),batch沒滿,那么也必須把消息發(fā)送出去了,不能讓消息的發(fā)送延遲時間太長,也避免給內(nèi)存造成過大的一個壓力

7. 如何根據(jù)業(yè)務(wù)場景對消息大小以及請求超時進行合理的設(shè)置?

max.request.size:這個參數(shù)用來控制發(fā)送出去的消息的大小,默認是1048576字節(jié),也就1mb,這個一般太小了,很多消息可能都會超過1mb的大小,所以需要自己優(yōu)化調(diào)整,把他設(shè)置更大一些

你發(fā)送出去的一條大數(shù)據(jù),超大的JSON串,超過1MB,就不讓你發(fā)了

request.timeout.ms:這個就是說發(fā)送一個請求出去之后,他有一個超時的時間限制,默認是30秒,如果30秒都收不到響應(yīng),那么就會認為異常,會拋出一個TimeoutException來讓我們進行處理

8. 基于Kafka內(nèi)核架構(gòu)原理深入分析acks參數(shù)到底是干嘛的

acks參數(shù),其實是控制發(fā)送出去的消息的持久化機制的

如果acks=0,那么producer根本不管寫入broker的消息到底成功沒有,發(fā)送一條消息出去,立馬就可以發(fā)送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了,你也不知道的,但是說實話,你如果真是那種實時數(shù)據(jù)流分析的業(yè)務(wù)和場景,就是僅僅分析一些數(shù)據(jù)報表,丟幾條數(shù)據(jù)影響不大的

會讓你的發(fā)送吞吐量會提升很多,你發(fā)送弄一個batch出,不需要等待人家leader寫成功,直接就可以發(fā)送下一個batch了,吞吐量很大的,哪怕是偶爾丟一點點數(shù)據(jù),實時報表,折線圖,餅圖

acks=all,或者acks=-1:這個leader寫入成功以后,必須等待其他ISR中的副本都寫入成功,才可以返回響應(yīng)說這條消息寫入成功了,此時你會收到一個回調(diào)通知

min.insync.replicas = 2,ISR里必須有2個副本,一個leader和一個follower,最最起碼的一個,不能只有一個leader存活,連一個follower都沒有了

acks = -1,每次寫成功一定是leader和follower都成功才可以算做成功,leader掛了,follower上是一定有這條數(shù)據(jù),不會丟失

retries = Integer.MAX_VALUE,無限重試,如果上述兩個條件不滿足,寫入一直失敗,就會無限次重試,保證說數(shù)據(jù)必須成功的發(fā)送給兩個副本,如果做不到,就不停的重試,除非是面向金融級的場景,面向企業(yè)大客戶,或者是廣告計費,跟錢的計算相關(guān)的場景下,才會通過嚴格配置保證數(shù)據(jù)絕對不丟失

acks=1:只要leader寫入成功,就認為消息成功了,默認給這個其實就比較合適的,還是可能會導(dǎo)致數(shù)據(jù)丟失的,如果剛寫入leader,leader就掛了,此時數(shù)據(jù)必然丟了,其他的follower沒收到數(shù)據(jù)副本,變成leader

9. 針對瞬間異常的消息重試參數(shù)有哪些需要考慮的點

有的時候一些leader切換之類的問題,需要進行重試,設(shè)置retries即可,而且還可以跟消息不丟失結(jié)合起來,但是消息重試會導(dǎo)致重復(fù)發(fā)送的問題,比如說網(wǎng)絡(luò)抖動一下導(dǎo)致他以為沒成功,就重試了,其實人家都成功了

所以消息重試導(dǎo)致的消費重復(fù),需要你在下游consumer做冪等性處理,但是kafka已經(jīng)支持了一次且僅一次的消息語義

另外一個,消息重試是可能導(dǎo)致消息的亂序的,因為可能排在你后面的消息都發(fā)送出去了,你現(xiàn)在收到回調(diào)失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數(shù)設(shè)置為1,這樣可以保證producer同一時間只能發(fā)送一條消息

兩次重試的間隔默認是100毫秒,用“retry.backoff.ms”來進行設(shè)置

一般來說,某臺broker重啟導(dǎo)致的leader切換,是最常見的異常,所以盡可能把重試次數(shù)和間隔,設(shè)置的可以cover住新leader切換過來

10. Kafka Producer高階用法(一):自定義分區(qū)

public class HotDataPartitioner implements Partitioner {

private Random random;

@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}

@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}

}

props.put(“partitioner.class, “com.zhss.HotDataPartitioner”);

測試發(fā)送

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test-topic

11. Kafka Producer高階用法(二):自定義序列化

12. Kafka Producer高階用法(三):自定義攔截器

二、Kafka Consumer選舉與Rebalance實現(xiàn)原理

1. 一張圖畫清Kafka基于Consumer Group的消費者組的模型

每個consumer都要屬于一個consumer.group,就是一個消費組,topic的一個分區(qū)只會分配給一個消費組下的一個consumer來處理,每個consumer可能會分配多個分區(qū),也有可能某個consumer沒有分配到任何分區(qū)

分區(qū)內(nèi)的數(shù)據(jù)是保證順序性的

group.id = “membership-consumer-group”

如果你希望實現(xiàn)一個廣播的效果,你的每臺機器都要消費到所有的數(shù)據(jù),每臺機器啟動的時候,group.id可以是一個隨機生成的UUID也可以,你只要讓不同的機器的KafkaConsumer的group.id是不一樣的

如果consumer group中某個消費者掛了,此時會自動把分配給他的分區(qū)交給其他的消費者,如果他又重啟了,那么又會把一些分區(qū)重新交還給他,這個就是所謂的消費者rebalance的過程

2. 消費者offset的記錄方式以及基于內(nèi)部topic的提交模式

每個consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對每個topic的每個分區(qū)的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高并發(fā)請求zk是不合理的架構(gòu)設(shè)計,zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級的元數(shù)據(jù)存儲,不能負責(zé)高并發(fā)讀寫,作為數(shù)據(jù)存儲

所以后來就是提交offset發(fā)送給內(nèi)部topic:__consumer_offsets,提交過去的時候,key是group.id+topic+分區(qū)號,value就是當前offset的值,每隔一段時間,kafka內(nèi)部會對這個topic進行compact

也就是每個group.id+topic+分區(qū)號就保留最新的那條數(shù)據(jù)即可

而且因為這個__consumer_offsets可能會接收高并發(fā)的請求,所以默認分區(qū)50個,這樣如果你的kafka部署了一個大的集群,比如有50臺機器,就可以用50臺機器來抗offset提交的請求壓力,就好很多
Kafka核心原理第二彈——更新中,kafka,linq,分布式

3. 基于Java API寫一個Kafka Consumer消費消息的代碼示例

String topicName = “test-topic”;
String groupId = “test-group”;

Properties props = new Properties();
props.put(“bootstrap.servers”, “l(fā)ocalhost:9092);
props.put(“group.id”, “groupId”);
props.put(“enable.auto.commit”,true);
props.put(“auto.commit.ineterval.ms”,1000);
// 每次重啟都是從最早的offset開始讀取,不是接著上一次
props.put(“auto.offset.reset”, “earliest”); 
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.SttringDeserializer”);

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));

try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 超時時間
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() +,+ record.key() +,+ record.value());
}
}
} catch(Exception e) {

}

4. Kafka感知消費者故障是通過哪三個參數(shù)來實現(xiàn)的?

heartbeat.interval.ms:consumer心跳時間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會通過心跳下發(fā)rebalance的指令給其他的consumer通知他們進行rebalance的操作

session.timeout.ms:kafka多長時間感知不到一個consumer就認為他故障了,默認是10秒

max.poll.interval.ms:如果在兩次poll操作之間,超過了這個時間,那么就會認為這個consume處理能力太弱了,會被踢出消費組,分區(qū)分配給別人去消費,一遍來說結(jié)合你自己的業(yè)務(wù)處理的性能來設(shè)置就可以了

5. 對消息進行消費時有哪幾個參數(shù)需要注意以及設(shè)置呢?

fetch.max.bytes:獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些

max.poll.records:一次poll返回消息的最大條數(shù),默認是500條

connection.max.idle.ms:consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設(shè)置為-1,不要去回收

6. 消費者offset相關(guān)的參數(shù)設(shè)置會對運行產(chǎn)生什么樣的影響?

auto.offset.reset:這個參數(shù)的意思是,如果下次重啟,發(fā)現(xiàn)要消費的offset不在分區(qū)的范圍內(nèi),就會重頭開始消費;但是如果正常情況下會接著上次的offset繼續(xù)消費的

enable.auto.commit:這個就是開啟自動提交位移

7. Group Coordinator是什么以及主要負責(zé)什么?

每個consumer group都會選擇一個broker作為自己的coordinator,他是負責(zé)監(jiān)控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance的,那么這個如何選擇呢?

就是根據(jù)group.id來進行選擇,他有內(nèi)部的一個選擇機制,會給你挑選一個對應(yīng)的Broker,總會把你的各個消費組均勻分配給各個Broker作為coordinator來進行管理的

他負責(zé)的事情只要就是rebalance,說白了你的consumer group中的每個consumer剛剛啟動就會跟選舉出來的這個consumer group對應(yīng)的coordinator所在的broker進行通信,然后由coordinator分配分區(qū)給你的這個consumer來進行消費

coordinator會盡可能均勻的分配分區(qū)給各個consumer來消費

8. 為消費者選擇Coordinator的算法是如何實現(xiàn)的?

首先對groupId進行hash,接著對__consumer_offsets的分區(qū)數(shù)量取模,默認是50,可以通過offsets.topic.num.partitions來設(shè)置,找到你的這個consumer group的offset要提交到__consumer_offsets的哪個分區(qū)

比如說:groupId,“membership-consumer-group” -> hash值(數(shù)字)-> 對50取模 -> 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區(qū)去提交offset,大家可以找到__consumer_offsets的一個分區(qū)

__consumer_offset的分區(qū)的副本數(shù)量默認來說1,只有一個leader

然后對這個分區(qū)找到對應(yīng)的leader所在的broker,這個broker就是這個consumer group的coordinator了,接著就會維護一個Socket連接跟這個Broker進行通信

9. Coordinator和Consume Leader如何協(xié)作制定分區(qū)方案?

每個consumer都發(fā)送JoinGroup請求到Coordinator,然后Coordinator從一個consumer group中選擇一個consumer作為leader,把consumer group情況發(fā)送給這個leader,接著這個leader會負責(zé)制定分區(qū)方案,通過SyncGroup發(fā)給Coordinator

接著Coordinator就把分區(qū)方案下發(fā)給各個consumer,他們會從指定的分區(qū)的leader broker開始進行socket連接以及消費消息
Kafka核心原理第二彈——更新中,kafka,linq,分布式

10. rebalance的三種策略分別有哪些優(yōu)劣勢?

這里有三種rebalance的策略:range、round-robin、sticky

0~8

order-topic-0
order-topic-1
order-topic-2

range策略就是按照partiton的序號范圍,比如partitioin02給一個consumer,partition35給一個consumer,partition6~8給一個consumer,默認就是這個策略;

round-robin策略,就是輪詢分配,比如partiton0、3、6給一個consumer,partition1、4、7給一個consumer,partition2、5、8給一個consumer

但是上述的問題就在于說,可能在rebalance的時候會導(dǎo)致分區(qū)被頻繁的重新分配,比如說掛了一個consumer,然后就會導(dǎo)致partition04分配給第一個consumer,partition58分配給第二個consumer

這樣的話,原本是第二個consumer消費的partition3~4就給了第一個consumer,實際上來說未必就很好

最新的一個sticky策略,就是說盡可能保證在rebalance的時候,讓原本屬于這個consumer的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過去,這樣盡可能維持原來的分區(qū)分配的策略

consumer1:0~2 + 6~7
consumer2:3~5 + 8

11. Consumer內(nèi)部單線程處理一切事務(wù)的核心設(shè)計思想

其實就是在一個while循環(huán)里不停的去調(diào)用poll()方法,其實是我們自己的一個線程,就是我們自己的這個線程就是唯一的KafkaConsumer的工作線程,新版本的kafka api,簡化,減少了線程數(shù)量

Consumer自己內(nèi)部就一個后臺線程,定時發(fā)送心跳給broker;但是其實負責(zé)進行拉取消息、緩存消息、在內(nèi)存里更新offset、每隔一段時間提交offset、執(zhí)行rebalance這些任務(wù)的就一個線程,其實就是我們調(diào)用Consumer.poll()方法的那個線程

就一個線程調(diào)用進去,會負責(zé)把所有的事情都干了

為什么叫做poll呢?因為就是你可以監(jiān)聽N多個Topic的消息,此時會跟集群里很多Kafka Broker維護一個Socket連接,然后每一次線程調(diào)用poll(),就會監(jiān)聽多個socket是否有消息傳遞過來

可能一個consumer會消費很多個partition,每個partition其實都是leader可能在不同的broker上,那么如果consumer要拉取多個partition的數(shù)據(jù),就需要跟多個broker進行通信,維護socket

每個socket就會跟一個broker進行通信

每個Consumer內(nèi)部會維護多個Socket,負責(zé)跟多個Broker進行通信,我們就一個工作線程每次調(diào)用poll()的時候,他其實會監(jiān)聽多個socket跟broker的通信,是否有新的數(shù)據(jù)可以去拉取
Kafka核心原理第二彈——更新中,kafka,linq,分布式

12. 消費過程中的各種offset之間的關(guān)系是什么?

上一次提交offset,當前offset(還未提交),高水位offset,LEO

內(nèi)存里記錄這么幾個東西:上一次提交offset,當前消費到的offset,你不斷的在消費消息,不停的在拉取新的消息,不停的更新當前消費的offset,HW offset,你拉取的時候,是只能看到HW他前面的數(shù)據(jù)

LEO,leader partition已經(jīng)更新到了一個offset了,但是HW在前面,你只能拉取到HW的數(shù)據(jù),HW后面的數(shù)據(jù),意味著不是所有的follower都寫入進去了,所以不能去讀取的

13. 自動提交offset的語義以及導(dǎo)致消息丟失和重復(fù)消費的問題

默認是自動提交

auto.commit.inetrval.ms:5000,默認是5秒提交一次

如果你提交了消費到的offset之后,人家kafka broker就可以感知到了,比如你消費到了offset = 56987,下次你的consumer再次重啟的時候,就會自動從kafka broker感知到說自己上一次消費到的offset = 56987

這次重啟之后,就繼續(xù)從offset = 56987這個位置繼續(xù)往后去消費就可以了

他的語義是一旦消息給你poll到了之后,這些消息就認為處理完了,后續(xù)就可以提交了,所以這里有兩種問題:

第一,消息丟失,如果你剛poll到消息,然后還沒來得及處理,結(jié)果人家已經(jīng)提交你的offset了,此時你如果consumer宕機,再次重啟,數(shù)據(jù)丟失,因為上一次消費的那批數(shù)據(jù)其實你沒處理,結(jié)果人家認為你處理了

poll到了一批數(shù)據(jù),offset = 65510~65532,人家剛好就是到了時間提交了offset,offset = 65532這個地方已經(jīng)提交給了kafka broker,接著你準備對這批數(shù)據(jù)進行消費,但是不巧的是,你剛要消費就直接宕機了

其實你消費到的數(shù)據(jù)是沒處理的,但是消費offset已經(jīng)提交給kafka了,下次你重啟的時候,offset = 65533這個位置開始消費的,之前的一批數(shù)據(jù)就丟失了

第二,重復(fù)消費,如果你poll到消息,都處理完畢了,此時還沒來得及提交offset,你的consumer就宕機了,再次重啟會重新消費到這一批消息,再次處理一遍,那么就是有消息重復(fù)消費的問題

poll到了一批數(shù)據(jù),offset = 65510~65532,你很快的處理完了,都寫入數(shù)據(jù)庫了,結(jié)果還沒來得及提交offset就宕機了,上一次提交的offset = 65509,重啟,他會再次讓你消費offset = 65510~65532,一樣的數(shù)據(jù)再次重復(fù)消費了一遍,寫入數(shù)據(jù)庫

重啟kafka consumer,修改了他的代碼

14. 如何實現(xiàn)Consumer Group的狀態(tài)機流轉(zhuǎn)機制?

剛開始Consumer Group狀態(tài)是:Empty

接著如果部分consumer發(fā)送了JoinGroup請求,會進入:PreparingRebalance的狀態(tài),等待一段時間其他成員加入,這個時間現(xiàn)在默認就是max.poll.interval.ms來指定的,所以這個時間間隔一般可以稍微大一點

接著如果所有成員都加入組了,就會進入AwaitingSync狀態(tài),這個時候就不能允許任何一個consumer提交offset了,因為馬上要rebalance了,進行重新分配了,這個時候就會選擇一個leader consumer,由他來制定分區(qū)方案

然后leader consumer制定好了分區(qū)方案,SyncGroup請求發(fā)送給coordinator,他再下發(fā)方案給所有的consumer成員,此時進入stable狀態(tài),都可以正?;趐oll來消費了

所以如果說在stable狀態(tài)下,有consumer進入組或者離開崩潰了,那么都會重新進入PreparingRebalance狀態(tài),重新看看當前組里有誰,如果剩下的組員都在,那么就進入AwaitingSync狀態(tài)

leader consumer重新制定方案,然后再下發(fā)

15. 最新設(shè)計的rebalance分代機制可以有什么作用?

大家設(shè)想一個場景,在rebalance的時候,可能你本來消費了partition3的數(shù)據(jù),結(jié)果有些數(shù)據(jù)消費了還沒提交offset,結(jié)果此時rebalance,把partition3分配給了另外一個cnosumer了,此時你如果提交partition3的數(shù)據(jù)的offset,能行嗎?

必然不行,所以每次rebalance會觸發(fā)一次consumer group generation,分代,每次分代會加1,然后你提交上一個分代的offset是不行的,那個partiton可能已經(jīng)不屬于你了,大家全部按照新的partiton分配方案重新消費數(shù)據(jù)

consumer group generation = 1
consumer group generation = 2

16. Consumer端的自定義反序列化器是什么?

17. 自行指定每個Consumer要消費哪些分區(qū)有用嗎?

List partitions = consumer.partitionsFor(“order-topic”);

new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

consumer.assign(partitions); //指定每個consumer要消費哪些分區(qū),你就不是依靠consumer的自動的分區(qū)分配方案來做了

18. 老版本的high-level consumer的實現(xiàn)原理是什么?

producer和consumer api原理,都是新版本的kafka api

老版本的kafka consumer api分成兩種,high-level和low-level,都是基于zk實現(xiàn)的,只不過前者有consumer group的概念,后者沒有

high-level的api,比如說consumer啟動就是在zk里寫一個臨時節(jié)點,但是如果自己宕機了,那么zk臨時節(jié)點就沒了,別人就會發(fā)現(xiàn),然后就會開啟rebalance

然后在消費的時候,可以指定多個線程取消費一個topic,比如說你和這個consumer分配到了5個分區(qū),那么你可以指定最多5個線程,每個線程消費一個分區(qū)的數(shù)據(jù),但是新版本的就一個線程負責(zé)消費所有分區(qū)

在提交offset,就是向zk寫入對某個分區(qū)現(xiàn)在消費到了哪個offset了,默認60秒才提交一次

新版本的api就不基于zk來實現(xiàn)了呢,zk主要是做輕量級的分布式協(xié)調(diào),元數(shù)據(jù)存儲,并不適合高并發(fā)大量連接的場景,cnosumer可能有成百上千個,成千上萬個,zk來做的,連接的壓力,高并發(fā)的讀寫

broker內(nèi)部基于zk來進行協(xié)調(diào)

19. 老版本的low-level consumer的實現(xiàn)原理是什么?

老版本的low-level消費者,是可以自己控制offset的,實現(xiàn)很底層的一些控制,但是需要自己去提交offset,還要自己找到某個分區(qū)對應(yīng)的leader broker,跟他進行連接獲取消息,如果leader變化了,也得自己處理,非常的麻煩

比如說storm-kafka這個插件,在storm消費kafka數(shù)據(jù)的時候,就是使用的low-level api,自己獲取offset,提交寫入zk中自己指定的znode中,但是在未來基本上老版本的會越來越少使用

三、Kafka的時間輪延時調(diào)度機制與架構(gòu)原理總結(jié)

1. Producer的緩沖區(qū)內(nèi)部數(shù)據(jù)結(jié)構(gòu)是什么樣子的?

producer會創(chuàng)建一個accumulator緩沖區(qū),他里面是一個HashMap數(shù)據(jù)結(jié)構(gòu),每個分區(qū)都會對應(yīng)一個batch隊列,因為你打包成出來的batch,那必須是這個batch都是發(fā)往同一個分區(qū)的,這樣才能發(fā)送一個batch到這個分區(qū)的leader broker

{
“order-topic-0” -> [batch1, batch2],
“order-topic-1” -> [batch3]
}

batch.size

每個batch包含三個東西,一個是compressor,這是負責(zé)追加寫入batch的組件;第二個是batch緩沖區(qū),就是寫入數(shù)據(jù)的地方;第三個是thunks,就是每個消息都有一個回調(diào)Callback匿名內(nèi)部類的對象,對應(yīng)batch里每個消息的回調(diào)函數(shù)

每次寫入一條數(shù)據(jù)都對應(yīng)一個Callback回調(diào)函數(shù)的

2. 消息緩沖區(qū)滿的時候是阻塞住還是拋出異常?

max.block.ms,其實就是說如果寫緩沖區(qū)滿了,此時是阻塞住一段時間,然后什么時候拋異常,默認是60000,也就是60秒

3. 負責(zé)IO請求的Sender線程是如何基于緩沖區(qū)發(fā)送數(shù)據(jù)的?

Sender線程會不停的輪詢緩沖區(qū)內(nèi)的HashMap,看batch是否滿了,或者是看linger.ms時間是不是到了,然后就得發(fā)送數(shù)據(jù)去,發(fā)送的時候會根據(jù)各個batch的目標leader broker來進行分組

因為可能不同的batch是對應(yīng)不同的分區(qū),但是不同的分區(qū)的Leader是在一個broker上的,<Node, List>,接著會進一步封裝為<Node, Request>,每個broker一次就是一個請求,但是這里可能包含很多個batch,接著就是將分組好的batch發(fā)送給leader broker,并且處理response,來反過來調(diào)用每個batch的callback函數(shù)

發(fā)送出去的Request會被放入InFlighRequests里面去保存,Map<NodeId, Deque>,這里就代表了發(fā)送出去的請求,但是還沒接收到響應(yīng)的

4. 同時可以接受有幾個發(fā)送到Broker的請求沒收到響應(yīng)?

Map<NodeId, Deque> => 給這個broker發(fā)送了哪些請求過去了?

max.in.flight.requests.per.connection:5

這個參數(shù)默認值是5,默認情況下,每個Broker最多只能有5個請求是發(fā)送出去但是還沒接收到響應(yīng)的,所以這種情況下是有可能導(dǎo)致順序錯亂的,大家一定要搞清楚這一點,先發(fā)送的請求可能后續(xù)要重發(fā)

5. Kafka自定義的基于TCP的二進制協(xié)議深入探秘一番(一)

kafka自定義了一組二進制的協(xié)議,現(xiàn)在一共是包含了43種協(xié)議類型,每種協(xié)議都有對應(yīng)的請求和響應(yīng),Request和Response,其實說白了,如果大家現(xiàn)在看咱們的那個自研分布式存儲系統(tǒng)的課,里面用到了gRPC

你大概可以認為就是定義了43種接口,每個接口就是一種協(xié)議,然后每個接口都有自己對應(yīng)的Request和Response,就這個意思

每個協(xié)議的Request都有相同的請求頭(RequestHeader),也有不同的請求體(RequestBody),請求頭包含了:api_key、api_version、correlation_id、client_id,這里的api_key就類似于“PRODUCE”、“FETCH”,你可以認為是接口的名字吧

“PRODUCE”就是發(fā)送消息的接口,“FETCH”就是拉取消息的接口,就這個意思

api_version,就是這個API的版本號

correlation_id,就是類似客戶端生成的一次請求的唯一標志位,唯一標識一次請求

client_id,就是客戶端的id

每個協(xié)議的Response也有相同的響應(yīng)頭,就是一個correlation_id,就是對某個請求的響應(yīng)

6. Kafka自定義的基于TCP的二進制協(xié)議深入探秘一番(二)

比如說發(fā)送消息,就是ProduceRequest和ProduceResponse,代表“PRODUCE”這個接口的請求和響應(yīng),api_key=0,其實就是“PRODUCE”接口的代表

他的RequestBody,包含了:transactional_id,acks,timeout,topic_data(topic,data(partition,record_set)),acks就是客戶端自己指定的acks參數(shù),這個會指示leader和follower副本的寫入方式,timeout就是超時時間,默認就是30秒,request.timeout.ms

然后就是要寫入哪個topic,哪個分區(qū),以及對應(yīng)數(shù)據(jù)集合,里面是多個batch

ProduceResponse,ResponseBody,包含了responses(topic,partition_responses(partition,error_code,base_offset,log_append_time,log_start_offset)),throttle_time_ms,簡單來說就是當前響應(yīng)是對哪個topic寫入的響應(yīng)

包含了每個topic的各個分區(qū)的響應(yīng),每個partition的寫入響應(yīng),包括error_code錯誤碼,base_offset是消息集合的起始offset,log_append_time是寫入broker端的時間,log_start_offset是分區(qū)的起始offset

其實各種接口大體上來說就是如此,所以現(xiàn)在大家就知道了,協(xié)議就是一種規(guī)定,你發(fā)送過來的請求是什么格式的,他可能有請求頭還有請求體,分別包含哪些字段,按什么格式放數(shù)據(jù),響應(yīng)也是一樣的

然后大家就可以按一樣的協(xié)議來發(fā)送請求和接收響應(yīng)

7. 盤點一下在Broker內(nèi)部有哪些不同場景下會有延時任務(wù)?

比如說acks=-1,那么必須等待leader和follower都寫完才能返回響應(yīng),而且有一個超時時間,默認是30秒,也就是request.timeout.ms,那么在寫入一條數(shù)據(jù)到leader磁盤之后,就必須有一個延時任務(wù),到期時間是30秒

延時任務(wù)會被放到DelayedOperationPurgatory,延時操作管理器中

這個延時任務(wù)如果因為所有follower都寫入副本到本地磁盤了,那么就會被自動觸發(fā)蘇醒,那么就可以返回響應(yīng)結(jié)果給客戶端了,否則的話,這個延時任務(wù)自己指定了最多是30秒到期,如果到了超時時間都沒等到,那么就直接超時返回異常了

還有一種是延時拉取任務(wù),也就是說follower往leader拉取消息的時候,如果發(fā)現(xiàn)是空的,那么此時會創(chuàng)建一個延時拉取任務(wù),然后延時時間到了之后,就會再次讀取一次消息,如果過程中l(wèi)eader寫入了消息那么也會自動執(zhí)行這個拉取任務(wù)

8. Kafka的時間輪延時調(diào)度機制(一):O(1)時間復(fù)雜度

Kafka內(nèi)部有很多延時任務(wù),沒有基于JDK Timer來實現(xiàn),那個插入和刪除任務(wù)的時間復(fù)雜度是O(nlogn),而是基于了自己寫的時間輪來實現(xiàn)的,時間復(fù)雜度是O(1),其實Netty、ZooKeeper、Quartz很多中間件都會實現(xiàn)時間輪

延時任務(wù)是很多很多的,大量的發(fā)送消息以及拉取消息,都會涉及到延時任務(wù),任務(wù)數(shù)量很多,如果基于傳統(tǒng)的JDK Timer把大量的延時任務(wù)頻繁的插入和刪除,時間復(fù)雜度是O(nlogn)性能比較低的

時間輪的機制,延時任務(wù)插入和刪除,O(1)

簡單來說,一個時間輪(TimerWheel)就是一個數(shù)組實現(xiàn)的存放定時任務(wù)的環(huán)形隊列,數(shù)組每個元素都是一個定時任務(wù)列表(TimerTaskList),這個TimerTaskList是一個環(huán)形雙向鏈表,鏈表里的每個元素都是定時任務(wù)(TimerTask)

時間輪是有很多個時間格的,一個時間格就是時間輪的時間跨度tickMs,wheelSize就是時間格的數(shù)量,時間輪的總時間跨度就是tickMs * wheelSize(interval),然后還有一個表盤指針(currentTime),就是時間輪當前所處的時間

currentTime指向的時間格就是到期,需要執(zhí)行里面的定時任務(wù)

比如說tickMs = 1ms,wheelSize = 20,那么時間輪跨度(inetrval)就是20ms,剛開始currentTime = 0,這個時候如果有一個延時2ms之后執(zhí)行的任務(wù)插入進來,就會基于數(shù)組的index直接定位到時間輪底層數(shù)組的第三個元素

因為tickMs = 1ms,所以第一個元素代表的是0ms,第二個元素代表的是1ms的地方,第三個元素代表的就是2ms的地方,直接基于數(shù)組來定位就是O(1)是吧,然后到數(shù)組之后把這個任務(wù)插入其中的雙向鏈表,這個時間復(fù)雜度也是O(1)

所以這個插入定時任務(wù)的時間復(fù)雜度就是O(1)

然后currentTime會隨著時間不斷的推移,1ms之后會指向第二個時間格,2ms之后會指向第三個時間格,這個時候就會執(zhí)行第三個時間格里剛才插入進來要在2ms之后執(zhí)行的那個任務(wù)了

這個時候如果插入進來一個8ms之后要執(zhí)行的任務(wù),那么就會放到第11個時間格上去,相比于currentTime剛好是8ms之后,對吧,就是個意思,然后如果是插入一個19ms之后執(zhí)行的呢?那就會放在第二個時間格

每個插入進來的任務(wù),他都會依據(jù)當前的currentTime來放,最后正好要讓currentTime轉(zhuǎn)動這么多時間之后,正好可以執(zhí)行那個時間格里的任務(wù)

9. Kafka的時間輪延時調(diào)度機制(二):多層級時間輪

接著上一講的內(nèi)容,那如果這個時候來一個350毫秒之后執(zhí)行的定時任務(wù)呢?已經(jīng)超出當前這個時間輪的范圍了,那么就放到上層時間輪,上層時間輪的tickMs就是下層時間輪的interval,也就是20ms

wheelSize是固定的,都是20,那么上層時間輪的inetrval周期就是400ms,如果再上一層的時間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可

反正有很多層級的時間輪,一個時間輪不夠,就往上開辟一個新的時間輪出來,每個時間輪的tickMs是下級時間輪的interval,而且currentTime就跟時鐘的指針一樣是不停的轉(zhuǎn)動的,你只要根據(jù)定時周期把他放入對應(yīng)的輪子即可

每個輪子插入的時候根據(jù)currentTime,放到對應(yīng)時間之后的時間格即可

比如定時350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時間輪內(nèi),currentTime自然會轉(zhuǎn)動到那個時間格來執(zhí)行他

10. Kafka的時間輪延時調(diào)度機制(三):時間輪層級的下滑

接著上一講的內(nèi)容,那如果這個時候來一個350毫秒之后執(zhí)行的定時任務(wù)呢?已經(jīng)超出當前這個時間輪的范圍了,那么就放到上層時間輪,上層時間輪的tickMs就是下層時間輪的interval,也就是20ms

wheelSize是固定的,都是20,那么上層時間輪的inetrval周期就是400ms,如果再上一層的時間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可

反正有很多層級的時間輪,一個時間輪不夠,就往上開辟一個新的時間輪出來,每個時間輪的tickMs是下級時間輪的interval,而且currentTime就跟時鐘的指針一樣是不停的轉(zhuǎn)動的,你只要根據(jù)定時周期把他放入對應(yīng)的輪子即可

每個輪子插入的時候根據(jù)currentTime,放到對應(yīng)時間之后的時間格即可

比如定時350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時間輪內(nèi),currentTime自然會轉(zhuǎn)動到那個時間格來執(zhí)行他

11. Kafka的時間輪延時調(diào)度機制(四):基于DelayQueue推動

基于數(shù)組和雙向鏈表來O(1)時間度可以插入任務(wù)

但是推進時間輪怎么做呢?搞一個線程不停的空循環(huán)判斷是否進入下一個時間格嗎?那樣很浪費CPU資源,所以采取的是DelayQueue

每個時間輪里的TimerTaskList作為這個時間格的任務(wù)列表,都會插入DelayQueue中,設(shè)置一個延時出隊時間,DelayQueue會自動把過期時間最短的排在隊頭,然后專門有一個線程來從DelayQueue里獲取到期任務(wù)列表

某個時間格對應(yīng)的TimerTaskList到期之后,就會被線程獲取到,這種方式就可以實現(xiàn)時間輪推進的效果,推進時間輪基于DelayQueue,時間復(fù)雜度也是O(1),因為只要從隊頭獲取即可
Kafka核心原理第二彈——更新中,kafka,linq,分布式
Kafka核心原理第二彈——更新中,kafka,linq,分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-698776.html

到了這里,關(guān)于Kafka核心原理第二彈——更新中的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka核心原理之精準一次性投遞

    Kafka核心原理之精準一次性投遞

    在Kafka中,精準一次性投遞(Exactly Once)=至少投遞一次(At Least Once)+冪等性。 至少投遞一次(At Least Once):將生產(chǎn)端參數(shù)acks設(shè)置為-1(all),可以保證生產(chǎn)端發(fā)送到Broker的消息不會丟失,即:至少投遞一次(At Least Once)。 冪等性: 冪等生產(chǎn)者冪保證單分區(qū)單會話內(nèi)精準一

    2024年04月25日
    瀏覽(27)
  • Kafka第二課-代碼實戰(zhàn)、參數(shù)配置詳解、設(shè)計原理詳解

    Kafka第二課-代碼實戰(zhàn)、參數(shù)配置詳解、設(shè)計原理詳解

    引入依賴 生產(chǎn)者代碼以及參數(shù)詳解 消費者代碼以及參數(shù)詳解 實體類 引入基本依賴 配置application.yml 當配置ack-mode: MANUAL_IMMEDIATE時,需要手動在消費者提交offset,否則會一直重復(fù)消費 消費者 測試,訪問生產(chǎn)者控制層,可以自動監(jiān)聽到消費者 Kafka核心總控制器Controller 在Kafka集

    2024年02月16日
    瀏覽(21)
  • Kafka核心設(shè)計與實踐原理:設(shè)計理念、基本概念、主要功能與應(yīng)用場景

    Kafka核心設(shè)計與實踐原理:設(shè)計理念、基本概念、主要功能與應(yīng)用場景

    詳細介紹Kafka作為分布式流式處理平臺的設(shè)計理念、基本概念,以及其主要功能與應(yīng)用場景,包括消息系統(tǒng)、容錯的持久化、流式處理平臺等功能,同時探討如何保證消息的唯一性、消費順序等問題。

    2024年02月22日
    瀏覽(19)
  • 【Kafka專題】Kafka收發(fā)消息核心參數(shù)詳解

    【Kafka專題】Kafka收發(fā)消息核心參數(shù)詳解

    在Kafka中,對于客戶端和服務(wù)端的定義如下: 客戶端Client: 包括消息生產(chǎn)者 和 消息消費者 服務(wù)端:即Broker,Broker:一個Kafka服務(wù)器就是一個Broker 首先,先引入客戶端maven依賴 1.1 消息發(fā)送者源碼示例 然后可以使用Kafka提供的客戶端——Producer類,快速發(fā)送消息。 看上面的源

    2024年02月03日
    瀏覽(16)
  • 【分布式應(yīng)用】kafka集群、Filebeat+Kafka+ELK搭建

    【分布式應(yīng)用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并發(fā)環(huán)境下,同步請求來不及處理,請求往往會發(fā)生阻塞。比如大量的請求并發(fā)訪問數(shù)據(jù)庫,導(dǎo)致行鎖表鎖,最后請求線程會堆積過多,從而觸發(fā) too many connection 錯誤,引發(fā)雪崩效應(yīng)。 我們使用消息隊列,通過異步處理請求,從而緩解系統(tǒng)的壓力。消息隊

    2024年02月16日
    瀏覽(96)
  • 分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    01. Kafka 分區(qū)位移 對于Kafka中的分區(qū)而言,它的每條消息都有唯一的offset,用來表示消息在分區(qū)中對應(yīng)的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。 每條消息在分區(qū)中的位置信息由一個叫位移(Offset)的數(shù)據(jù)來表征。分區(qū)位移總是從 0 開始,假設(shè)一

    2024年02月12日
    瀏覽(27)
  • Kafka 知識點學(xué)習(xí)【Kafka 學(xué)習(xí)之24問-第二十四刊】

    Kafka 知識點學(xué)習(xí)【Kafka 學(xué)習(xí)之24問-第二十四刊】

    ??作者簡介,普修羅雙戰(zhàn)士,一直追求不斷學(xué)習(xí)和成長,在技術(shù)的道路上持續(xù)探索和實踐。 ??多年互聯(lián)網(wǎng)行業(yè)從業(yè)經(jīng)驗,歷任核心研發(fā)工程師,項目技術(shù)負責(zé)人。 ??歡迎 ??點贊?評論?收藏 Kafka知識專欄學(xué)習(xí) Kafka知識云集 訪問地址 備注 Kafka知識點(1) https://blog.csdn.net/m

    2024年02月05日
    瀏覽(46)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    生產(chǎn)者發(fā)送消息流程參考圖1: 先從創(chuàng)建一個ProducerRecord對象開始,其中需要包含目標主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時間戳或標頭。在發(fā)送ProducerRecord對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 接下來,如果沒有顯式

    2024年02月13日
    瀏覽(28)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負責(zé)將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包