国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列

這篇具有很好參考價(jià)值的文章主要介紹了redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

延時(shí)隊(duì)列簡(jiǎn)介

延時(shí)隊(duì)列是一種特殊的消息隊(duì)列,它允許將消息在一定的延遲時(shí)間后再進(jìn)行消費(fèi)。延時(shí)隊(duì)列的主要特點(diǎn)是可以延遲消息的處理時(shí)間,以滿足定時(shí)任務(wù)或者定時(shí)事件的需求。

總之,延時(shí)隊(duì)列通過(guò)延遲消息的消費(fèi)時(shí)間,提供了一種方便、可靠的方式來(lái)處理定時(shí)任務(wù)和定時(shí)事件。它在分布式系統(tǒng)中具有重要的作用,能夠提高系統(tǒng)的可靠性和性能。

延時(shí)隊(duì)列的實(shí)現(xiàn)方式可以有多種,本文介紹一種redis實(shí)現(xiàn)的分布式延時(shí)隊(duì)列。

應(yīng)用場(chǎng)景

  • 定時(shí)任務(wù):可以將需要在特定時(shí)間執(zhí)行的任務(wù)封裝為延時(shí)消息,通過(guò)延時(shí)隊(duì)列來(lái)觸發(fā)任務(wù)的執(zhí)行。

  • 訂單超時(shí)處理:可以將訂單消息發(fā)送到延時(shí)隊(duì)列中,并設(shè)置訂單的超時(shí)時(shí)間,超過(guò)時(shí)間后,消費(fèi)者從隊(duì)列中獲取到超時(shí)的訂單消息,進(jìn)行相應(yīng)的處理。

  • 消息重試機(jī)制:當(dāng)某個(gè)消息處理失敗時(shí),可以將該消息發(fā)送到延時(shí)隊(duì)列中,并設(shè)置一定的重試時(shí)間,超過(guò)時(shí)間后再次嘗試處理。

案例:

12306火車(chē)票購(gòu)買(mǎi),搶了訂單后,45分鐘沒(méi)有支付,自動(dòng)取消訂單

考慮:

數(shù)據(jù)持久化:redis是支持的,可以使用rdb,也可以使用aof

有序存儲(chǔ):因?yàn)橹灰钚〉臎](méi)過(guò)期,后面的肯定就沒(méi)過(guò)期,這樣的話檢查最小的節(jié)點(diǎn)就行了,考慮使用redis中的zset結(jié)構(gòu)

高可用:考慮哨兵或者cluster

高伸縮:因?yàn)?2306用戶量非常大,可能導(dǎo)致redis中存儲(chǔ)的任務(wù)空間非常大,所以考慮擴(kuò)展節(jié)點(diǎn),從這個(gè)角度來(lái)說(shuō),使用cluster集群模式,哨兵只有一個(gè)節(jié)點(diǎn)即主節(jié)點(diǎn)寫(xiě)數(shù)據(jù)。

實(shí)現(xiàn):

整體思路:

  • 生產(chǎn)消費(fèi)者模型:因?yàn)?2306的用戶量非常大,所以考慮生產(chǎn)者和消費(fèi)者有多個(gè)節(jié)點(diǎn);
  • 采用cluster模式實(shí)現(xiàn)高可用以及高伸縮性;
  • 采用zset存儲(chǔ)延時(shí)任務(wù)(zadd key score member,score表示時(shí)間);
  • 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個(gè)主節(jié)點(diǎn)中:構(gòu)建多個(gè)zset,每個(gè)zset對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者隨機(jī)向某個(gè)zset中生產(chǎn)數(shù)據(jù)。

具體實(shí)現(xiàn)

生產(chǎn)者

需要安裝hiredis-cluster集群,安裝編譯如下:

git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -
DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig

需要安裝libevent庫(kù),最后編譯時(shí)執(zhí)行gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl編譯生產(chǎn)者可執(zhí)行程序

#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>

int64_t g_taskid = 0;

#define MAX_KEY 10

static int64_t hi_msec_now() {
    int64_t msec;
    struct timeval now;
    int status;
    status = gettimeofday(&now, NULL);
    if (status < 0) {
        return -1;
    }
    msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL);
    return msec;
}

static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) {
    int n;
    n = vsnprintf(buf, size, fmt, args);
    if (n <= 0) {
        return 0;
    }
    if (n <= (int)size) {
        return n;
    }
    return (int)(size-1);
}

static int _scnprintf(char *buf, size_t size, const char *fmt, ...) {
    va_list args;
    int n;
    va_start(args, fmt);
    n = _vscnprintf(buf, size, fmt, args);
    va_end(args);
    return n;
}

void connectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void disconnectCallback(const redisAsyncContext *ac, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", ac->errstr);
        return;
    }
    printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) {
        if (cc->errstr) {
            printf("errstr: %s\n", cc->errstr);
        }
        return;
    }

    int64_t now = hi_msec_now() / 10;
    printf("add task success reply: %lld now=%ld\n", reply->integer, now);
}

int addTask(redisClusterAsyncContext *cc, char *desc) {
    /* 轉(zhuǎn)化為厘米秒 */
    int64_t now = hi_msec_now() / 10;
    g_taskid++;
    
    /* key */
    char key[256] = {0};
	// 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個(gè)主節(jié)點(diǎn)中:
?	// 構(gòu)建多個(gè)zset,每個(gè)zset對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者隨機(jī)向某個(gè)zset中生產(chǎn)數(shù)據(jù),
	// 生產(chǎn)者可以有很多個(gè),只需要保證向task_group:0-task_group:9中均勻的生產(chǎn)數(shù)據(jù)即可
    int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY);
    key[len] = '\0';
    
    /* member */
    char mem[1024] = {0};
    len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc);
    mem[len] = '\0';
    
    int status;
	// 為每一個(gè)任務(wù)延時(shí)5秒中去處理
    status = redisClusterAsyncCommand(cc, addTaskCallback, "",
                                      "zadd %s %ld %s", key, now+500, mem);

    printf("redisClusterAsyncCommand:zadd %s %ld %s\n", key, now+500, mem);
    if (status != REDIS_OK) {
        printf("error: err=%d errstr=%s\n", cc->err, cc->errstr);
    }
    return 0;
}

void stdio_callback(struct bufferevent *bev, void *arg) {
    redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg;
    struct evbuffer *evbuf = bufferevent_get_input(bev);
    char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF);
    if (!msg) return;

    if (strcmp(msg, "quit") == 0) {
        printf("safe exit!!!\n");
        exit(0);
        return;
    }
    if (strlen(msg) > 1024-5-13-1) {
        printf("[err]msg is too long, try again...\n");
        return;
    }

    addTask(cc, msg);
    printf("stdio read the data: %s\n", msg);
}

int main(int argc, char **argv) {
    printf("Connecting...\n");
	// 連接cluster集群,可以從cluster集群中任意一個(gè)節(jié)點(diǎn)出發(fā)連接集群
    redisClusterAsyncContext *cc =
        redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL);
    printf("redisClusterAsyncContext...\n");
    if (cc && cc->err) {
        printf("Error: %s\n", cc->errstr);
        return 1;
    }

    struct event_base *base = event_base_new();
    redisClusterLibeventAttach(cc, base);
    redisClusterAsyncSetConnectCallback(cc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback);

    // nodeIterator ni;
    // initNodeIterator(&ni, cc->cc);
    // cluster_node *node;
    // while ((node = nodeNext(&ni)) != NULL) {
    //     printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad);
    // }
    struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc);
    bufferevent_enable(ioev, EV_READ | EV_PERSIST);

    printf("Dispatch..\n");
    event_base_dispatch(base);

    printf("Done..\n");
    redisClusterAsyncFree(cc);
    event_base_free(base);
    return 0;
}

// 需要安裝 hiredis-cluster libevent
// gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl

說(shuō)明:

這里構(gòu)建了10個(gè)zset,分別是task_group:0,task_group:1,…,task_group:9作為10個(gè)zset的key,zset的數(shù)據(jù)其實(shí)就代表著消費(fèi)者的數(shù)量,通常消費(fèi)者的功能是一摸一樣的,生產(chǎn)者就不管你有多少個(gè)了,只需要將任務(wù)均勻的打散在不同的zset中就行了(具體實(shí)現(xiàn)可以搞一個(gè)全局的id,每一次添加任務(wù)時(shí)id++,然后再對(duì)zset個(gè)數(shù)10取模,最終可以得到0-9之間的一個(gè)數(shù),然后再與task_group拼接,這樣就可以將任務(wù)均勻的打散在不同的zset中)。

消費(fèi)者

消費(fèi)者是采用skynet+lua腳本實(shí)現(xiàn)的,每個(gè)消費(fèi)者會(huì)不斷的去檢查redis中的任務(wù)有沒(méi)有過(guò)期,如果過(guò)期,就取出來(lái)刪除(這里只是demo,只是打印之后刪除任務(wù))

local skynet = require "skynet"

local function table_dump( object )
    if type(object) == 'table' then
        local s = '{ '
        for k,v in pairs(object) do
            if type(k) ~= 'number' then k = string.format("%q", k) end
            s = s .. '['..k..'] = ' .. table_dump(v) .. ','
        end
        return s .. '} '
    elseif type(object) == 'function' then
        return tostring(object)
    elseif type(object) == 'string' then
        return string.format("%q", object)
    else
        return tostring(object)
    end
end

local mode, key = ...
if mode == "slave" then
    local rediscluster = require "skynet.db.redis.cluster"
    local function onmessage(data,channel,pchannel)
        print("onmessage",data,channel,pchannel)
    end
    skynet.start(function ()
        local db = rediscluster.new({
                {host="127.0.0.1",port=7001},
            },
            {read_slave=true,auth=nil,db=0,},
            onmessage
        )
        assert(db, "redis-cluster startup error")
        skynet.fork(function ()
            while true do
                local res = db:zrange(key, 0, 0, "withscores")
                if not next(res) then
                    skynet.sleep(50)
                else
                    local expire = tonumber(res[2])
                    local now = skynet.time()*100
                    if now >= expire then
                        print(("%s is comsumed:expire_time:%d"):format(res[1], expire))
                        db:zrem(key, res[1])
                    else
                        skynet.sleep(10)
                    end
                end
            end
        end)
    end)

else
    skynet.start(function ()	-- // 啟動(dòng)10個(gè)程序,并把"slave"傳入mode,task_group:i傳入到key中,即每個(gè)程序只消費(fèi)一個(gè)
        for i=0,9 do
            skynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)

運(yùn)行結(jié)果

redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列,redis,redis,分布式

redis分布式延時(shí)隊(duì)列優(yōu)勢(shì)

1.Redis zset支持高性能的 score 排序。

2.Redis是在內(nèi)存上進(jìn)行操作的,速度非常快。

3.Redis可以搭建集群,當(dāng)消息很多時(shí)候,我們可以用集群來(lái)提高消息處理的速度,提高可用性。

4.Redis具有持久化機(jī)制,當(dāng)出現(xiàn)故障的時(shí)候,可以通過(guò)AOF和RDB方式來(lái)對(duì)數(shù)據(jù)進(jìn)行恢復(fù),保證了數(shù)據(jù)的可靠性

redis分布式延時(shí)隊(duì)列劣勢(shì)

使用 Redis 實(shí)現(xiàn)的延時(shí)消息隊(duì)列也存在數(shù)據(jù)持久化, 消息可靠性的問(wèn)題:

  • 沒(méi)有重試機(jī)制 - 處理消息出現(xiàn)異常沒(méi)有重試機(jī)制, 這些需要自己去實(shí)現(xiàn), 包括重試次數(shù)的實(shí)現(xiàn)等;
  • 沒(méi)有 ACK 機(jī)制 - 例如在獲取消息并已經(jīng)刪除了消息情況下, 正在處理消息的時(shí)候客戶端崩潰了, 這條正在處理的這些消息就會(huì)丟失, MQ 是需要明確的返回一個(gè)值給 MQ 才會(huì)認(rèn)為這個(gè)消息是被正確的消費(fèi)了。

總結(jié):如果對(duì)消息可靠性要求較高, 推薦使用 MQ 來(lái)實(shí)現(xiàn)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-720759.html

到了這里,關(guān)于redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式鎖實(shí)現(xiàn)(mysql,以及redis)以及分布式的概念

    分布式鎖實(shí)現(xiàn)(mysql,以及redis)以及分布式的概念

    我旁邊的一位老哥跟我說(shuō),你知道分布式是是用來(lái)干什么的嘛?一句話給我干懵了,我能隱含知道,大概是用來(lái)做分壓處理的,并增加系統(tǒng)穩(wěn)定性的。但是具體如何,我卻道不出個(gè)1,2,3?,F(xiàn)在就將這些做一個(gè)詳細(xì)的總結(jié)。至少以后碰到面試官可以說(shuō)上個(gè)123。 那么就正式進(jìn)入

    2024年01月21日
    瀏覽(37)
  • Redis——》實(shí)現(xiàn)分布式鎖

    推薦鏈接: ????總結(jié)——》【Java】 ????總結(jié)——》【Mysql】 ????總結(jié)——》【Redis】 ????總結(jié)——》【Kafka】 ????總結(jié)——》【Spring】 ????總結(jié)——》【SpringBoot】 ????總結(jié)——》【MyBatis、MyBatis-Plus】 ????總結(jié)——》【Linux】 ????總結(jié)——》【MongoDB】 ???

    2024年02月10日
    瀏覽(24)
  • Redis分布式鎖實(shí)現(xiàn)原理

    Redis分布式鎖實(shí)現(xiàn)原理

    在早期互聯(lián)網(wǎng)的架構(gòu)中,一個(gè)應(yīng)用都是單機(jī)進(jìn)行部署,這種情況下,利用JDK提供的鎖機(jī)制即可解決共享數(shù)據(jù)在多線程場(chǎng)景下的線程安全問(wèn)題,但隨著技術(shù)的發(fā)展,分布式系統(tǒng)架構(gòu)逐漸普及,在分布式架構(gòu)中,由于一個(gè)應(yīng)用會(huì)進(jìn)行多機(jī)部署,服務(wù)器實(shí)例之間的JVM是互相獨(dú)立的,

    2024年02月16日
    瀏覽(16)
  • 使用redis實(shí)現(xiàn)分布式鎖

    使用redis實(shí)現(xiàn)分布式鎖

    在一個(gè)分布式系統(tǒng)中,也會(huì)涉及多個(gè)節(jié)點(diǎn)訪問(wèn)同一個(gè)公共資源的情況,此時(shí)就需要通過(guò)鎖來(lái)做互斥控制,避免出現(xiàn)類(lèi)似于“線程安全”的問(wèn)題,而java的synchronized這樣的鎖只能在當(dāng)前進(jìn)程中生效,在分布式的這種多個(gè)進(jìn)程多個(gè)主機(jī)的場(chǎng)景無(wú)能為力,此時(shí)就需要分布式鎖。 例如

    2024年02月07日
    瀏覽(36)
  • redis如何實(shí)現(xiàn)分布式鎖?

    首先,“分布式鎖”的概念,是相對(duì)“本地鎖”而言。 本地鎖比如java中的synchronized 這類(lèi) JDK 自帶的 本地鎖 ,來(lái)控制一個(gè) JVM 進(jìn)程內(nèi)的多個(gè)線程對(duì)本地共享資源的訪問(wèn)。 同一時(shí)刻只有一個(gè)線程可以獲取到本地鎖訪問(wèn)共享資源。 分布式系統(tǒng)下,不同的服務(wù)/客戶端通常運(yùn)

    2024年02月06日
    瀏覽(21)
  • 分布式鎖之redis實(shí)現(xiàn)

    分布式鎖之redis實(shí)現(xiàn)

    需要掛在的data和redis.conf自行創(chuàng)建即可 不要忘記開(kāi)放端口6379 修改redis.conf配置文件,設(shè)置 requirepass xxxxx 如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二進(jìn)制,使用StringRedisTemplate默認(rèn)初始化序列化器就是String類(lèi)型 執(zhí)行票數(shù)存入redis指令 ?編寫(xiě)代碼演示超賣(mài)問(wèn)題 ?500

    2024年02月10日
    瀏覽(20)
  • 基于 Redis 實(shí)現(xiàn)分布式限流

    分布式限流是指通過(guò)將限流策略嵌入到分布式系統(tǒng)中,以控制流量或保護(hù)服務(wù),保證系統(tǒng)在高并發(fā)訪問(wèn)情況下不被過(guò)載。 分布式限流可以防止系統(tǒng)因大量請(qǐng)求同時(shí)到達(dá)導(dǎo)致壓力過(guò)大而崩潰,從而提高系統(tǒng)的穩(wěn)定性和可靠性。同時(shí),它可以使得業(yè)務(wù)資源能夠更好地分配,提高系

    2024年02月12日
    瀏覽(25)
  • 解讀分布式鎖(redis實(shí)現(xiàn)方案)

    解讀分布式鎖(redis實(shí)現(xiàn)方案)

    分布式鎖是一種用于分布式系統(tǒng)中的并發(fā)控制機(jī)制,它用于確保在多個(gè)節(jié)點(diǎn)或多個(gè)進(jìn)程之間的并發(fā)操作中,某些關(guān)鍵資源或代碼塊只能被一個(gè)節(jié)點(diǎn)或進(jìn)程同時(shí)訪問(wèn)。分布式鎖的目的是避免多個(gè)節(jié)點(diǎn)同時(shí)修改共享資源而導(dǎo)致的數(shù)據(jù)不一致或沖突的問(wèn)題。通俗的來(lái)說(shuō),分布式鎖的

    2024年02月15日
    瀏覽(38)
  • 2、基于redis實(shí)現(xiàn)分布式鎖

    2、基于redis實(shí)現(xiàn)分布式鎖

    借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時(shí)有多個(gè)客戶端發(fā)送setnx命令,只有一個(gè)客戶端可以成功,返回1(true);其他的客戶端返回0(false)。 多個(gè)客戶端同時(shí)獲取鎖(setnx) 獲取成功,執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完成釋放鎖(del) 其他客戶端等

    2024年02月15日
    瀏覽(38)
  • Redis系列13:分布式鎖實(shí)現(xiàn)

    Redis系列13:分布式鎖實(shí)現(xiàn)

    Redis系列1:深刻理解高性能Redis的本質(zhì) Redis系列2:數(shù)據(jù)持久化提高可用性 Redis系列3:高可用之主從架構(gòu) Redis系列4:高可用之Sentinel(哨兵模式) Redis系列5:深入分析Cluster 集群模式 追求性能極致:Redis6.0的多線程模型 追求性能極致:客戶端緩存帶來(lái)的革命 Redis系列8:Bitmap實(shí)現(xiàn)

    2024年02月06日
    瀏覽(18)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包