Redis 是一個(gè)開(kāi)源(BSD許可)的,內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)系統(tǒng),它可以用作數(shù)據(jù)庫(kù)、緩存和消息中間件。那么redis的底層是如何來(lái)存儲(chǔ)數(shù)據(jù)的呢?
一、redis如何在存儲(chǔ)大量的key時(shí)候,查詢速度還能接近O(1)呢?
查詢速度接近O(1)的數(shù)據(jù)結(jié)構(gòu)通常讓我們想到的就是HashMap結(jié)構(gòu),那下面我從源碼來(lái)追蹤下redis到底是不是使用的HashMap結(jié)構(gòu)呢?生成的全局hashTable的大小為4
redis的數(shù)據(jù)最外層的結(jié)構(gòu)是redisDb(server.h文件) ,其定義如下:
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
從上面定義我們可以看出redisDb 的保存數(shù)據(jù)的結(jié)構(gòu)是dict(dict.h),那么我們從文件中獲取
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
} dict;
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
dict 包含了兩個(gè)hash表(dictht ht[2]),這里使用兩個(gè)hash表就是為了后續(xù)給漸進(jìn)式rehash來(lái)進(jìn)行服務(wù)的.屬性rehashidx == -1時(shí)候代表不是處于reshaing中。
dictht 就一個(gè)hashtable,其包含dictEntry 的數(shù)組。然后我們繼續(xù)看下
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
dictEntry 的就是hash表中的一個(gè)鍵值對(duì),那么根據(jù)上面的代碼我們可以繪出redis中內(nèi)存結(jié)構(gòu)圖。
redis的rehash過(guò)程怎么處理呢?
隨著redis中key的數(shù)據(jù)量增多,隨著key的增多,那么dictEntry 連越來(lái)越長(zhǎng),這個(gè)時(shí)候查詢出來(lái)的性能將會(huì)越來(lái)越慢。這個(gè)時(shí)候就需要對(duì)hashTable進(jìn)行擴(kuò)容,在數(shù)據(jù)量大的時(shí)候如果等到所有的擴(kuò)容完成,那么必然會(huì)導(dǎo)致redis長(zhǎng)時(shí)間等待,那么這個(gè)時(shí)候我們就采用漸進(jìn)式rehash方式來(lái)進(jìn)行擴(kuò)容。
什么是漸進(jìn)式rehash呢?
Redis 默認(rèn)使用了兩個(gè)全局哈希表:dictht[0]和哈希表 dictht[1],一開(kāi)始,當(dāng)你剛插入數(shù)據(jù)時(shí),默認(rèn)使用dictht[0],此時(shí)的dictht[1] 并沒(méi)有被分配空間。隨著數(shù)據(jù)逐步增多,Redis 開(kāi)始執(zhí)行 rehash,這個(gè)過(guò)程分為三步:
1、給dictht[1]分配更大的空間,一般是當(dāng)前dictht[0]已使用大小的2倍,但是必須滿足是2的冪次倍!
2、把哈希表0 中的數(shù)據(jù)重新映射并拷貝到哈希表1 中(在hash表1下進(jìn)行重新計(jì)算hash值);
3、釋放哈希表 0 的空間
4、把dictht[0]指向剛剛創(chuàng)建好的dictht[1]
什么時(shí)候進(jìn)行hash
- 1、在沒(méi)有fork子進(jìn)程進(jìn)行RDS或者AOF數(shù)據(jù)備份的時(shí)候且ht[0] .used >= ht[0].size時(shí)
- 2、 在有fork子進(jìn)程進(jìn)行RDS或者AOF數(shù)據(jù)備份的時(shí)候且ht[0] .used > ht[0].size * 5時(shí)
擴(kuò)容,肯定是在添加數(shù)據(jù)的時(shí)候才會(huì)擴(kuò)容,所以我們找一個(gè)添加數(shù)據(jù)的入口,我們從源碼層面進(jìn)行下驗(yàn)證:
int dictReplace(dict *d, void *key, void *val)
{
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that it is important
* to do that in this order, as the value may just be exactly the same
* as the previous one. In this context, think to reference counting,
* you want to increment (set), and then decrement (free), and not the
* reverse. */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
然后繼續(xù)查看dictAddRaw方法
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
然后繼續(xù)往下看_dictKeyIndex方法
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
}
從上面代碼注釋可以看出來(lái),_dictExpandIfNeeded就是用來(lái)進(jìn)行擴(kuò)容的
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (!dictTypeExpandAllowed(d))
return DICT_OK;
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht[0].used >= d->ht[0].size) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht[0].used / d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}
- 1、在hashtable擴(kuò)容的時(shí)候,如果正在擴(kuò)容的時(shí)將不會(huì)出發(fā)擴(kuò)容操作
- 2、DICT_HT_INITIAL_SIZE的大小為4,即默認(rèn)創(chuàng)建的hashtable大小為4
- 3、dict_force_resize_ratio的值為5
*這里需要關(guān)注dict_can_resize 這個(gè)字段什么時(shí)候被賦值了,
如何進(jìn)行擴(kuò)容?
對(duì)hashtable真正擴(kuò)容的方法是dictExpand(d, d->ht[0].used + 1)
/* return DICT_ERR if expand was not performed */
int dictExpand(dict *d, unsigned long size) {
return _dictExpand(d, size, NULL);
}
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);
/* Detect overflows */
if (realsize < size || realsize * sizeof(dictEntry*) < realsize)
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
if (malloc_failed) {
n.table = ztrycalloc(realsize*sizeof(dictEntry*));
*malloc_failed = n.table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
1、先定義一個(gè)新的全局表,大小2^2 到 2的n次冪跟size來(lái)進(jìn)行比較,取第一次滿足的時(shí)候的條件,_dictNextPower(size)的代碼如下:
while(1) {
if (i >= size)
return i;
i *= 2;
}
2、設(shè)置dictht 的size等于剛剛計(jì)算好的realSize,掩碼等于realsize-1
3、給dictht 的table分配地址和做初始化操作
4、接下來(lái)就判斷ht[0].table是否為null,如果為null說(shuō)明是第一次進(jìn)行初始存放數(shù)據(jù),而不是真正的進(jìn)行rehash。此時(shí)只需要將ht[0] = n,即把剛剛創(chuàng)建的全局hashtable賦值給ht[0]就可以了
5、如果不是那么把剛剛創(chuàng)建的全局hashtable賦值給ht[1],然后dict對(duì)應(yīng)的rehashidx 值修改為0
至此我們完成了hash表的擴(kuò)容
那redis的數(shù)據(jù)如何進(jìn)行遷移的
答案就是我們剛剛說(shuō)到的使用漸進(jìn)式rehash方法,那具體是如何做的?
假如一次性把數(shù)據(jù)遷移會(huì)很耗時(shí)間,會(huì)讓單條指令等待很久很久,會(huì)形成阻塞。所以,Redis采用的是漸進(jìn)式Rehash,所謂漸進(jìn)式,就是慢慢的,不會(huì)一次性把所有數(shù)據(jù)遷移。
那什么時(shí)候會(huì)進(jìn)行漸進(jìn)式數(shù)據(jù)遷移?
1.每次進(jìn)行key的crud操作都會(huì)進(jìn)行一個(gè)hash桶的數(shù)據(jù)遷移
2.定時(shí)任務(wù),進(jìn)行部分?jǐn)?shù)據(jù)遷移文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-487216.html
- 執(zhí)行crud時(shí)候?qū)?shù)據(jù)的操作,進(jìn)行rehash操作
- 定時(shí)任務(wù)執(zhí)行
源碼來(lái)源于server.c
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
那接下來(lái)我們真正看下真正執(zhí)行rehash操作的方法:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-487216.html
static void _dictRehashStep(dict *d) {
if (d->pauserehash == 0) dictRehash(d,1);
}
/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
*
* Note that a rehashing step consists in moving a bucket (that may have more
* than one key as we use chaining) from the old to the new hash table, however
* since part of the hash table may be composed of empty spaces, it is not
* guaranteed that this function will rehash even a single bucket, since it
* will visit at max N*10 empty buckets in total, otherwise the amount of
* work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
if (dict_can_resize == DICT_RESIZE_AVOID &&
(d->ht[1].size / d->ht[0].size < dict_force_resize_ratio))
{
return 0;
}
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
- 1、基于rehashidx從0開(kāi)始把數(shù)據(jù)從ht[0]轉(zhuǎn)移到ht[1]中
- 2、當(dāng)整個(gè)ht[0]中已使用的數(shù)量為0時(shí),就會(huì)把原來(lái)ht[0]中所占用的內(nèi)存進(jìn)行釋放,然后ht[0]指向ht[1],最后重置rehashIndex為-1
根據(jù)上面的分析我們可以知道rehash過(guò)程中,在redis中設(shè)置值的操作大致如下:
到了這里,關(guān)于redis到底是怎么樣進(jìn)行漸進(jìn)式Rehash的的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!