国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

微服務(wù)---分布式多級緩存集群實(shí)現(xiàn)方案(Caffeine+redis+nginx本地緩存+Canal數(shù)據(jù)同步)

這篇具有很好參考價(jià)值的文章主要介紹了微服務(wù)---分布式多級緩存集群實(shí)現(xiàn)方案(Caffeine+redis+nginx本地緩存+Canal數(shù)據(jù)同步)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

分布式多級緩存集群實(shí)現(xiàn)方案

1.什么是多級緩存

傳統(tǒng)的緩存策略一般是請求到達(dá)Tomcat后,先查詢Redis,如果未命中則查詢數(shù)據(jù)庫,如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

存在下面的問題:

?請求要經(jīng)過Tomcat處理,Tomcat的性能成為整個系統(tǒng)的瓶頸

?Redis緩存失效時,會對數(shù)據(jù)庫產(chǎn)生沖擊

多級緩存就是充分利用請求處理的每個環(huán)節(jié),分別添加緩存,減輕Tomcat壓力,提升服務(wù)性能:

  • 瀏覽器訪問靜態(tài)資源時,優(yōu)先讀取瀏覽器本地緩存
  • 訪問非靜態(tài)資源(ajax查詢數(shù)據(jù))時,訪問服務(wù)端
  • 請求到達(dá)Nginx后,優(yōu)先讀取Nginx本地緩存
  • 如果Nginx本地緩存未命中,則去直接查詢Redis(不經(jīng)過Tomcat)
  • 如果Redis查詢未命中,則查詢Tomcat
  • 請求進(jìn)入Tomcat后,優(yōu)先查詢JVM進(jìn)程緩存
  • 如果JVM進(jìn)程緩存未命中,則查詢數(shù)據(jù)庫

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

在多級緩存架構(gòu)中,Nginx內(nèi)部需要編寫本地緩存查詢、Redis查詢、Tomcat查詢的業(yè)務(wù)邏輯,因此這樣的nginx服務(wù)不再是一個反向代理服務(wù)器,而是一個編寫業(yè)務(wù)的Web服務(wù)器了

因此這樣的業(yè)務(wù)Nginx服務(wù)也需要搭建集群來提高并發(fā),再有專門的nginx服務(wù)來做反向代理,如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

另外,我們的Tomcat服務(wù)將來也會部署為集群模式:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

可見,多級緩存的關(guān)鍵有兩個:

  • 一個是在nginx中編寫業(yè)務(wù),實(shí)現(xiàn)nginx本地緩存、Redis、Tomcat的查詢

  • 另一個就是在Tomcat中實(shí)現(xiàn)JVM進(jìn)程緩存

其中Nginx編程則會用到OpenResty框架結(jié)合Lua這樣的語言。

這也是今天課程的難點(diǎn)和重點(diǎn)。

2.JVM進(jìn)程緩存

2.1.初識Caffeine

緩存在日常開發(fā)中啟動至關(guān)重要的作用,由于是存儲在內(nèi)存中,數(shù)據(jù)的讀取速度是非??斓?,能大量減少對數(shù)據(jù)庫的訪問,減少數(shù)據(jù)庫的壓力。我們把緩存分為兩類:

  • 分布式緩存,例如Redis:
    • 優(yōu)點(diǎn):存儲容量更大、可靠性更好、可以在集群間共享
    • 缺點(diǎn):訪問緩存有網(wǎng)絡(luò)開銷
    • 場景:緩存數(shù)據(jù)量較大、可靠性要求較高、需要在集群間共享
  • 進(jìn)程本地緩存,例如HashMap、GuavaCache:
    • 優(yōu)點(diǎn):讀取本地內(nèi)存,沒有網(wǎng)絡(luò)開銷,速度更快
    • 缺點(diǎn):存儲容量有限、可靠性較低、無法共享
    • 場景:性能要求較高,緩存數(shù)據(jù)量較小

我們今天會利用Caffeine框架來實(shí)現(xiàn)JVM進(jìn)程緩存。

Caffeine是一個基于Java8開發(fā)的,提供了近乎最佳命中率的高性能的本地緩存庫。目前Spring內(nèi)部的緩存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine

Caffeine的性能非常好,下圖是官方給出的性能對比:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

可以看到Caffeine的性能遙遙領(lǐng)先!

緩存使用的基本API:

@Test
void testBasicOps() {
    // 構(gòu)建cache對象
    Cache<String, String> cache = Caffeine.newBuilder().build();

    // 存數(shù)據(jù)
    cache.put("gf", "迪麗熱巴");

    // 取數(shù)據(jù)
    String gf = cache.getIfPresent("gf");
    System.out.println("gf = " + gf);

    // 取數(shù)據(jù),包含兩個參數(shù):
    // 參數(shù)一:緩存的key
    // 參數(shù)二:Lambda表達(dá)式,表達(dá)式參數(shù)就是緩存的key,方法體是查詢數(shù)據(jù)庫的邏輯
    // 優(yōu)先根據(jù)key查詢JVM緩存,如果未命中,則執(zhí)行參數(shù)二的Lambda表達(dá)式
    String defaultGF = cache.get("defaultGF", key -> {
        // 根據(jù)key去數(shù)據(jù)庫查詢數(shù)據(jù)
        return "柳巖";
    });
    System.out.println("defaultGF = " + defaultGF);
}

Caffeine既然是緩存的一種,肯定需要有緩存的清除策略,不然的話內(nèi)存總會有耗盡的時候。

Caffeine提供了三種緩存驅(qū)逐策略:

  • 基于容量:設(shè)置緩存的數(shù)量上限

    // 創(chuàng)建緩存對象
    Cache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(1) // 設(shè)置緩存大小上限為 1
        .build();
    
  • 基于時間:設(shè)置緩存的有效時間

    // 創(chuàng)建緩存對象
    Cache<String, String> cache = Caffeine.newBuilder()
        // 設(shè)置緩存有效期為 10 秒,從最后一次寫入開始計(jì)時 
        .expireAfterWrite(Duration.ofSeconds(10)) 
        .build();
    
    
  • 基于引用:設(shè)置緩存為軟引用或弱引用,利用GC來回收緩存數(shù)據(jù)。性能較差,不建議使用。

注意:在默認(rèn)情況下,當(dāng)一個緩存元素過期的時候,Caffeine不會自動立即將其清理和驅(qū)逐。而是在一次讀或?qū)懖僮骱螅蛘咴诳臻e時間完成對失效數(shù)據(jù)的驅(qū)逐。

2.2.實(shí)現(xiàn)JVM進(jìn)程緩存

2.2.1.需求

利用Caffeine實(shí)現(xiàn)下列需求:

  • 給根據(jù)id查詢商品的業(yè)務(wù)添加緩存,緩存未命中時查詢數(shù)據(jù)庫
  • 給根據(jù)id查詢商品庫存的業(yè)務(wù)添加緩存,緩存未命中時查詢數(shù)據(jù)庫
  • 緩存初始大小為100
  • 緩存上限為10000

2.2.2.實(shí)現(xiàn)

首先,我們需要定義兩個Caffeine的緩存對象,分別保存商品、庫存的緩存數(shù)據(jù)。

在item-service的com.java.item.config包下定義CaffeineConfig類:

package com.java.item.config;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.java.item.pojo.Item;
import com.java.item.pojo.ItemStock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CaffeineConfig {

    @Bean
    public Cache<Long, Item> itemCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }

    @Bean
    public Cache<Long, ItemStock> stockCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }
}

然后,修改item-service中的com.java.item.web包下的ItemController類,添加緩存邏輯:

@RestController
@RequestMapping("item")
public class ItemController {

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    @Autowired
    private Cache<Long, Item> itemCache;
    @Autowired
    private Cache<Long, ItemStock> stockCache;
    
    // ...其它略
    
    @GetMapping("/{id}")
    public Item findById(@PathVariable("id") Long id) {
        return itemCache.get(id, key -> itemService.query()
                .ne("status", 3).eq("id", key)
                .one()
        );
    }

    @GetMapping("/stock/{id}")
    public ItemStock findStockById(@PathVariable("id") Long id) {
        return stockCache.get(id, key -> stockService.getById(key));
    }
}

3.Lua語法入門

Nginx編程需要用到Lua語言,因此我們必須先入門Lua的基本語法。

3.1.初識Lua

Lua 是一種輕量小巧的腳本語言,用標(biāo)準(zhǔn)C語言編寫并以源代碼形式開放, 其設(shè)計(jì)目的是為了嵌入應(yīng)用程序中,從而為應(yīng)用程序提供靈活的擴(kuò)展和定制功能。官網(wǎng):https://www.lua.org/

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

Lua經(jīng)常嵌入到C語言開發(fā)的程序中,例如游戲開發(fā)、游戲插件等。

Nginx本身也是C語言開發(fā),因此也允許基于Lua做拓展。

3.1.HelloWorld

CentOS7默認(rèn)已經(jīng)安裝了Lua語言環(huán)境,所以可以直接運(yùn)行Lua代碼。

1)在Linux虛擬機(jī)的任意目錄下,新建一個hello.lua文件

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

2)添加下面的內(nèi)容

print("Hello World!")  

3)運(yùn)行

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

3.2.變量和循環(huán)

學(xué)習(xí)任何語言必然離不開變量,而變量的聲明必須先知道數(shù)據(jù)的類型。

3.2.1.Lua的數(shù)據(jù)類型

Lua中支持的常見數(shù)據(jù)類型包括:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

另外,Lua提供了type()函數(shù)來判斷一個變量的數(shù)據(jù)類型:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

3.2.2.聲明變量

Lua聲明變量的時候無需指定數(shù)據(jù)類型,而是用local來聲明變量為局部變量:

-- 聲明字符串,可以用單引號或雙引號,
local str = 'hello'
-- 字符串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 聲明數(shù)字
local num = 21
-- 聲明布爾類型
local flag = true

Lua中的table類型既可以作為數(shù)組,又可以作為Java中的map來使用。數(shù)組就是特殊的table,key是數(shù)組角標(biāo)而已:

-- 聲明數(shù)組 ,key為角標(biāo)的 table
local arr = {'java', 'python', 'lua'}
-- 聲明table,類似java的map
local map =  {name='Jack', age=21}

Lua中的數(shù)組角標(biāo)是從1開始,訪問的時候與Java中類似:

-- 訪問數(shù)組,lua數(shù)組的角標(biāo)從1開始
print(arr[1])

Lua中的table可以用key來訪問:

-- 訪問table
print(map['name'])
print(map.name)

3.2.3.循環(huán)

對于table,我們可以利用for循環(huán)來遍歷。不過數(shù)組和普通table遍歷略有差異。

遍歷數(shù)組:

-- 聲明數(shù)組 key為索引的 table
local arr = {'java', 'python', 'lua'}
-- 遍歷數(shù)組
for index,value in ipairs(arr) do
    print(index, value) 
end

遍歷普通table

-- 聲明map,也就是table
local map = {name='Jack', age=21}
-- 遍歷table
for key,value in pairs(map) do
   print(key, value) 
end

3.3.條件控制、函數(shù)

Lua中的條件控制和函數(shù)聲明與Java類似。

3.3.1.函數(shù)

定義函數(shù)的語法:

function 函數(shù)名( argument1, argument2..., argumentn)
    -- 函數(shù)體
    return 返回值
end

例如,定義一個函數(shù),用來打印數(shù)組:

function printArr(arr)
    for index, value in ipairs(arr) do
        print(value)
    end
end

3.3.2.條件控制

類似Java的條件控制,例如if、else語法:

if(布爾表達(dá)式)
then
   --[ 布爾表達(dá)式為 true 時執(zhí)行該語句塊 --]
else
   --[ 布爾表達(dá)式為 false 時執(zhí)行該語句塊 --]
end

與java不同,布爾表達(dá)式中的邏輯運(yùn)算是基于英文單詞:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

3.3.3.案例

需求:自定義一個函數(shù),可以打印table,當(dāng)參數(shù)為nil時,打印錯誤信息

function printArr(arr)
    if not arr then
        print('數(shù)組不能為空!')
    end
    for index, value in ipairs(arr) do
        print(value)
    end
end

4.實(shí)現(xiàn)多級緩存

多級緩存的實(shí)現(xiàn)離不開Nginx編程,而Nginx編程又離不開OpenResty。

4.1.安裝OpenResty

OpenResty? 是一個基于 Nginx的高性能 Web 平臺,用于方便地搭建能夠處理超高并發(fā)、擴(kuò)展性極高的動態(tài) Web 應(yīng)用、Web 服務(wù)和動態(tài)網(wǎng)關(guān)。具備下列特點(diǎn):

  • 具備Nginx的完整功能
  • 基于Lua語言進(jìn)行擴(kuò)展,集成了大量精良的 Lua 庫、第三方模塊
  • 允許使用Lua自定義業(yè)務(wù)邏輯、自定義庫

官方網(wǎng)站: https://openresty.org/cn/

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.1.1.安裝

首先你的Linux虛擬機(jī)必須聯(lián)網(wǎng)

1)安裝開發(fā)庫

首先要安裝OpenResty的依賴開發(fā)庫,執(zhí)行命令:

yum install -y pcre-devel openssl-devel gcc --skip-broken
2)安裝OpenResty倉庫

你可以在你的 CentOS 系統(tǒng)中添加 openresty 倉庫,這樣就可以便于未來安裝或更新我們的軟件包(通過 yum check-update 命令)。運(yùn)行下面的命令就可以添加我們的倉庫:

yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

如果提示說命令不存在,則運(yùn)行:

yum install -y yum-utils 

然后再重復(fù)上面的命令

3)安裝OpenResty

然后就可以像下面這樣安裝軟件包,比如 openresty

yum install -y openresty
4)安裝opm工具

opm是OpenResty的一個管理工具,可以幫助我們安裝一個第三方的Lua模塊。

如果你想安裝命令行工具 opm,那么可以像下面這樣安裝 openresty-opm 包:

yum install -y openresty-opm
5)目錄結(jié)構(gòu)

默認(rèn)情況下,OpenResty安裝的目錄是:/usr/local/openresty

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

看到里面的nginx目錄了嗎,OpenResty就是在Nginx基礎(chǔ)上集成了一些Lua模塊。

6)配置nginx的環(huán)境變量

打開配置文件:

vi /etc/profile

在最下面加入兩行:

export NGINX_HOME=/usr/local/openresty/nginx
export PATH=${NGINX_HOME}/sbin:$PATH

NGINX_HOME:后面是OpenResty安裝目錄下的nginx的目錄

然后讓配置生效:

source /etc/profile

4.1.2.啟動和運(yùn)行

OpenResty底層是基于Nginx的,查看OpenResty目錄的nginx目錄,結(jié)構(gòu)與windows中安裝的nginx基本一致:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

所以運(yùn)行方式與nginx基本一致:

# 啟動nginx
nginx
# 重新加載配置
nginx -s reload
# 停止
nginx -s stop

nginx的默認(rèn)配置文件注釋太多,影響后續(xù)我們的編輯,這里將nginx.conf中的注釋部分刪除,保留有效部分。

修改/usr/local/openresty/nginx/conf/nginx.conf文件,內(nèi)容如下:

#user  nobody;
worker_processes  1;
error_log  logs/error.log;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    server {
        listen       8081;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

在Linux的控制臺輸入命令以啟動nginx:

nginx

然后訪問頁面:http://192.168.150.101:8081,注意ip地址替換為你自己的虛擬機(jī)IP:

4.2.OpenResty快速入門

我們希望達(dá)到的多級緩存架構(gòu)如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

其中:

  • windows上的nginx用來做反向代理服務(wù),將前端的查詢商品的ajax請求代理到OpenResty集群

  • OpenResty集群用來編寫多級緩存業(yè)務(wù)

4.2.1.反向代理流程

現(xiàn)在,商品詳情頁使用的是假的商品數(shù)據(jù)。不過在瀏覽器中,可以看到頁面有發(fā)起ajax請求查詢真實(shí)商品數(shù)據(jù)。

這個請求如下:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

請求地址是localhost,端口是80,就被windows上安裝的Nginx服務(wù)給接收到了。然后代理給了OpenResty集群:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

我們需要在OpenResty中編寫業(yè)務(wù),查詢商品數(shù)據(jù)并返回到瀏覽器。

但是這次,我們先在OpenResty接收請求,返回假的商品數(shù)據(jù)。

4.2.2.OpenResty監(jiān)聽請求

OpenResty的很多功能都依賴于其目錄下的Lua庫,需要在nginx.conf中指定依賴庫的目錄,并導(dǎo)入依賴:

1)添加對OpenResty的Lua模塊的加載

修改/usr/local/openresty/nginx/conf/nginx.conf文件,在其中的http下面,添加下面代碼:

#lua 模塊
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模塊     
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";  

2)監(jiān)聽/api/item路徑

修改/usr/local/openresty/nginx/conf/nginx.conf文件,在nginx.conf的server下面,添加對/api/item這個路徑的監(jiān)聽:

location  /api/item {
    # 默認(rèn)的響應(yīng)類型
    default_type application/json;
    # 響應(yīng)結(jié)果由lua/item.lua文件來決定
    content_by_lua_file lua/item.lua;
}

這個監(jiān)聽,就類似于SpringMVC中的@GetMapping("/api/item")做路徑映射。

content_by_lua_file lua/item.lua則相當(dāng)于調(diào)用item.lua這個文件,執(zhí)行其中的業(yè)務(wù),把結(jié)果返回給用戶。相當(dāng)于java中調(diào)用service。

4.2.3.編寫item.lua

1)在/usr/loca/openresty/nginx目錄創(chuàng)建文件夾:lua

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

2)在/usr/loca/openresty/nginx/lua文件夾下,新建文件:item.lua

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

3)編寫item.lua,返回假數(shù)據(jù)

item.lua中,利用ngx.say()函數(shù)返回?cái)?shù)據(jù)到Response中

ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 21寸托運(yùn)箱拉桿箱 SALSA AIR系列果綠色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉桿箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

4)重新加載配置

nginx -s reload

刷新商品頁面:http://localhost/item.html?id=1001,即可看到效果:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.3.請求參數(shù)處理

上一節(jié)中,我們在OpenResty接收前端請求,但是返回的是假數(shù)據(jù)。

要返回真實(shí)數(shù)據(jù),必須根據(jù)前端傳遞來的商品id,查詢商品信息才可以。

那么如何獲取前端傳遞的商品參數(shù)呢?

4.3.1.獲取參數(shù)的API

OpenResty中提供了一些API用來獲取不同類型的前端請求參數(shù):

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.3.2.獲取參數(shù)并返回

在前端發(fā)起的ajax請求如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

可以看到商品id是以路徑占位符方式傳遞的,因此可以利用正則表達(dá)式匹配的方式來獲取ID

1)獲取商品id

修改/usr/loca/openresty/nginx/nginx.conf文件中監(jiān)聽/api/item的代碼,利用正則表達(dá)式獲取ID:

location ~ /api/item/(\d+) {
    # 默認(rèn)的響應(yīng)類型
    default_type application/json;
    # 響應(yīng)結(jié)果由lua/item.lua文件來決定
    content_by_lua_file lua/item.lua;
}

2)拼接ID并返回

修改/usr/loca/openresty/nginx/lua/item.lua文件,獲取id并拼接到結(jié)果中返回:

-- 獲取商品id
local id = ngx.var[1]
-- 拼接并返回
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托運(yùn)箱拉桿箱 SALSA AIR系列果綠色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉桿箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

3)重新加載并測試

運(yùn)行命令以重新加載OpenResty配置:

nginx -s reload

刷新頁面可以看到結(jié)果中已經(jīng)帶上了ID:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.4.查詢Tomcat

拿到商品ID后,本應(yīng)去緩存中查詢商品信息,不過目前我們還未建立nginx、redis緩存。因此,這里我們先根據(jù)商品id去tomcat查詢商品信息。我們實(shí)現(xiàn)如圖部分:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

需要注意的是,我們的OpenResty是在虛擬機(jī),Tomcat是在Windows電腦上。兩者IP一定不要搞錯了。

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.4.1.發(fā)送http請求的API

nginx提供了內(nèi)部API用以發(fā)送http請求:

local resp = ngx.location.capture("/path",{
    method = ngx.HTTP_GET,   -- 請求方式
    args = {a=1,b=2},  -- get方式傳參數(shù)
})

返回的響應(yīng)內(nèi)容包括:

  • resp.status:響應(yīng)狀態(tài)碼
  • resp.header:響應(yīng)頭,是一個table
  • resp.body:響應(yīng)體,就是響應(yīng)數(shù)據(jù)

注意:這里的path是路徑,并不包含IP和端口。這個請求會被nginx內(nèi)部的server監(jiān)聽并處理。

但是我們希望這個請求發(fā)送到Tomcat服務(wù)器,所以還需要編寫一個server來對這個路徑做反向代理:

 location /path {
     # 這里是windows電腦的ip和Java服務(wù)端口,需要確保windows防火墻處于關(guān)閉狀態(tài)
     proxy_pass http://192.168.150.1:8081; 
 }

原理如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.4.2.封裝http工具

下面,我們封裝一個發(fā)送Http請求的工具,基于ngx.location.capture來實(shí)現(xiàn)查詢tomcat。

1)添加反向代理,到windows的Java服務(wù)

因?yàn)閕tem-service中的接口都是/item開頭,所以我們監(jiān)聽/item路徑,代理到windows上的tomcat服務(wù)。

修改 /usr/local/openresty/nginx/conf/nginx.conf文件,添加一個location:

location /item {
    proxy_pass http://192.168.150.1:8081;
}

以后,只要我們調(diào)用ngx.location.capture("/item"),就一定能發(fā)送請求到windows的tomcat服務(wù)。

2)封裝工具類

之前我們說過,OpenResty啟動時會加載以下兩個目錄中的工具文件:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

所以,自定義的http工具也需要放到這個目錄下。

/usr/local/openresty/lualib目錄下,新建一個common.lua文件:

vi /usr/local/openresty/lualib/common.lua

內(nèi)容如下:

-- 封裝函數(shù),發(fā)送http請求,并解析響應(yīng)
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 記錄錯誤信息,返回404
        ngx.log(ngx.ERR, "http請求查詢失敗, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 將方法導(dǎo)出
local _M = {  
    read_http = read_http
}  
return _M

這個工具將read_http函數(shù)封裝到_M這個table類型的變量中,并且返回,這類似于導(dǎo)出。

使用的時候,可以利用require('common')來導(dǎo)入該函數(shù)庫,這里的common是函數(shù)庫的文件名。

3)實(shí)現(xiàn)商品查詢

最后,我們修改/usr/local/openresty/lua/item.lua文件,利用剛剛封裝的函數(shù)庫實(shí)現(xiàn)對tomcat的查詢:

-- 引入自定義common工具模塊,返回值是common中返回的 _M
local common = require("common")
-- 從 common中獲取read_http這個函數(shù)
local read_http = common.read_http
-- 獲取路徑參數(shù)
local id = ngx.var[1]
-- 根據(jù)id查詢商品
local itemJSON = read_http("/item/".. id, nil)
-- 根據(jù)id查詢商品庫存
local itemStockJSON = read_http("/item/stock/".. id, nil)

這里查詢到的結(jié)果是json字符串,并且包含商品、庫存兩個json字符串,頁面最終需要的是把兩個json拼接為一個json:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

這就需要我們先把JSON變?yōu)閘ua的table,完成數(shù)據(jù)整合后,再轉(zhuǎn)為JSON。

4.4.3.CJSON工具類

OpenResty提供了一個cjson的模塊用來處理JSON的序列化和反序列化。

官方地址: https://github.com/openresty/lua-cjson/

1)引入cjson模塊:

local cjson = require "cjson"

2)序列化:

local obj = {
    name = 'jack',
    age = 21
}
-- 把 table 序列化為 json
local json = cjson.encode(obj)

3)反序列化:

local json = '{"name": "jack", "age": 21}'
-- 反序列化 json為 table
local obj = cjson.decode(json);
print(obj.name)

4.4.4.實(shí)現(xiàn)Tomcat查詢

下面,我們修改之前的item.lua中的業(yè)務(wù),添加json處理功能:

-- 導(dǎo)入common函數(shù)庫
local common = require('common')
local read_http = common.read_http
-- 導(dǎo)入cjson庫
local cjson = require('cjson')

-- 獲取路徑參數(shù)
local id = ngx.var[1]
-- 根據(jù)id查詢商品
local itemJSON = read_http("/item/".. id, nil)
-- 根據(jù)id查詢商品庫存
local itemStockJSON = read_http("/item/stock/".. id, nil)

-- JSON轉(zhuǎn)化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)

-- 組合數(shù)據(jù)
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化為json 返回結(jié)果
ngx.say(cjson.encode(item))

4.4.5.基于ID負(fù)載均衡

剛才的代碼中,我們的tomcat是單機(jī)部署。而實(shí)際開發(fā)中,tomcat一定是集群模式:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

因此,OpenResty需要對tomcat集群做負(fù)載均衡。

而默認(rèn)的負(fù)載均衡規(guī)則是輪詢模式,當(dāng)我們查詢/item/10001時:

  • 第一次會訪問8081端口的tomcat服務(wù),在該服務(wù)內(nèi)部就形成了JVM進(jìn)程緩存
  • 第二次會訪問8082端口的tomcat服務(wù),該服務(wù)內(nèi)部沒有JVM緩存(因?yàn)镴VM緩存無法共享),會查詢數(shù)據(jù)庫

你看,因?yàn)檩喸兊脑?,第一次查?081形成的JVM緩存并未生效,直到下一次再次訪問到8081時才可以生效,緩存命中率太低了。

怎么辦?

如果能讓同一個商品,每次查詢時都訪問同一個tomcat服務(wù),那么JVM緩存就一定能生效了。

也就是說,我們需要根據(jù)商品id做負(fù)載均衡,而不是輪詢。

1)原理

nginx提供了基于請求路徑做負(fù)載均衡的算法:

nginx根據(jù)請求路徑做hash運(yùn)算,把得到的數(shù)值對tomcat服務(wù)的數(shù)量取余,余數(shù)是幾,就訪問第幾個服務(wù),實(shí)現(xiàn)負(fù)載均衡。

例如:

  • 我們的請求路徑是 /item/10001
  • tomcat總數(shù)為2臺(8081、8082)
  • 對請求路徑/item/1001做hash運(yùn)算求余的結(jié)果為1
  • 則訪問第一個tomcat服務(wù),也就是8081

只要id不變,每次hash運(yùn)算結(jié)果也不會變,那就可以保證同一個商品,一直訪問同一個tomcat服務(wù),確保JVM緩存生效。

2)實(shí)現(xiàn)

修改/usr/local/openresty/nginx/conf/nginx.conf文件,實(shí)現(xiàn)基于ID做負(fù)載均衡。

首先,定義tomcat集群,并設(shè)置基于路徑做負(fù)載均衡:

upstream tomcat-cluster {
    hash $request_uri;
    server 192.168.150.1:8081;
    server 192.168.150.1:8082;
}

然后,修改對tomcat服務(wù)的反向代理,目標(biāo)指向tomcat集群:

location /item {
    proxy_pass http://tomcat-cluster;
}

重新加載OpenResty

nginx -s reload
3)測試

啟動兩臺tomcat服務(wù):

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

同時啟動:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

清空日志后,再次訪問頁面,可以看到不同id的商品,訪問到了不同的tomcat服務(wù):

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.5.Redis緩存預(yù)熱

Redis緩存會面臨冷啟動問題:

冷啟動:服務(wù)剛剛啟動時,Redis中并沒有緩存,如果所有商品數(shù)據(jù)都在第一次查詢時添加緩存,可能會給數(shù)據(jù)庫帶來較大壓力。

緩存預(yù)熱:在實(shí)際開發(fā)中,我們可以利用大數(shù)據(jù)統(tǒng)計(jì)用戶訪問的熱點(diǎn)數(shù)據(jù),在項(xiàng)目啟動時將這些熱點(diǎn)數(shù)據(jù)提前查詢并保存到Redis中。

我們數(shù)據(jù)量較少,并且沒有數(shù)據(jù)統(tǒng)計(jì)相關(guān)功能,目前可以在啟動時將所有數(shù)據(jù)都放入緩存中。

1)利用Docker安裝Redis

docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes

2)在item-service服務(wù)中引入Redis依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3)配置Redis地址

spring:
  redis:
    host: 192.168.150.101

4)編寫初始化類

緩存預(yù)熱需要在項(xiàng)目啟動時完成,并且必須是拿到RedisTemplate之后。

這里我們利用InitializingBean接口來實(shí)現(xiàn),因?yàn)镮nitializingBean可以在對象被Spring創(chuàng)建并且成員變量全部注入后執(zhí)行。

package com.java.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.java.item.pojo.Item;
import com.java.item.pojo.ItemStock;
import com.java.item.service.IItemService;
import com.java.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化緩存
        // 1.查詢商品信息
        List<Item> itemList = itemService.list();
        // 2.放入緩存
        for (Item item : itemList) {
            // 2.1.item序列化為JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查詢商品庫存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入緩存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化為JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
}

4.6.查詢Redis緩存

現(xiàn)在,Redis緩存已經(jīng)準(zhǔn)備就緒,我們可以再OpenResty中實(shí)現(xiàn)查詢Redis的邏輯了。如下圖紅框所示:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

當(dāng)請求進(jìn)入OpenResty之后:

  • 優(yōu)先查詢Redis緩存
  • 如果Redis緩存未命中,再查詢Tomcat

4.6.1.封裝Redis工具

OpenResty提供了操作Redis的模塊,我們只要引入該模塊就能直接使用。但是為了方便,我們將Redis操作封裝到之前的common.lua工具庫中。

修改/usr/local/openresty/lualib/common.lua文件:

1)引入Redis模塊,并初始化Redis對象

-- 導(dǎo)入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2)封裝函數(shù),用來釋放Redis連接,其實(shí)是放入連接池

-- 關(guān)閉redis連接的工具方法,其實(shí)是放入連接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 連接的空閑時間,單位是毫秒
    local pool_size = 100 --連接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis連接池失敗: ", err)
    end
end

3)封裝函數(shù),根據(jù)key查詢Redis數(shù)據(jù)

-- 查詢r(jià)edis的方法 ip和port是redis地址,key是查詢的key
local function read_redis(ip, port, key)
    -- 獲取一個連接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "連接redis失敗 : ", err)
        return nil
    end
    -- 查詢r(jià)edis
    local resp, err = red:get(key)
    -- 查詢失敗處理
    if not resp then
        ngx.log(ngx.ERR, "查詢Redis失敗: ", err, ", key = " , key)
    end
    --得到的數(shù)據(jù)為空處理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查詢Redis數(shù)據(jù)為空, key = ", key)
    end
    close_redis(red)
    return resp
end

4)導(dǎo)出

-- 將方法導(dǎo)出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

完整的common.lua:

-- 導(dǎo)入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 關(guān)閉redis連接的工具方法,其實(shí)是放入連接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 連接的空閑時間,單位是毫秒
    local pool_size = 100 --連接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis連接池失敗: ", err)
    end
end

-- 查詢r(jià)edis的方法 ip和port是redis地址,key是查詢的key
local function read_redis(ip, port, key)
    -- 獲取一個連接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "連接redis失敗 : ", err)
        return nil
    end
    -- 查詢r(jià)edis
    local resp, err = red:get(key)
    -- 查詢失敗處理
    if not resp then
        ngx.log(ngx.ERR, "查詢Redis失敗: ", err, ", key = " , key)
    end
    --得到的數(shù)據(jù)為空處理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查詢Redis數(shù)據(jù)為空, key = ", key)
    end
    close_redis(red)
    return resp
end

-- 封裝函數(shù),發(fā)送http請求,并解析響應(yīng)
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 記錄錯誤信息,返回404
        ngx.log(ngx.ERR, "http查詢失敗, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 將方法導(dǎo)出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

4.6.2.實(shí)現(xiàn)Redis查詢

接下來,我們就可以去修改item.lua文件,實(shí)現(xiàn)對Redis的查詢了。

查詢邏輯是:

  • 根據(jù)id查詢Redis
  • 如果查詢失敗則繼續(xù)查詢Tomcat
  • 將查詢結(jié)果返回

1)修改/usr/local/openresty/lua/item.lua文件,添加一個查詢函數(shù):

-- 導(dǎo)入common函數(shù)庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封裝查詢函數(shù)
function read_data(key, path, params)
    -- 查詢本地緩存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判斷查詢結(jié)果
    if not val then
        ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
        -- redis查詢失敗,去查詢http
        val = read_http(path, params)
    end
    -- 返回?cái)?shù)據(jù)
    return val
end

2)而后修改商品查詢、庫存查詢的業(yè)務(wù):

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

3)完整的item.lua代碼:

-- 導(dǎo)入common函數(shù)庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 導(dǎo)入cjson庫
local cjson = require('cjson')

-- 封裝查詢函數(shù)
function read_data(key, path, params)
    -- 查詢本地緩存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判斷查詢結(jié)果
    if not val then
        ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
        -- redis查詢失敗,去查詢http
        val = read_http(path, params)
    end
    -- 返回?cái)?shù)據(jù)
    return val
end

-- 獲取路徑參數(shù)
local id = ngx.var[1]

-- 查詢商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查詢庫存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON轉(zhuǎn)化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 組合數(shù)據(jù)
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化為json 返回結(jié)果
ngx.say(cjson.encode(item))

4.7.Nginx本地緩存

現(xiàn)在,整個多級緩存中只差最后一環(huán),也就是nginx的本地緩存了。如圖:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

4.7.1.本地緩存API

OpenResty為Nginx提供了shard dict的功能,可以在nginx的多個worker之間共享數(shù)據(jù),實(shí)現(xiàn)緩存功能。

1)開啟共享字典,在nginx.conf的http下添加配置:

 # 共享字典,也就是本地緩存,名稱叫做:item_cache,大小150m
 lua_shared_dict item_cache 150m; 

2)操作共享字典:

-- 獲取本地緩存對象
local item_cache = ngx.shared.item_cache
-- 存儲, 指定key、value、過期時間,單位s,默認(rèn)為0代表永不過期
item_cache:set('key', 'value', 1000)
-- 讀取
local val = item_cache:get('key')

4.7.2.實(shí)現(xiàn)本地緩存查詢

1)修改/usr/local/openresty/lua/item.lua文件,修改read_data查詢函數(shù),添加本地緩存邏輯:

-- 導(dǎo)入共享詞典,本地緩存
local item_cache = ngx.shared.item_cache

-- 封裝查詢函數(shù)
function read_data(key, expire, path, params)
    -- 查詢本地緩存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地緩存查詢失敗,嘗試查詢Redis, key: ", key)
        -- 查詢r(jià)edis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判斷查詢結(jié)果
        if not val then
            ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
            -- redis查詢失敗,去查詢http
            val = read_http(path, params)
        end
    end
    -- 查詢成功,把數(shù)據(jù)寫入本地緩存
    item_cache:set(key, val, expire)
    -- 返回?cái)?shù)據(jù)
    return val
end

2)修改item.lua中查詢商品和庫存的業(yè)務(wù),實(shí)現(xiàn)最新的read_data函數(shù):

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

其實(shí)就是多了緩存時間參數(shù),過期后nginx緩存會自動刪除,下次訪問即可更新緩存。

這里給商品基本信息設(shè)置超時時間為30分鐘,庫存為1分鐘。

因?yàn)閹齑娓骂l率較高,如果緩存時間過長,可能與數(shù)據(jù)庫差異較大。

3)完整的item.lua文件:

-- 導(dǎo)入common函數(shù)庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 導(dǎo)入cjson庫
local cjson = require('cjson')
-- 導(dǎo)入共享詞典,本地緩存
local item_cache = ngx.shared.item_cache

-- 封裝查詢函數(shù)
function read_data(key, expire, path, params)
    -- 查詢本地緩存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地緩存查詢失敗,嘗試查詢Redis, key: ", key)
        -- 查詢r(jià)edis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判斷查詢結(jié)果
        if not val then
            ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
            -- redis查詢失敗,去查詢http
            val = read_http(path, params)
        end
    end
    -- 查詢成功,把數(shù)據(jù)寫入本地緩存
    item_cache:set(key, val, expire)
    -- 返回?cái)?shù)據(jù)
    return val
end

-- 獲取路徑參數(shù)
local id = ngx.var[1]

-- 查詢商品信息
local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
-- 查詢庫存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

-- JSON轉(zhuǎn)化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 組合數(shù)據(jù)
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化為json 返回結(jié)果
ngx.say(cjson.encode(item))

5.緩存同步

大多數(shù)情況下,瀏覽器查詢到的都是緩存數(shù)據(jù),如果緩存數(shù)據(jù)與數(shù)據(jù)庫數(shù)據(jù)存在較大差異,可能會產(chǎn)生比較嚴(yán)重的后果。

所以我們必須保證數(shù)據(jù)庫數(shù)據(jù)、緩存數(shù)據(jù)的一致性,這就是緩存與數(shù)據(jù)庫的同步。

5.1.數(shù)據(jù)同步策略

緩存數(shù)據(jù)同步的常見方式有三種:

設(shè)置有效期:給緩存設(shè)置有效期,到期后自動刪除。再次查詢時更新

  • 優(yōu)勢:簡單、方便
  • 缺點(diǎn):時效性差,緩存過期之前可能不一致
  • 場景:更新頻率較低,時效性要求低的業(yè)務(wù)

同步雙寫:在修改數(shù)據(jù)庫的同時,直接修改緩存

  • 優(yōu)勢:時效性強(qiáng),緩存與數(shù)據(jù)庫強(qiáng)一致
  • 缺點(diǎn):有代碼侵入,耦合度高;
  • 場景:對一致性、時效性要求較高的緩存數(shù)據(jù)

**異步通知:**修改數(shù)據(jù)庫時發(fā)送事件通知,相關(guān)服務(wù)監(jiān)聽到通知后修改緩存數(shù)據(jù)

  • 優(yōu)勢:低耦合,可以同時通知多個緩存服務(wù)
  • 缺點(diǎn):時效性一般,可能存在中間不一致狀態(tài)
  • 場景:時效性要求一般,有多個服務(wù)需要同步

而異步實(shí)現(xiàn)又可以基于MQ或者Canal來實(shí)現(xiàn):

1)基于MQ的異步通知:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

解讀:

  • 商品服務(wù)完成對數(shù)據(jù)的修改后,只需要發(fā)送一條消息到MQ中。
  • 緩存服務(wù)監(jiān)聽MQ消息,然后完成對緩存的更新

依然有少量的代碼侵入。

2)基于Canal的通知

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

解讀:

  • 商品服務(wù)完成商品修改后,業(yè)務(wù)直接結(jié)束,沒有任何代碼侵入
  • Canal監(jiān)聽MySQL變化,當(dāng)發(fā)現(xiàn)變化后,立即通知緩存服務(wù)
  • 緩存服務(wù)接收到canal通知,更新緩存

代碼零侵入

5.2.安裝Canal

5.2.1.認(rèn)識Canal

Canal [k?’n?l],譯意為水道/管道/溝渠,canal是阿里巴巴旗下的一款開源項(xiàng)目,基于Java開發(fā)。基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi)。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主從同步來實(shí)現(xiàn)的,MySQL主從同步的原理如下:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

  • 1)MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log),其中記錄的數(shù)據(jù)叫做binary log events
  • 2)MySQL slave 將 master 的 binary log events拷貝到它的中繼日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

而Canal就是把自己偽裝成MySQL的一個slave節(jié)點(diǎn),從而監(jiān)聽master的binary log變化。再把得到的變化信息通知給Canal的客戶端,進(jìn)而完成對其它數(shù)據(jù)庫的同步。

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

5.2.2.安裝Canal

1.開啟MySQL主從

Canal是基于MySQL的主從同步功能,因此必須先開啟MySQL的主從功能才可以。

這里以之前用Docker運(yùn)行的mysql為例:

1.1.開啟binlog

打開mysql容器掛載的日志文件,我的在/tmp/mysql/conf目錄:

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

修改文件:

vi /tmp/mysql/conf/my.cnf

添加內(nèi)容:

log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=java

配置解讀:

  • log-bin=/var/lib/mysql/mysql-bin:設(shè)置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=java:指定對哪個database記錄binary log events,這里記錄java這個庫

最終效果:

[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=java

1.2.設(shè)置用戶權(quán)限

接下來添加一個僅用于數(shù)據(jù)同步的賬戶,出于安全考慮,這里僅提供對java這個庫的操作權(quán)限。

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

重啟mysql容器即可

docker restart mysql

測試設(shè)置是否成功:在mysql控制臺,或者Navicat中,輸入命令:

show master status;

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

2.安裝Canal

2.1.創(chuàng)建網(wǎng)絡(luò)

我們需要創(chuàng)建一個網(wǎng)絡(luò),將MySQL、Canal、MQ放到同一個Docker網(wǎng)絡(luò)中:

docker network create java

讓mysql加入這個網(wǎng)絡(luò):

docker network connect java mysql

2.3.安裝Canal

docker pull canal/canal-server:v1.1.5

然后運(yùn)行命令創(chuàng)建Canal容器:

docker run -p 11111:11111 --name canal \
-e canal.destinations=java \
-e canal.instance.master.address=mysql:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=java\\..* \
--network java \
-d canal/canal-server:v1.1.5

說明:

  • -p 11111:11111:這是canal的默認(rèn)監(jiān)聽端口
  • -e canal.instance.master.address=mysql:3306:數(shù)據(jù)庫地址和端口,如果不知道m(xù)ysql容器地址,可以通過docker inspect 容器id來查看
  • -e canal.instance.dbUsername=canal:數(shù)據(jù)庫用戶名
  • -e canal.instance.dbPassword=canal :數(shù)據(jù)庫密碼
  • -e canal.instance.filter.regex=:要監(jiān)聽的表名稱

表名稱監(jiān)聽支持的語法:

mysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式.
多個正則之間以逗號(,)分隔,轉(zhuǎn)義符需要雙斜杠(\\) 
常見例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打頭的表:canal\\.canal.*
4.  canal schema下的一張表:canal.test1
5.  多個規(guī)則組合使用然后以逗號隔開:canal\\..*,mysql.test1,mysql.test2 

5.3.監(jiān)聽Canal

Canal提供了各種語言的客戶端,當(dāng)Canal監(jiān)聽到binlog變化時,會通知Canal的客戶端。

caffeine 集群,java,# spring,# springboot,緩存,微服務(wù),分布式,java,redis

我們可以利用Canal提供的Java客戶端,監(jiān)聽Canal通知消息。當(dāng)收到變化的消息時,完成對緩存的更新。

不過這里我們會使用GitHub上的第三方開源的canal-starter客戶端。地址:https://github.com/NormanGyllenhaal/canal-client

與SpringBoot完美整合,自動裝配,比官方客戶端要簡單好用很多。

5.3.1.引入依賴:

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

5.3.2.編寫配置:

canal:
  destination: java # canal的集群名字,要與安裝canal時設(shè)置的名稱一致
  server: 192.168.150.101:11111 # canal服務(wù)地址

5.3.3.修改Item實(shí)體類

通過@Id、@Column、等注解完成Item與數(shù)據(jù)庫表字段的映射:

package com.java.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名稱
    private String title;//商品標(biāo)題
    private Long price;//價(jià)格(分)
    private String image;//商品圖片
    private String category;//分類名稱
    private String brand;//品牌名稱
    private String spec;//規(guī)格
    private Integer status;//商品狀態(tài) 1-正常,2-下架
    private Date createTime;//創(chuàng)建時間
    private Date updateTime;//更新時間
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;
}

5.3.4.編寫監(jiān)聽器

通過實(shí)現(xiàn)EntryHandler<T>接口編寫監(jiān)聽器,監(jiān)聽Canal消息。注意兩點(diǎn):

  • 實(shí)現(xiàn)類通過@CanalTable("tb_item")指定監(jiān)聽的表信息
  • EntryHandler的泛型是與表對應(yīng)的實(shí)體類
package com.java.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.java.item.config.RedisHandler;
import com.java.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) {
        // 寫數(shù)據(jù)到JVM進(jìn)程緩存
        itemCache.put(item.getId(), item);
        // 寫數(shù)據(jù)到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 寫數(shù)據(jù)到JVM進(jìn)程緩存
        itemCache.put(after.getId(), after);
        // 寫數(shù)據(jù)到redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 刪除數(shù)據(jù)到JVM進(jìn)程緩存
        itemCache.invalidate(item.getId());
        // 刪除數(shù)據(jù)到redis
        redisHandler.deleteItemById(item.getId());
    }
}

在這里對Redis的操作都封裝到了RedisHandler這個對象中,是我們之前做緩存預(yù)熱時編寫的一個類,內(nèi)容如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-519930.html

package com.java.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.java.item.pojo.Item;
import com.java.item.pojo.ItemStock;
import com.java.item.service.IItemService;
import com.java.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化緩存
        // 1.查詢商品信息
        List<Item> itemList = itemService.list();
        // 2.放入緩存
        for (Item item : itemList) {
            // 2.1.item序列化為JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查詢商品庫存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入緩存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化為JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }

    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}

到了這里,關(guān)于微服務(wù)---分布式多級緩存集群實(shí)現(xiàn)方案(Caffeine+redis+nginx本地緩存+Canal數(shù)據(jù)同步)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Redis分布式緩存之主從&哨兵&分片集群

    Redis分布式緩存之主從&哨兵&分片集群

    數(shù)據(jù)同步原理 集群伸縮:在集群中插入或刪除某個節(jié)點(diǎn) 集群故障轉(zhuǎn)移

    2024年02月04日
    瀏覽(28)
  • Redis-持久化、主從集群、哨兵模式、分片集群、分布式緩存

    Redis-持久化、主從集群、哨兵模式、分片集群、分布式緩存

    數(shù)據(jù)丟失問題 : Redis是內(nèi)存存儲,服務(wù)重啟可能會丟失數(shù)據(jù) 解決方案:利用Redis數(shù)據(jù)持久化,將數(shù)據(jù)寫入磁盤 并發(fā)能力問題 : 單節(jié)點(diǎn)Redis并發(fā)能力雖然不錯,單也無法滿足如618這樣的高并發(fā)場景 解決方案:搭建主從集群,實(shí)現(xiàn)讀寫分離 故障恢復(fù)問題 : 如果Redis宕機(jī),則服

    2024年02月16日
    瀏覽(23)
  • Redis分布式緩存方案

    Redis分布式緩存方案

    數(shù)據(jù)丟失:數(shù)據(jù)持久化 并發(fā)能力弱:搭建主從集群,實(shí)現(xiàn)讀寫分離 故障恢復(fù)問題:哨兵實(shí)現(xiàn)健康檢測,自動恢復(fù) 存儲能力:搭建分片集群,利用插槽機(jī)制實(shí)現(xiàn)動態(tài)擴(kuò)容 RDB持久化 數(shù)據(jù)庫備份文件,也叫快照,把內(nèi)存數(shù)據(jù)存到磁盤。使用save進(jìn)行主動RDB,會阻塞所有命令。建議

    2023年04月25日
    瀏覽(17)
  • Redis分布式緩存部署方案詳解

    高可用性 :分布式部署可以避免單點(diǎn)故障,提高系統(tǒng)的可用性。 高性能 :分布式部署可以通過增加節(jié)點(diǎn)數(shù)量來提高系統(tǒng)的吞吐量和響應(yīng)速度。 易于擴(kuò)展 :分布式部署可以方便地?cái)U(kuò)展系統(tǒng)的容量和性能,只需添加新節(jié)點(diǎn)即可。 Redis的分布式部署有多種方式,例如主從復(fù)制、

    2024年02月07日
    瀏覽(34)
  • 微服務(wù)07-分布式緩存

    微服務(wù)07-分布式緩存

    前提: 單機(jī)的Redis存在四大問題: 解決辦法:基于Redis集群解決單機(jī)Redis存在的問題 Redis 具有持久化功能,其會按照設(shè)置以 快照 或 操作日志 的形式將數(shù)據(jù)持久化到磁盤。 Redis有兩種持久化方案: RDB持久化 AOF持久化 注意: RDB 是默認(rèn)持久化方式,但 Redis 允許 RDB 與 AOF 兩種

    2024年02月12日
    瀏覽(22)
  • 分布式、集群、微服務(wù)

    分布式是以縮短單個任務(wù)的執(zhí)行時間來提升效率的;而集群則是通過提高單位時間內(nèi)執(zhí)行的任務(wù)數(shù)來提升效率。 分布式是指將不同的業(yè)務(wù)分布在不同的地方。 集群指的是將幾臺服務(wù)器集中在一起,實(shí)現(xiàn)同一業(yè)務(wù)。 分布式中的每一個節(jié)點(diǎn),都可以做集群。而集群并不一定就是

    2024年02月08日
    瀏覽(89)
  • 分布式集群下WebSocket Session共享解決方案

    接上一篇 SpringBoot集成WebSocket進(jìn)行消息主動推送 分布式集群下WebSocket Session共享解決方案 在實(shí)現(xiàn)中需要解決的類變量有兩個 其中online可以用Redis實(shí)現(xiàn)存儲 Session無法采用Redis進(jìn)行存儲, 因?yàn)椴荒軐ession進(jìn)行序列化 由于session無法實(shí)現(xiàn)序列化,不能存儲到redis這些中間存儲里面,

    2024年02月12日
    瀏覽(24)
  • 微服務(wù)中間件-分布式緩存Redis

    微服務(wù)中間件-分布式緩存Redis

    – 基于Redis集群解決單機(jī)Redis存在的問題 單機(jī)的Redis存在四大問題: 1.數(shù)據(jù)丟失問題: Redis是內(nèi)存存儲,服務(wù)重啟可能會丟失數(shù)據(jù) 2.并發(fā)能力問題: 單節(jié)點(diǎn)Redis并發(fā)能力雖然不錯,但也無法滿足如618這樣的高并發(fā)場景 3.故障恢復(fù)問題: 如果Redis宕機(jī),則服務(wù)不可用,需要一種自動

    2024年02月12日
    瀏覽(21)
  • 分布式軟件架構(gòu)——服務(wù)端緩存的三種屬性

    分布式軟件架構(gòu)——服務(wù)端緩存的三種屬性

    在透明多級分流系統(tǒng)中,我們以流量從客戶端中發(fā)出開始,以流量到達(dá)服務(wù)器集群中真正處理業(yè)務(wù)的節(jié)點(diǎn)結(jié)束。一起探索了在這個過程中與業(yè)務(wù)無關(guān)的一些通用組件,包括DNS、CDN、客戶端緩存,等等。 實(shí)際上,服務(wù)端緩存也是一種通用的技術(shù)組件,它主要用于減少多個客戶端

    2024年02月07日
    瀏覽(84)
  • 【業(yè)務(wù)功能篇87】微服務(wù)-springcloud-本地緩存-redis-分布式緩存-緩存穿透-雪崩-擊穿

    【業(yè)務(wù)功能篇87】微服務(wù)-springcloud-本地緩存-redis-分布式緩存-緩存穿透-雪崩-擊穿

    ??緩存的作用是減低對數(shù)據(jù)源的訪問頻率。從而提高我們系統(tǒng)的性能。 緩存的流程圖 2.1 本地緩存 ??其實(shí)就是把緩存數(shù)據(jù)存儲在內(nèi)存中(Map String,Object ).在單體架構(gòu)中肯定沒有問題。 單體架構(gòu)下的緩存處理 2.2 分布式緩存 ??在分布式環(huán)境下,我們原來的本地緩存就不是

    2024年02月10日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包