国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka:消費(fèi)者參數(shù)配置

這篇具有很好參考價(jià)值的文章主要介紹了Kafka:消費(fèi)者參數(shù)配置。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

maven配置

// 消費(fèi)者
Properties properties = new Properties();
// 連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 組id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

springboot配置類(lèi)

@Configuration
public class KafkaConfig {
 
    // 配置全局admin
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers","192.168.25.129:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }
}

配置文件

# 用于建立初始連接的broker地址
spring.kafka.bootstrap-servers=192.168.25.129:9092
# producer用到的key和value的序列化類(lèi)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默認(rèn)的批處理記錄數(shù) 16k
spring.kafka.producer.batch-size=16384
# 32MB的總發(fā)送緩存
spring.kafka.producer.buffer-memory=33554432
# consumer用到的key和value的反序列化類(lèi)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消費(fèi)組id
spring.kafka.consumer.group-id=spring-kafka-consumer-group
# 是否自動(dòng)提交消費(fèi)者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果該消費(fèi)者的偏移量不存在,則自動(dòng)設(shè)置為最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest

參數(shù)配置列表文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-492598.html

屬性 說(shuō)明
bootstrap.servers 向Kafka集群建立初始連接用到的host/port列表。 客戶端會(huì)使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管 是否指定了哪個(gè)服務(wù)器用作引導(dǎo)。 這個(gè)列表僅影響用來(lái)發(fā)現(xiàn)集群所有服務(wù)器的初始主機(jī)。 字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接,然后發(fā)現(xiàn)集群中的所有服務(wù)器,因 此沒(méi)有必要將集群中的所有地址寫(xiě)在這里。 一般最好兩臺(tái),以防其中一臺(tái)宕掉。
key.deserializer key的反序列化類(lèi),該類(lèi)需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口。
value.deserializer 實(shí)現(xiàn)了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器, 用于對(duì)消息的value進(jìn)行反序列化。
client.id 當(dāng)從服務(wù)器消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上 提供應(yīng)用的邏輯名稱(chēng),記錄在服務(wù)端的請(qǐng)求日志中,用于追蹤請(qǐng)求的源。
group.id 用于唯一標(biāo)志當(dāng)前消費(fèi)者所屬的消費(fèi)組的字符串。 如果消費(fèi)者使用組管理功能如subscribe(topic)或使用基于Kafka的偏移量 管理策略,該項(xiàng)必須設(shè)置。
auto.offset.reset 當(dāng)Kafka中沒(méi)有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被 刪除了),該如何處理? earliest:自動(dòng)重置偏移量到最早的偏移量 latest:自動(dòng)重置偏移量為最新的偏移量 none:如果消費(fèi)組原來(lái)的(previous)偏移量不存在,則向消費(fèi)者拋異 常 anything:向消費(fèi)者拋異常
auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為true, 則該值定義了消費(fèi)者偏移量向Kafka提交的頻率。
enable.auto.commit 如果設(shè)置為true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。
fetch.min.bytes 服務(wù)器對(duì)每個(gè)拉取消息的請(qǐng)求返回的數(shù)據(jù)量最小值。 如果數(shù)據(jù)量達(dá)不到這個(gè)值,請(qǐng)求等待,以讓更多的數(shù)據(jù)累積, 達(dá)到這個(gè)值之后響應(yīng)請(qǐng)求。 默認(rèn)設(shè)置是1個(gè)字節(jié),表示只要有一個(gè)字節(jié)的數(shù)據(jù), 就立即響應(yīng)請(qǐng)求,或者在沒(méi)有數(shù)據(jù)的時(shí)候請(qǐng)求超時(shí)。 將該值設(shè)置為大一點(diǎn)兒的數(shù)字,會(huì)讓服務(wù)器等待稍微 長(zhǎng)一點(diǎn)兒的時(shí)間以累積數(shù)據(jù)。 如此則可以提高服務(wù)器的吞吐量,代價(jià)是額外的延遲時(shí)間。
fetch.max.wait.ms 如果服務(wù)器端的數(shù)據(jù)量達(dá)不到 fetch.min.bytes 的話, 服務(wù)器端不能立即響應(yīng)請(qǐng)求。 該時(shí)間用于配置服務(wù)器端阻塞請(qǐng)求的最大時(shí)長(zhǎng)。
fetch.max.bytes 服務(wù)器給單個(gè)拉取請(qǐng)求返回的最大數(shù)據(jù)量。 消費(fèi)者批量拉取消息,如果第一個(gè)非空消息批次的值比該值大, 消息批也會(huì)返回,以讓消費(fèi)者可以接著進(jìn)行。 即該配置并不是絕對(duì)的最大值。 broker可以接收的消息批最大值通過(guò) message.max.bytes (broker配置) 或 max.message.bytes (主題配置)來(lái)指定。 需要注意的是,消費(fèi)者一般會(huì)并發(fā)拉取請(qǐng)求。
connections.max.idle.ms 在這個(gè)時(shí)間之后關(guān)閉空閑的連接。
check.crcs 自動(dòng)計(jì)算被消費(fèi)的消息的CRC32校驗(yàn)值。 可以確保在傳輸過(guò)程中或磁盤(pán)存儲(chǔ)過(guò)程中消息沒(méi)有被破壞。 它會(huì)增加額外的負(fù)載,在追求極致性能的場(chǎng)合禁用。
exclude.internal.topics 是否內(nèi)部主題應(yīng)該暴露給消費(fèi)者。如果該條目設(shè)置為true, 則只能先訂閱再拉取。
isolation.level 控制如何讀取事務(wù)消息。 如果設(shè)置了 read_committed ,消費(fèi)者的poll()方法只會(huì) 返回已經(jīng)提交的事務(wù)消息。 如果設(shè)置了 read_uncommitted (默認(rèn)值), 消費(fèi)者的poll方法返回所有的消息,即使是已經(jīng)取消的事務(wù)消息。 非事務(wù)消息以上兩種情況都返回。 消息總是以偏移量的順序返回。 read_committed 只能返回到達(dá)LSO的消息。 在LSO之后出現(xiàn)的消息只能等待相關(guān)的事務(wù)提交之后才能看到。 結(jié)果, read_committed 模式,如果有為提交的事務(wù), 消費(fèi)者不能讀取到直到HW的消息。 read_committed 的seekToEnd方法返回LSO。
heartbeat.interval.ms 當(dāng)使用消費(fèi)組的時(shí)候,該條目指定消費(fèi)者向消費(fèi)者協(xié)調(diào)器 發(fā)送心跳的時(shí)間間隔。 心跳是為了確保消費(fèi)者會(huì)話的活躍狀態(tài), 同時(shí)在消費(fèi)者加入或離開(kāi)消費(fèi)組的時(shí)候方便進(jìn)行再平衡。 該條目的值必須小于 session.timeout.ms ,也不應(yīng)該高于 session.timeout.ms 的1/3。 可以將其調(diào)整得更小,以控制正常重新平衡的預(yù)期時(shí)間。
session.timeout.ms 當(dāng)使用Kafka的消費(fèi)組的時(shí)候,消費(fèi)者周期性地向broker發(fā)送心跳數(shù) 表明自己的存在。 如果經(jīng)過(guò)該超時(shí)時(shí)間還沒(méi)有收到消費(fèi)者的心跳, 則broker將消費(fèi)者從消費(fèi)組移除,并啟動(dòng)再平衡。 該值必須在broker配置 group.min.session.timeout.ms 和 group.max.session.timeout.ms 之間。
max.poll.records 一次調(diào)用poll()方法返回的記錄最大數(shù)量。
max.poll.interval.ms 使用消費(fèi)組的時(shí)候調(diào)用poll()方法的時(shí)間間隔。 該條目指定了消費(fèi)者調(diào)用poll()方法的最大時(shí)間間隔。 如果在此時(shí)間內(nèi)消費(fèi)者沒(méi)有調(diào)用poll()方法, 則broker認(rèn)為消費(fèi)者失敗,觸發(fā)再平衡, 將分區(qū)分配給消費(fèi)組中其他消費(fèi)者。
max.partition.fetch.bytes 對(duì)每個(gè)分區(qū),服務(wù)器返回的最大數(shù)量。消費(fèi)者按批次拉取數(shù)據(jù)。 如果非空分區(qū)的第一個(gè)記錄大于這個(gè)值,批處理依然可以返回, 以保證消費(fèi)者可以進(jìn)行下去。 broker接收批的大小由 message.max.bytes (broker參數(shù))或 max.message.bytes (主題參數(shù))指定。 fetch.max.bytes 用于限制消費(fèi)者單次請(qǐng)求的數(shù)據(jù)量。
send.buffer.bytes 用于TCP發(fā)送數(shù)據(jù)時(shí)使用的緩沖大?。⊿O_SNDBUF), -1表示使用OS默認(rèn)的緩沖區(qū)大小。
retry.backoff.ms 在發(fā)生失敗的時(shí)候如果需要重試,則該配置表示客戶端 等待多長(zhǎng)時(shí)間再發(fā)起重試。 該時(shí)間的存在避免了密集循環(huán)。
request.timeout.ms 客戶端等待服務(wù)端響應(yīng)的最大時(shí)間。如果該時(shí)間超時(shí), 則客戶端要么重新發(fā)起請(qǐng)求,要么如果重試耗盡,請(qǐng)求失敗。
reconnect.backoff.ms 重新連接主機(jī)的等待時(shí)間。避免了重連的密集循環(huán)。 該等待時(shí)間應(yīng)用于該客戶端到broker的所有連接。
reconnect.backoff.max.ms 重新連接到反復(fù)連接失敗的broker時(shí)要等待的最長(zhǎng)時(shí)間 (以毫秒為單位)。 如果提供此選項(xiàng),則對(duì)于每個(gè)連續(xù)的連接失敗, 每臺(tái)主機(jī)的退避將成倍增加,直至達(dá)到此最大值。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動(dòng)以避免連接風(fēng)暴。
receive.buffer.bytes TCP連接接收數(shù)據(jù)的緩存(SO_RCVBUF)。 -1表示使用操作系統(tǒng)的默認(rèn)值。
partition.assignment.strategy 當(dāng)使用消費(fèi)組的時(shí)候,分區(qū)分配策略的類(lèi)名。
metrics.sample.window.ms 計(jì)算指標(biāo)樣本的時(shí)間窗口。
metrics.recording.level 指標(biāo)的最高記錄級(jí)別。
metrics.num.samples 用于計(jì)算指標(biāo)而維護(hù)的樣本數(shù)量
interceptor.classes 攔截器類(lèi)的列表。默認(rèn)沒(méi)有攔截器 攔截器是消費(fèi)者的攔截器,該攔截器需要實(shí)現(xiàn) org.apache.kafka.clients.consumer .ConsumerInterceptor 接口。 攔截器可用于對(duì)消費(fèi)者接收到的消息進(jìn)行攔截處理。

到了這里,關(guān)于Kafka:消費(fèi)者參數(shù)配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)

    kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)

    kafka是由Apache軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源流處理平臺(tái)。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 kafka中partition類(lèi)似數(shù)據(jù)庫(kù)中的分表數(shù)據(jù),可以起到水平擴(kuò)展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個(gè)數(shù)據(jù),某個(gè)topic有兩個(gè)partition,一

    2024年01月22日
    瀏覽(161)
  • 【Kafka】【十七】消費(fèi)者poll消息的細(xì)節(jié)與消費(fèi)者心跳配置

    默認(rèn)情況下,消費(fèi)者?次會(huì)poll500條消息。 代碼中設(shè)置了?輪詢的時(shí)間是1000毫秒 意味著: 如果?次poll到500條,就直接執(zhí)?for循環(huán) 如果這?次沒(méi)有poll到500條。且時(shí)間在1秒內(nèi),那么?輪詢繼續(xù)poll,要么到500條,要么到1s 如果多次poll都沒(méi)達(dá)到500條,且1秒時(shí)間到了,那么直接執(zhí)

    2024年02月09日
    瀏覽(30)
  • 關(guān)于kafka消費(fèi)者超時(shí)配置

    在Kafka中,消費(fèi)者超時(shí)配置是指消費(fèi)者在等待服務(wù)器響應(yīng)時(shí)的超時(shí)時(shí)間。如果消費(fèi)者在超時(shí)時(shí)間內(nèi)未收到服務(wù)器的響應(yīng),它將重新發(fā)起請(qǐng)求或執(zhí)行其他邏輯。 以下是關(guān)于Kafka消費(fèi)者超時(shí)配置的一些常見(jiàn)選項(xiàng): session.timeout.ms :該配置定義了消費(fèi)者與Kafka集群之間的會(huì)話超時(shí)時(shí)間

    2024年02月16日
    瀏覽(23)
  • Kafka消費(fèi)者常用超時(shí)時(shí)間配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息減少為20(之前是默認(rèn)值500) spring.kafka.consumer.max-poll-records=20 //消息消費(fèi)超時(shí)時(shí)間增加為10分鐘 spring.kafka.p

    2024年02月03日
    瀏覽(26)
  • Kafka系列——詳解創(chuàng)建Kafka消費(fèi)者及相關(guān)配置

    參考自kafka系列文章——消費(fèi)者創(chuàng)建與配置 在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對(duì)象。 創(chuàng)建 KafkaConsumer 對(duì)象與創(chuàng)建 KafkaProducer 對(duì)象非常相似——把想要傳給消費(fèi)者的屬性放在 Properties 對(duì)象里,后面深入討論所有屬性。這里我們只需要使用 3 個(gè)必要的屬性: bootstrap.

    2024年02月09日
    瀏覽(21)
  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(24)
  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類(lèi)即可,對(duì)應(yīng)已有屬性類(lèi)KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場(chǎng)景:需要消費(fèi)來(lái)源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)
  • Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略

    Kafka消費(fèi)者提交Offset的策略有 自動(dòng)提交Offset: 消費(fèi)者將消息拉取下來(lái)以后未被消費(fèi)者消費(fèi)前,直接自動(dòng)提交offset。 自動(dòng)提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來(lái)以后,消費(fèi)者掛了 手動(dòng)提交Offset 消費(fèi)者在消費(fèi)消息時(shí)/后,再提交o

    2024年02月08日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包