前言
之前有發(fā)過一篇文章聊聊如何利用redis實現(xiàn)多級緩存同步。有個讀者就給我留言說,因為他項目的redis版本不是6.0+版本,因此他使用我文章介紹通過MQ來實現(xiàn)本地緩存同步,他的同步流程大概如下圖
他原來的業(yè)務(wù)流程是每天凌晨開啟定時器去爬取第三方的數(shù)據(jù),并持久化到redis,后邊因為redis發(fā)生過宕機事故,他碰巧看了我文章,就覺得可以用使用多級緩存的策略,用來做個兜底。他的業(yè)務(wù)流程就如上圖,即每天凌晨開啟定時器去爬取第三方數(shù)據(jù),持久化到redis和其中一臺服務(wù)的本地緩存,然后將爬取到的業(yè)務(wù)數(shù)據(jù)發(fā)送到kafka,其他業(yè)務(wù)服務(wù)通過訂閱kafka,將業(yè)務(wù)數(shù)據(jù)保存到本地緩存。
他改造完,某天突然發(fā)現(xiàn)在集群環(huán)境中,只要其中一臺服務(wù)消費了kafka數(shù)據(jù),其他就消費不到。今天就借這個話題,來聊聊集群環(huán)境中本地緩存如何進行同步
前置知識
kafka消費topic-partitions模式分為subscribe模式和assign模式。subscribe模式需要指定group.id,該模式會為consumer自動分配partition,且同一個group.id下的不同consumer不會消費同樣的分區(qū)。assign模式需要為consumer手動、顯示的指定需要消費的topic-partitions,不受group.id限制,相當與指定的group.id無效。通俗一點講就是assign模式下,所有消費者都可以訂閱指定分區(qū)
我們要通過消息隊列實現(xiàn)本地緩存同步,本質(zhì)上就是需要利用消息隊列提供廣播能力,而kafka默認不具備。不過我們可以根據(jù)kafka提供的消費模式進行定制,從而是kafka也具備廣播能力
集群本地緩存同步方案
方案一:利用MQ廣播能力
因為讀者項目是使用kafka,且項目是使用spring-kafka,我們也就以此為例
1、subscribe模式
通過前置知識,我們了解到在subscribe模式下,同一個group.id下的不同consumer不會消費同樣的分區(qū),這就意味我們可以通過指定不同group.id來消費同樣分區(qū)達到廣播的效果
那如何在同個集群服務(wù)實現(xiàn)不同的group.id?
此時Spring EL 表達式就派上用場了,我們通過 Spring EL 表達式,在每個消費者分組的名字上配合 UUID 生成其后綴。這樣,就能保證每個項目啟動的消費者分組不同,從而達到廣播消費的目的
示例
@KafkaListener(topics = "${userCache.topic}",groupId = "${userCache.topic}_group_" + "#{T(java.util.UUID).randomUUID()})")
public void receive(Acknowledgment ack, String data){
System.out.println(String.format("serverPort:【%s】,接收到數(shù)據(jù):【%s】",serverPort,data));
ack.acknowledge();
}
如果我們決定UUID不直觀,我們也可以使用IP作為標識,只要能保證同個集群服務(wù)的group.id是唯一即可
不過如果要改成ip,我們得做一定的改造。改造步驟如下
a、 獲取ip地址信息,并放入environment
public class ServerAddrEnvironmentPostProcessor implements EnvironmentPostProcessor{
private String SERVER_ADDRESS = "server.addr";
@Override
@SneakyThrows
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
MutablePropertySources propertySources = environment.getPropertySources();
Map<String, Object> source = new HashMap<>();
String serverAddr = InetAddress.getLocalHost().getHostAddress();
source.put(SERVER_ADDRESS,serverAddr);
MapPropertySource mapPropertySource = new MapPropertySource("serverAddrProperties",source);
propertySources.addFirst(mapPropertySource);
}
}
b、 配置spi
在src/main/resource目錄下配置META-INF/spring.factories,配置內(nèi)容如下
org.springframework.boot.env.EnvironmentPostProcessor=\
com.github.lybgeek.comsumer.ip.ServerAddrEnvironmentPostProcessor
c、 @KafkaListener配置如下內(nèi)容
@KafkaListener(topics = "${userCache.topic}",groupId = "${userCache.topic}_group_" + "${server.addr}" + "_${server.port}")
小結(jié)
該方式的實現(xiàn)優(yōu)點是比較簡單,但如果需要對服務(wù)進行運維監(jiān)控統(tǒng)計,那就不怎么友好了,雖然指定IP會比隨機UUID好點,但如果是容器化部署,每次部署其IP也是會變化,這樣跟隨機指定UUID,差別也不大了。其次如果是使用云產(chǎn)品,比如阿里云對comsume group是有數(shù)量上限,且消費者組需要提前創(chuàng)建,這種情況使用該方案就不是很合適了
assign模式
通過assign模式手動消費對應(yīng)的分區(qū)
示例
@KafkaListener(topicPartitions =
{@TopicPartition(topic = "${userCache.topic}", partitions = "0")})
public void receive(Acknowledgment ack, ConsumerRecord record){
System.out.println(String.format("serverPort:【%s】,接收到數(shù)據(jù):【%s】",serverPort,record));
ack.acknowledge();
}
小結(jié)
該方式實現(xiàn)也是很簡單,如果我們不需要動態(tài)創(chuàng)建新的分區(qū),用該方案實現(xiàn)廣播,會是一個不錯的選擇。不過該方式的缺點很明顯,因為是手動指定分區(qū),當該分區(qū)有問題,也挺麻煩的
方案二:通過定時器觸發(fā)
該方案主要基于讀者目前的同步進行改造,改造后如下圖
核心就是根據(jù)讀者業(yè)務(wù)的特性,因為他是定時每天晚上同步爬取,那就意味著他這個數(shù)據(jù)至少在當天基本不變,就可以讓集群里的服務(wù)都定時執(zhí)行,此時僅需將xxl-job的調(diào)度策略改成分片廣播就行,這樣就可以持久化到redis的同時,也持久化到本地緩存
小結(jié)
該方案改動量比較小,有個小缺點就是,因為集群內(nèi)所有服務(wù)都執(zhí)行調(diào)度,這樣就會使redis重復(fù)持久化,不過問題也不大就是好。最后讀者選擇該方案
總結(jié)
本文主要闡述集群環(huán)境中本地緩存如何進行同步,之前還有讀者問我說,使用了多級緩存,數(shù)據(jù)一致性要如何保證?以前我可能會從技術(shù)角度來回答,比如你可以延遲雙刪,或者如果你是mysql,你可以使用canal+mq,更甚者你可以使用分布式鎖來保證。但現(xiàn)在我更多從業(yè)務(wù)角度來思考這件事情,你都考慮使用緩存,是不是意味著你在業(yè)務(wù)上是可以容忍一定不一致性,既然可以容忍,是不是最終可以通過一些補償方案來解決這個不一致性
沒有完美的方案,你此時感覺的完美方案,可能是當時在那個業(yè)務(wù)場景下,做了一個貼合業(yè)務(wù)的權(quán)衡文章來源:http://www.zghlxwxcb.cn/news/detail-660784.html
demo鏈接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-broadcast文章來源地址http://www.zghlxwxcb.cn/news/detail-660784.html
到了這里,關(guān)于聊聊在集群環(huán)境中本地緩存如何進行同步的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!