maven配置
// 消費(fèi)者
Properties properties = new Properties();
// 連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 組id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
springboot配置類(lèi)
@Configuration
public class KafkaConfig {
// 配置全局admin
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers","192.168.25.129:9092");
KafkaAdmin admin = new KafkaAdmin(configs);
return admin;
}
}
配置文件文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-492598.html
# 用于建立初始連接的broker地址
spring.kafka.bootstrap-servers=192.168.25.129:9092
# producer用到的key和value的序列化類(lèi)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默認(rèn)的批處理記錄數(shù) 16k
spring.kafka.producer.batch-size=16384
# 32MB的總發(fā)送緩存
spring.kafka.producer.buffer-memory=33554432
# consumer用到的key和value的反序列化類(lèi)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消費(fèi)組id
spring.kafka.consumer.group-id=spring-kafka-consumer-group
# 是否自動(dòng)提交消費(fèi)者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果該消費(fèi)者的偏移量不存在,則自動(dòng)設(shè)置為最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
參數(shù)配置列表文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-492598.html
屬性 | 說(shuō)明 |
bootstrap.servers | 向Kafka集群建立初始連接用到的host/port列表。 客戶端會(huì)使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管 是否指定了哪個(gè)服務(wù)器用作引導(dǎo)。 這個(gè)列表僅影響用來(lái)發(fā)現(xiàn)集群所有服務(wù)器的初始主機(jī)。 字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接,然后發(fā)現(xiàn)集群中的所有服務(wù)器,因 此沒(méi)有必要將集群中的所有地址寫(xiě)在這里。 一般最好兩臺(tái),以防其中一臺(tái)宕掉。 |
key.deserializer | key的反序列化類(lèi),該類(lèi)需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口。 |
value.deserializer | 實(shí)現(xiàn)了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器, 用于對(duì)消息的value進(jìn)行反序列化。 |
client.id | 當(dāng)從服務(wù)器消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上 提供應(yīng)用的邏輯名稱(chēng),記錄在服務(wù)端的請(qǐng)求日志中,用于追蹤請(qǐng)求的源。 |
group.id | 用于唯一標(biāo)志當(dāng)前消費(fèi)者所屬的消費(fèi)組的字符串。 如果消費(fèi)者使用組管理功能如subscribe(topic)或使用基于Kafka的偏移量 管理策略,該項(xiàng)必須設(shè)置。 |
auto.offset.reset | 當(dāng)Kafka中沒(méi)有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被 刪除了),該如何處理? earliest:自動(dòng)重置偏移量到最早的偏移量 latest:自動(dòng)重置偏移量為最新的偏移量 none:如果消費(fèi)組原來(lái)的(previous)偏移量不存在,則向消費(fèi)者拋異 常 anything:向消費(fèi)者拋異常 |
auto.commit.interval.ms | 如果設(shè)置了 enable.auto.commit 的值為true, 則該值定義了消費(fèi)者偏移量向Kafka提交的頻率。 |
enable.auto.commit | 如果設(shè)置為true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。 |
fetch.min.bytes | 服務(wù)器對(duì)每個(gè)拉取消息的請(qǐng)求返回的數(shù)據(jù)量最小值。 如果數(shù)據(jù)量達(dá)不到這個(gè)值,請(qǐng)求等待,以讓更多的數(shù)據(jù)累積, 達(dá)到這個(gè)值之后響應(yīng)請(qǐng)求。 默認(rèn)設(shè)置是1個(gè)字節(jié),表示只要有一個(gè)字節(jié)的數(shù)據(jù), 就立即響應(yīng)請(qǐng)求,或者在沒(méi)有數(shù)據(jù)的時(shí)候請(qǐng)求超時(shí)。 將該值設(shè)置為大一點(diǎn)兒的數(shù)字,會(huì)讓服務(wù)器等待稍微 長(zhǎng)一點(diǎn)兒的時(shí)間以累積數(shù)據(jù)。 如此則可以提高服務(wù)器的吞吐量,代價(jià)是額外的延遲時(shí)間。 |
fetch.max.wait.ms | 如果服務(wù)器端的數(shù)據(jù)量達(dá)不到 fetch.min.bytes 的話, 服務(wù)器端不能立即響應(yīng)請(qǐng)求。 該時(shí)間用于配置服務(wù)器端阻塞請(qǐng)求的最大時(shí)長(zhǎng)。 |
fetch.max.bytes | 服務(wù)器給單個(gè)拉取請(qǐng)求返回的最大數(shù)據(jù)量。 消費(fèi)者批量拉取消息,如果第一個(gè)非空消息批次的值比該值大, 消息批也會(huì)返回,以讓消費(fèi)者可以接著進(jìn)行。 即該配置并不是絕對(duì)的最大值。 broker可以接收的消息批最大值通過(guò) message.max.bytes (broker配置) 或 max.message.bytes (主題配置)來(lái)指定。 需要注意的是,消費(fèi)者一般會(huì)并發(fā)拉取請(qǐng)求。 |
connections.max.idle.ms | 在這個(gè)時(shí)間之后關(guān)閉空閑的連接。 |
check.crcs | 自動(dòng)計(jì)算被消費(fèi)的消息的CRC32校驗(yàn)值。 可以確保在傳輸過(guò)程中或磁盤(pán)存儲(chǔ)過(guò)程中消息沒(méi)有被破壞。 它會(huì)增加額外的負(fù)載,在追求極致性能的場(chǎng)合禁用。 |
exclude.internal.topics | 是否內(nèi)部主題應(yīng)該暴露給消費(fèi)者。如果該條目設(shè)置為true, 則只能先訂閱再拉取。 |
isolation.level | 控制如何讀取事務(wù)消息。 如果設(shè)置了 read_committed ,消費(fèi)者的poll()方法只會(huì) 返回已經(jīng)提交的事務(wù)消息。 如果設(shè)置了 read_uncommitted (默認(rèn)值), 消費(fèi)者的poll方法返回所有的消息,即使是已經(jīng)取消的事務(wù)消息。 非事務(wù)消息以上兩種情況都返回。 消息總是以偏移量的順序返回。 read_committed 只能返回到達(dá)LSO的消息。 在LSO之后出現(xiàn)的消息只能等待相關(guān)的事務(wù)提交之后才能看到。 結(jié)果, read_committed 模式,如果有為提交的事務(wù), 消費(fèi)者不能讀取到直到HW的消息。 read_committed 的seekToEnd方法返回LSO。 |
heartbeat.interval.ms | 當(dāng)使用消費(fèi)組的時(shí)候,該條目指定消費(fèi)者向消費(fèi)者協(xié)調(diào)器 發(fā)送心跳的時(shí)間間隔。 心跳是為了確保消費(fèi)者會(huì)話的活躍狀態(tài), 同時(shí)在消費(fèi)者加入或離開(kāi)消費(fèi)組的時(shí)候方便進(jìn)行再平衡。 該條目的值必須小于 session.timeout.ms ,也不應(yīng)該高于 session.timeout.ms 的1/3。 可以將其調(diào)整得更小,以控制正常重新平衡的預(yù)期時(shí)間。 |
session.timeout.ms | 當(dāng)使用Kafka的消費(fèi)組的時(shí)候,消費(fèi)者周期性地向broker發(fā)送心跳數(shù) 表明自己的存在。 如果經(jīng)過(guò)該超時(shí)時(shí)間還沒(méi)有收到消費(fèi)者的心跳, 則broker將消費(fèi)者從消費(fèi)組移除,并啟動(dòng)再平衡。 該值必須在broker配置 group.min.session.timeout.ms 和 group.max.session.timeout.ms 之間。 |
max.poll.records | 一次調(diào)用poll()方法返回的記錄最大數(shù)量。 |
max.poll.interval.ms | 使用消費(fèi)組的時(shí)候調(diào)用poll()方法的時(shí)間間隔。 該條目指定了消費(fèi)者調(diào)用poll()方法的最大時(shí)間間隔。 如果在此時(shí)間內(nèi)消費(fèi)者沒(méi)有調(diào)用poll()方法, 則broker認(rèn)為消費(fèi)者失敗,觸發(fā)再平衡, 將分區(qū)分配給消費(fèi)組中其他消費(fèi)者。 |
max.partition.fetch.bytes | 對(duì)每個(gè)分區(qū),服務(wù)器返回的最大數(shù)量。消費(fèi)者按批次拉取數(shù)據(jù)。 如果非空分區(qū)的第一個(gè)記錄大于這個(gè)值,批處理依然可以返回, 以保證消費(fèi)者可以進(jìn)行下去。 broker接收批的大小由 message.max.bytes (broker參數(shù))或 max.message.bytes (主題參數(shù))指定。 fetch.max.bytes 用于限制消費(fèi)者單次請(qǐng)求的數(shù)據(jù)量。 |
send.buffer.bytes | 用于TCP發(fā)送數(shù)據(jù)時(shí)使用的緩沖大?。⊿O_SNDBUF), -1表示使用OS默認(rèn)的緩沖區(qū)大小。 |
retry.backoff.ms | 在發(fā)生失敗的時(shí)候如果需要重試,則該配置表示客戶端 等待多長(zhǎng)時(shí)間再發(fā)起重試。 該時(shí)間的存在避免了密集循環(huán)。 |
request.timeout.ms | 客戶端等待服務(wù)端響應(yīng)的最大時(shí)間。如果該時(shí)間超時(shí), 則客戶端要么重新發(fā)起請(qǐng)求,要么如果重試耗盡,請(qǐng)求失敗。 |
reconnect.backoff.ms | 重新連接主機(jī)的等待時(shí)間。避免了重連的密集循環(huán)。 該等待時(shí)間應(yīng)用于該客戶端到broker的所有連接。 |
reconnect.backoff.max.ms | 重新連接到反復(fù)連接失敗的broker時(shí)要等待的最長(zhǎng)時(shí)間 (以毫秒為單位)。 如果提供此選項(xiàng),則對(duì)于每個(gè)連續(xù)的連接失敗, 每臺(tái)主機(jī)的退避將成倍增加,直至達(dá)到此最大值。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動(dòng)以避免連接風(fēng)暴。 |
receive.buffer.bytes | TCP連接接收數(shù)據(jù)的緩存(SO_RCVBUF)。 -1表示使用操作系統(tǒng)的默認(rèn)值。 |
partition.assignment.strategy | 當(dāng)使用消費(fèi)組的時(shí)候,分區(qū)分配策略的類(lèi)名。 |
metrics.sample.window.ms | 計(jì)算指標(biāo)樣本的時(shí)間窗口。 |
metrics.recording.level | 指標(biāo)的最高記錄級(jí)別。 |
metrics.num.samples | 用于計(jì)算指標(biāo)而維護(hù)的樣本數(shù)量 |
interceptor.classes | 攔截器類(lèi)的列表。默認(rèn)沒(méi)有攔截器 攔截器是消費(fèi)者的攔截器,該攔截器需要實(shí)現(xiàn) org.apache.kafka.clients.consumer .ConsumerInterceptor 接口。 攔截器可用于對(duì)消費(fèi)者接收到的消息進(jìn)行攔截處理。 |
到了這里,關(guān)于Kafka:消費(fèi)者參數(shù)配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!