依賴管理
在pom.xml文件中導(dǎo)入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5.RELEASE</version>
</dependency>
配置文件修改
需要自己配置AckMode時(shí)候的配置
spring:
application:
name: base.kafka
kafka:
bootstrap-servers: kafka服務(wù)地址1:端口,kafka服務(wù)地址2:端口,kafka服務(wù)地址3:端口
producer:
# 寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗,
# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。
retries: 0
#procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:
#acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。
#acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。
#可以設(shè)置的值為:all, -1, 0, 1
acks: 1
consumer:
group-id: testGroup
# smallest和largest才有效,如果smallest重新0開(kāi)始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallest
auto-offset-reset: earliest
# 設(shè)置自動(dòng)提交offset
enable-auto-commit: true
max-poll-records: 2
server:
port: 8060
消費(fèi)kafka消息
kafka支持的消費(fèi)模式,設(shè)置在AbstractMessageListenerContainer.AckMode
的枚舉中,下面就介紹下各個(gè)模式的區(qū)別
/**
* The offset commit behavior enumeration.
*/
public enum AckMode {
/**
* Commit after each record is processed by the listener.
*/
RECORD,
/**
* Commit whatever has already been processed before the next poll.
*/
BATCH,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME,
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL,
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,
}
AckMode模式
AckMode模式 | 作用 |
---|---|
MANUAL | 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交 |
MANUAL_IMMEDIATE | 手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交 |
RECORD | 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交 |
BATCH | 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交 |
TIME | 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交 |
COUNT | 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交 |
COUNT_TIME | TIME或COUNT?有一個(gè)條件滿足時(shí)提交 |
監(jiān)聽(tīng)器工廠的配置類:
/**
* kafka消費(fèi)者配置
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
//會(huì)話過(guò)期時(shí)長(zhǎng),consumer通過(guò)ConsumerCoordinator間歇性發(fā)送心跳
//超期后,會(huì)被認(rèn)為consumer失效,服務(wù)遷移到其他consumer節(jié)點(diǎn).(group)
//需要注意,Coordinator與kafkaConsumer共享底層通道,也是基于poll獲取協(xié)調(diào)事件,但是會(huì)在單獨(dú)的線程中
@Value("${spring.kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${spring.kafka.consumer.concurrency}")
private int concurrency;
//單次最多允許poll的消息條數(shù).
//此值不建議過(guò)大,應(yīng)該考慮你的業(yè)務(wù)處理效率.
@Value("${spring.kafka.consumer.maxpoll.records}")
private int maxPollRecords;
//兩次poll之間的時(shí)間隔間最大值,如果超過(guò)此值將會(huì)被認(rèn)為此consumer失效,觸發(fā)consumer重新平衡.
//此值必須大于,一個(gè)batch所有消息處理時(shí)間總和.
//最大500000
//2分鐘
@Value("${spring.kafka.consumer.maxpoll.interval}")
private int maxPollIntervalMS;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public KafkaListenerContainerFactory<?> batchDataFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> consumerConfig = consumerConfigs();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(concurrency);
//設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter());
//設(shè)置提交偏移量的方式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//每一批數(shù)量
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,this.maxPollIntervalMS);
return propsMap;
}
@Bean
public TestMessages listener() {
return new TestMessages();
}
}
監(jiān)聽(tīng)器使用的配置
@Component
public class TestMessages {
/**
* MANUAL 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
* @param message
* @param ack
*/
@KafkaListener(containerFactory = "batchDataFactory" , topics = "kafka(topic名稱)")
public void onMessageManual(List<Object> message, Acknowledgment ack){
log.info("batchDataFactory處理數(shù)據(jù)量:{}",message.size());
message.forEach(item -> log.info("batchDataFactory處理數(shù)據(jù)內(nèi)容:{}",item));
ack.acknowledge();//直接提交offset
}
}
MANUAL_IMMEDIATE
當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
MANUAL和MANUAL_IMMEDIATE兩者的相同和區(qū)別
相同之處
這兩種模式都是需要進(jìn)行手動(dòng)確認(rèn)ack.acknowledge();才能完成消息的消費(fèi),否則在重啟消費(fèi)端實(shí)例的時(shí)候數(shù)據(jù)會(huì)再次被消費(fèi)端接收到。
兩者的區(qū)別
MANUAL: 在處理完最后一次輪詢的所有結(jié)果后,將隊(duì)列排隊(duì),并在一次操作中提交偏移量??梢哉J(rèn)為是在批處理結(jié)束時(shí)提交偏移量
MANUAL_IMMEDIATE:只要在偵聽(tīng)器線程上執(zhí)行確認(rèn),就立即提交偏移。會(huì)在批量執(zhí)行的時(shí)候逐一提交它們。
其他模式大家都可以在批量處理工廠類中進(jìn)行修改設(shè)置:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-564495.html
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
上面的內(nèi)容可能存在沒(méi)有描述清楚或者錯(cuò)誤的地方,假如開(kāi)發(fā)同學(xué)發(fā)現(xiàn)了,請(qǐng)及時(shí)告知,我會(huì)第一時(shí)間修改相關(guān)內(nèi)容。假如我的這篇內(nèi)容對(duì)你有任何幫助的話,麻煩給我點(diǎn)一個(gè)贊。你的點(diǎn)贊就是我前進(jìn)的動(dòng)力。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-564495.html
到了這里,關(guān)于Spring Boot 整合kafka消費(fèi)模式AckMode以及手動(dòng)消費(fèi) 依賴管理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!