高并發(fā)緩存實戰(zhàn)RedisSon、性能優(yōu)化
分布式鎖性能提升
1.數(shù)據(jù)冷熱分離
對于經(jīng)常訪問的數(shù)據(jù)保留在redis緩存當(dāng)中,不用帶數(shù)據(jù)設(shè)置超時時間定期刪除控制redis的大小
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, 30000, TimeUnit.SECONDS); //讀延期
}
2.緩存擊穿(失效)
緩存擊穿數(shù)據(jù)庫沒有被擊穿
redisUtil.expire(productCacheKey, 30000, TimeUnit.SECONDS);
如果商家是批量導(dǎo)入的數(shù)據(jù),呢么就會同時存到redis中,設(shè)置固定的時間就會導(dǎo)致緩存在一瞬間失效,用戶訪問不到就會將流量打到數(shù)據(jù)庫上造成數(shù)據(jù)庫段時間內(nèi)抖動。
解決辦法:
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
private Integer genProductCacheTimeout() {
//過期時間是不一樣的
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
3.緩存穿透
緩存和數(shù)據(jù)庫都沒有結(jié)果
緩存穿透是指查詢一個一定不存在的數(shù)據(jù),由于緩存不命中,導(dǎo)致請求直接訪問數(shù)據(jù)庫或后端服務(wù),從而影響系統(tǒng)性能甚至崩潰。其發(fā)生的原因可能是惡意攻擊、不合理的業(yè)務(wù)邏輯和數(shù)據(jù)分布等。
例如,一個ID為負(fù)數(shù)或者非法字符的請求,即使被緩存,也是無效的,每次都需要訪問數(shù)據(jù)庫,造成了服務(wù)器資源的浪費。如果遇到大量的這種請求,就有可能導(dǎo)致緩存失效,直接訪問數(shù)據(jù)庫,甚至造成服務(wù)器癱瘓。
為了防止緩存穿透,我們可以采取以下措施:
- 在業(yè)務(wù)層面對惡意攻擊進(jìn)行限制,如合法字符過濾、請求頻率限制等。
- 對于查詢結(jié)果為空的情況,在緩存中設(shè)置一個空對象。
- 使用布隆過濾器(Bloom Filter)等技術(shù)來預(yù)先過濾掉不合法的請求,減輕數(shù)據(jù)庫的壓力。
- 將熱點數(shù)據(jù)(頻繁訪問的數(shù)據(jù))預(yù)先加載到緩存中,以提高緩存命中率。
- 設(shè)置合理的緩存過期時間,防止緩存中一直存在無效數(shù)據(jù)。
通過以上措施,可以有效地減少緩存穿透問題的發(fā)生,提高系統(tǒng)的性能和穩(wěn)定性。
3.1對于查詢結(jié)果為空的情況,在緩存中設(shè)置一個空對象
對于數(shù)據(jù)庫也無法訪問到數(shù)據(jù)的,首次訪問后設(shè)置“{}”到緩存中防止每次都訪問數(shù)據(jù)庫增加數(shù)據(jù)庫的io壓力,第二次直接可以命中緩存,針對黑客使用多個商品id對redis進(jìn)行攻擊的狀況,我們可以設(shè)置一個超時時間并延期,時間設(shè)置60-90s,防止大量的id撐大redis的存儲
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
4.DCL解決突發(fā)熱點的新增緩存
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
不能使用synchronized,原因:
1.只在單節(jié)點的jvm中生效,每個節(jié)點都會新增一次(非主要問題)
2.this會鎖住這個類,當(dāng)product0001需要加鎖新建緩存的時候,product0001的所有進(jìn)程都必須要等待,這沒有問題,但是product0003、product0004。。。也都需要等待,這就導(dǎo)致了業(yè)務(wù)被阻塞了,效率低下
所以需要分布式鎖
5.緩存數(shù)據(jù)庫雙些不一致
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
//synchronized (this) 不是分布式的只能鎖住當(dāng)前jvm進(jìn)程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
加鎖限制
控制查和寫的中間不能插入執(zhí)行其他的邏輯,在查詢和修改的代碼中都需要加鎖
6.代碼的復(fù)雜度
其實大部分情況下只有很小一塊會被執(zhí)行,大部分代碼塊是在完善各種邏輯,但是也是在取舍尋找一種最合適的方案
7.鎖優(yōu)化
1.分段鎖
庫存1000 則可分為 produc_1 - produc_10 10個線程可以同時執(zhí)行這段邏輯
2.讀寫鎖
@Transactional
public Product update(Product product) {
Product productResult = null;
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), product);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL 加分布式鎖解決熱點緩存并發(fā)重建問題
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock rLock = readWriteLock.readLock();
rLock.lock();
//synchronized (this) 不是分布式的只能鎖住當(dāng)前jvm進(jìn)程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
return product;
}
對同一個id的商品加上同一把鎖,讀操作加讀鎖 ,寫操作加鞋鎖,讀鎖遇到讀鎖不會阻斷
讀寫鎖是一種用于并發(fā)編程中的同步機制。與普通鎖相比,讀寫鎖允許多個線程同時讀取共享資源,但只允許一個線程寫入共享資源。這樣就可以提高并發(fā)性和系統(tǒng)吞吐量。
讀寫鎖通常由讀鎖和寫鎖兩部分組成。當(dāng)一個線程想要讀取共享資源時,它必須獲取讀鎖,并且當(dāng)沒有寫線程占用鎖時,可以同時有多個讀線程同時獲取讀鎖。當(dāng)一個線程想要修改共享資源時,它必須獲取寫鎖,而不允許其他任何讀或?qū)懢€程獲取鎖,直到它釋放寫鎖。
使用讀寫鎖可以有效地減少鎖競爭,提高程序性能,特別是在多讀少寫的情況下。但是,過度使用讀寫鎖也會導(dǎo)致一些問題,例如讀線程饑餓、寫線程優(yōu)先級過低等。因此,在使用時需要謹(jǐn)慎考慮鎖的使用場景和鎖的粒度。
- 讀操作時,每個線程可以直接獲取鎖,不需要等待其他線程釋放讀鎖,因為讀操作是并發(fā)執(zhí)行的,不會破壞數(shù)據(jù)的完整性。
- 寫操作時,只有一個線程可以獲取寫鎖,其他線程需要等待寫鎖釋放后才能繼續(xù)執(zhí)行。Redisson中通過分布式鎖的方式來實現(xiàn)寫鎖的互斥執(zhí)行。當(dāng)一個線程請求獲取寫鎖時,Redisson會在Redis服務(wù)器上創(chuàng)建一個對應(yīng)的分布式鎖,只有該線程能夠成功獲取該分布式鎖,其他線程則需要等待。
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
-
首先獲取指定鍵(KEYS[1])的值,并獲取該鍵對應(yīng)哈希表中mode字段的值,判斷是否存在。如果不存在,則設(shè)置mode為read,表明當(dāng)前鎖狀態(tài)為讀鎖;并將當(dāng)前線程加入到指定鍵對應(yīng)的哈希表中,并創(chuàng)建用于記錄當(dāng)前線程的計數(shù)器的哈希表,并設(shè)置過期時間(ARGV[1])。最后返回nil,表示當(dāng)前線程成功獲取讀鎖。
-
如果mode字段的值為read,或者為write并且當(dāng)前線程已經(jīng)獲取了該鎖,則將哈希表中該線程的計數(shù)器加1,并在單獨的哈希表中創(chuàng)建一個記錄當(dāng)前線程持有讀鎖的鍵值對,同樣設(shè)置過期時間。最后返回nil,表示當(dāng)前線程成功續(xù)訂讀鎖。否則,等待其他線程釋放讀鎖。
-
如果mode字段的值為write但是當(dāng)前線程沒有獲取該鎖,則表明當(dāng)前鎖狀態(tài)為寫鎖,不能獲取讀鎖。此時返回當(dāng)前鎖的剩余過期時間(即當(dāng)前線程等待獲取鎖的時間),以便客戶端等待。
@Override <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId)); }
-
首先獲取指定鍵(KEYS[1])的值,并獲取該鍵對應(yīng)哈希表中mode字段的值,判斷是否存在。如果不存在,則設(shè)置mode為write,表明當(dāng)前鎖狀態(tài)為寫鎖;并將當(dāng)前線程加入到指定鍵對應(yīng)的哈希表中,并設(shè)置過期時間(ARGV[1])。最后返回nil,表示當(dāng)前線程成功獲取寫鎖。
-
如果mode字段的值為write,則說明當(dāng)前鎖狀態(tài)為寫鎖,需要判斷當(dāng)前線程是否已經(jīng)獲取了該鎖。如果已經(jīng)獲取了該鎖,則將哈希表中該線程的計數(shù)器加1,并更新鎖的過期時間;最后返回nil,表示當(dāng)前線程成功續(xù)訂寫鎖。否則,等待其他線程釋放寫鎖。
-
如果mode字段的值為非write,則說明當(dāng)前鎖狀態(tài)為讀鎖,不能獲取寫鎖。此時返回當(dāng)前鎖的剩余過期時間(即當(dāng)前線程等待獲取鎖的時間),以便客戶端等待。
后續(xù)需要研究讀寫鎖的性能問題
3.串行轉(zhuǎn)并發(fā)
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
等待1s之后,鎖返回false,如果緩存此時已經(jīng)將需要的數(shù)據(jù)加載到緩存當(dāng)中則可以直接命中緩存返回
但是如果1s內(nèi)沒有完成數(shù)據(jù)的加載會導(dǎo)致緩存擊穿
8.redis多級緩存
一個key會存在redis的一個單節(jié)點上,一個redis節(jié)點優(yōu)化后可抗10萬并發(fā),會導(dǎo)致各個微服務(wù),功能出現(xiàn)問題,導(dǎo)致大規(guī)模的緩存問題,
可以進(jìn)行限流、降級控制,如果限流出現(xiàn)問題,代碼也可以做響應(yīng)的處理,—多級緩存
JVM的抗并發(fā)100萬級別,需要不同節(jié)點同步(隊列) 會出現(xiàn)段時間的數(shù)據(jù)不一致
Redis多級緩存指的是將Redis和其他緩存系統(tǒng)(如本地內(nèi)存緩存、Memcached等)結(jié)合使用,實現(xiàn)分層緩存的機制。多級緩存通常由兩個或多個層次組成:
- 一級緩存:本地內(nèi)存緩存,主要用于緩存熱點數(shù)據(jù),可以快速響應(yīng)客戶端請求,減輕Redis服務(wù)器的壓力。
- 二級緩存:Redis緩存,主要用于持久化緩存數(shù)據(jù),保證數(shù)據(jù)的可靠性,并將數(shù)據(jù)分布在多個節(jié)點上,提高緩存的并發(fā)訪問能力。
使用多級緩存的好處在于可以充分利用各個緩存的優(yōu)勢,快速響應(yīng)客戶端請求,提高訪問效率。具體操作步驟如下:
- 對于讀取數(shù)據(jù)的請求,先從本地內(nèi)存緩存中讀取數(shù)據(jù),如果沒有則從Redis緩存中讀取,并更新本地內(nèi)存緩存,以便下一次請求時直接從本地內(nèi)存緩存中獲取。
- 對于寫入數(shù)據(jù)的請求,先更新本地內(nèi)存緩存,再同步更新Redis緩存中的數(shù)據(jù)。
需要注意的是,在使用多級緩存的過程中,需要保證緩存數(shù)據(jù)的一致性和可靠性。當(dāng)二級緩存中的數(shù)據(jù)發(fā)生變化時,需要及時更新一級緩存中的數(shù)據(jù),否則會導(dǎo)致數(shù)據(jù)不一致。此外,需要考慮數(shù)據(jù)的失效策略和緩存節(jié)點選擇的問題,避免緩存雪崩或緩存穿透等問題的發(fā)生。文章來源:http://www.zghlxwxcb.cn/news/detail-641805.html
對于這種熱點中的熱點商品,需要一個專門的系統(tǒng)進(jìn)行實時的維護(hù)文章來源地址http://www.zghlxwxcb.cn/news/detail-641805.html
9.完整代碼
@Service
public class ProductService {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;
public static final String EMPTY_CACHE = "{}";
public static final String LOCK_PRODUCT_HOT_CACHE_PREFIX = "lock:product:hot_cache:";
public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
//Map 偽代碼 圖一樂 真實使用多級緩存框架 熱點系統(tǒng)
public static Map<String, Product> productMap = new ConcurrentHashMap<>();
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = null;
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), product);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL 加分布式鎖解決熱點緩存并發(fā)重建問題
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock rLock = readWriteLock.readLock();
rLock.lock();
//synchronized (this) 不是分布式的只能鎖住當(dāng)前jvm進(jìn)程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
return product;
}
private Integer genProductCacheTimeout() {
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
public static void main(String[] args) {
System.out.println( new Random().nextInt(5) * 60 * 60);
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
private Product getProductFromCache(String productCacheKey) {
Product product = productMap.get(productCacheKey);
if (product != null) {
return product;
}
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS); //讀延期
}
return product;
}
}
到了這里,關(guān)于高并發(fā)緩存實戰(zhàn)RedisSon、性能優(yōu)化的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!