国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費

這篇具有很好參考價值的文章主要介紹了BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

大家好,最近在鞏固JUC并發(fā)包,突然想到如果自己的應(yīng)用體量不大,但有需要消息隊列來實現(xiàn)應(yīng)用解耦和削峰來緩解服務(wù)器突增壓力,比如搶票時,突然有比較用戶同時搶票,就容易造成服務(wù)器同時連接數(shù)較多,拒絕其他用戶的使用,就想著可以用消息隊列來緩解,但是體量有不大,還沒必要用MQ框架,那就直接自己寫一個,這樣,搶票請求來了就直接丟給隊列處理器,然后再延遲查詢處理結(jié)果,這樣能減輕不少壓力,老樣子,先看下實現(xiàn)效果


BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費,java,架構(gòu)方案,消息隊列
然后看下測試代碼:

public class TestOptional {
    @Test
    public void doTestOptional(){

        MxMQ<Message> mxMQ = MxMQ.getInstance();

        /**
         * 添加分區(qū) 無消息一直阻塞
         */
        mxMQ.addPartion("test", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });
        /**
         * 添加分區(qū) 無消息且等待時長超過20秒自動移除該分區(qū)
         */
        mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });

        for(int index = 0;index < 20;index++){
            int finalIndex = index;
            Message message = new Message("test_" + finalIndex);
            Message message2 = new Message("test2_" + finalIndex);
            try {
                mxMQ.sendMessage("test",message);
                mxMQ.sendMessage("test2",message2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        while (true){}

    }
}

還可以自定義不同分區(qū)不同的處理器,邏輯自由定義,下面看下幾個關(guān)鍵類:
MxMQRunnable:

package com.mx.mxmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MxMQRunnable<T> implements Runnable{

    boolean isRun = false;
    ArrayBlockingQueue<T> arrayBlockingQueue = null;
    MQHandler<T> mqHandler = null;
    int state = 0;

    MxMQ.QueueEmpty queueEmpty = null;

    public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
        this.queueEmpty = queueEmpty;
    }

    public MxMQRunnable(MQHandler<T> mqHandler){
        isRun = true;
        arrayBlockingQueue = new ArrayBlockingQueue(50);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public MxMQRunnable(int number,MQHandler<T> mqHandler){
        arrayBlockingQueue = new ArrayBlockingQueue(number);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public void run() {
        while (isRun){
            try {
                T t = null;
                if(state == MxMQ.STATE_WAIT){
                   t = arrayBlockingQueue.take();
                } else {
                   t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
                   if(t == null){
                       close();
                       queueEmpty.empty(this);
                       break;
                   }
                }
                if(mqHandler != null){
                    mqHandler.hand(t);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
        }
    }

    public boolean sendMessage(T t) throws InterruptedException {
        return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
    }

    public boolean removeMessage(T t){
        return arrayBlockingQueue.remove(t);
    }

    public void close(){
        isRun = false;
    }

}


MxMQ:

package com.mx.mxmq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MxMQ<T> {

    public static final int STATE_WAIT = 0;
    public static final int STATE_REMOVE = 1;

    private MxMQ(){
        executors = Executors.newCachedThreadPool();
        partionRunMap = new ConcurrentHashMap<>();
    }

    public static MxMQ getInstance() {
        if(instance == null){
            synchronized (MxMQ.class){
                if(instance == null){
                    instance = new MxMQ();
                }
            }
        }
        return instance;
    }

    private static volatile MxMQ instance = null;

    private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;

    private ExecutorService executors =  null;

    /**
     * 添加分區(qū)
     * @param partion 分區(qū)
     * @param mxHandler 處理器
     * @return
     */
    public boolean addPartion(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    /**
     * 當分區(qū)里面沒有任務(wù)超過20秒后就會自動移除分區(qū)
     * @param partion 分區(qū)
     * @param mxHandler 處理器
     * @return
     */
    public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            curMxMQRunnable.setState(STATE_REMOVE);
            curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
                @Override
                public void empty(MxMQRunnable mxMQRunnable) {
                    removePartion(partion);
                }
            });
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    public boolean removePartion(String partion){
        if(partionRunMap.get(partion) != null){
            MxMQRunnable<T> remove = partionRunMap.remove(partion);
            remove.close();
            System.out.println(partion+"被移除");
            return true;
        }
        return false;
    }

    public boolean sendMessage(String partion,T t) throws InterruptedException {
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            tMxMQRunnable.sendMessage(t);
            return true;
        }
        return false;
    }

    public boolean removeMessage(String partion,T t){
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            return tMxMQRunnable.removeMessage(t);
        }
        return false;
    }

    interface QueueEmpty{
        void empty(MxMQRunnable mxMQRunnable);
    }

}


MQHandler:

package com.mx.mxmq;

public interface MQHandler<T> {
    void hand(T t);
}

Message:

package com.mx.mxmq;

public class Message {
    String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Message(String message){
        this.message = message;
    }
}


好了,收,大概就是這樣子,主要應(yīng)用場景為:需要輕量級的順序隊列消費 應(yīng)用場景文章來源地址http://www.zghlxwxcb.cn/news/detail-743357.html

到了這里,關(guān)于BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • protues仿真微處理器8086實現(xiàn)交通燈

    protues仿真微處理器8086實現(xiàn)交通燈

    (1)下載安裝仿真環(huán)境protues。 (2)搭建8086開發(fā)環(huán)境,我使用的是 emu8086 。自行下載安裝即可。 有需要相關(guān)安裝包可以私信 (1)選用74LS373與74LS245來實現(xiàn)8086地址數(shù)據(jù)總線的拆分。 (2)選用 8259可編程中斷控制器 用于管理8086系列微機系統(tǒng)的外部中斷請求,實現(xiàn)優(yōu)先權(quán)的排

    2024年02月06日
    瀏覽(19)
  • RISC-V處理器的設(shè)計與實現(xiàn)(一)—— 基本指令集

    RISC-V處理器的設(shè)計與實現(xiàn)(一)—— 基本指令集

    RISC-V處理器的設(shè)計與實現(xiàn)(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V處理器的設(shè)計與實現(xiàn)(二)—— CPU框架設(shè)計_Patarw_Li的博客-CSDN博客 RISC-V處理器的設(shè)計與實現(xiàn)(三)—— 上板驗證_Patarw_Li的博客-CSDN博客 RISC-V處理器設(shè)計(四)—— Verilog 代碼設(shè)計-CSDN博客? RISC-V處

    2024年02月05日
    瀏覽(25)
  • 上海山景SH-ARC DSP音頻處理器低通濾波算法實現(xiàn)

    ? +hezkz17進入數(shù)字音頻答疑 上海山景DSP音頻處理器介紹: ? 上海山景DSP音頻處理器是一種數(shù)字信號處理器,專門用于音頻信號的處理和增強。它采用先進的數(shù)字信號處理技術(shù)和算法,能夠?qū)σ纛l信號進行實時處理,并且具有高效、穩(wěn)定、可靠等特點。 該處理器可以應(yīng)用于各種

    2024年02月13日
    瀏覽(17)
  • RISC-V處理器的設(shè)計與實現(xiàn)(三)—— 上板驗證(基于野火征途Pro開發(fā)板)

    RISC-V處理器的設(shè)計與實現(xiàn)(三)—— 上板驗證(基于野火征途Pro開發(fā)板)

    目錄 文章傳送門 一、添加串口 二、上板驗證 三、總結(jié)與思考 RISC-V處理器的設(shè)計與實現(xiàn)(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V處理器的設(shè)計與實現(xiàn)(二)—— CPU框架設(shè)計_Patarw_Li的博客-CSDN博客 RISC-V處理器的設(shè)計與實現(xiàn)(三)—— 上板驗證_Patarw_Li的博客-CSDN博客

    2024年02月11日
    瀏覽(20)
  • Spring MVC異常處理【單個控制異常處理器、全局異常處理器、自定義異常處理器】

    Spring MVC異常處理【單個控制異常處理器、全局異常處理器、自定義異常處理器】

    目錄 一、單個控制器異常處理 1.1 控制器方法 1.2 編寫出錯頁面 1.3 測試結(jié)果 二、全局異常處理 2.1 一個有異常的控制器類 2.2 全局異常處理器類 2.3 測試結(jié)果? 三、自定義異常處理器 3.1 自定義異常處理器 3.2 測試結(jié)果 往期專欄文章相關(guān)導(dǎo)讀? 1. Maven系列專欄文章 2. Mybatis系列

    2024年02月16日
    瀏覽(29)
  • Jmeter前置處理器和后置處理器

    Jmeter前置處理器和后置處理器

    1. 后置處理器(Post Processor) 本質(zhì)上是?種對sampler發(fā)出請求后接受到的響應(yīng)數(shù)據(jù)進?處理 (后處理)的?法 ?正則表達式后置處理器 (1)引?名稱:下?個請求要引?的參數(shù)名稱,如填寫title,則可?${title}引?它 (2)正則表達式: ():括起來的部分就是要提取的。 .:匹配

    2023年04月21日
    瀏覽(20)
  • DP讀書:鯤鵬處理器 架構(gòu)與編程(八)3.1鯤鵬處理器片上系統(tǒng)與Taishan處理器內(nèi)核架構(gòu)

    DP讀書:鯤鵬處理器 架構(gòu)與編程(八)3.1鯤鵬處理器片上系統(tǒng)與Taishan處理器內(nèi)核架構(gòu)

    處理器體系結(jié)構(gòu),是一個偏底層的內(nèi)容,但這是任一計算機系統(tǒng)的底層。 系統(tǒng)的性能、生態(tài)和功能很大程度上都依賴于計算機系統(tǒng)底層——處理器體系結(jié)構(gòu)。任何一個系統(tǒng)程序員、固件設(shè)計者、應(yīng)用程序員 甚至 服務(wù)器管理員,如果想要充分利用現(xiàn)代高性能處理器的硬件性能

    2024年02月12日
    瀏覽(20)
  • SkyEye處理器仿真系列:龍芯2K1000處理器

    SkyEye處理器仿真系列:龍芯2K1000處理器

    天目全數(shù)字實時仿真軟件SkyEye作為基于可視化建模的硬件行為級仿真平臺,能夠為嵌入式軟件提供虛擬化運行環(huán)境,開發(fā)、測試人員可在該虛擬運行環(huán)境上進行軟件開發(fā)、軟件測試和軟件驗證活動。小到芯片,大到系統(tǒng),SkyEye均可進行模擬。 1936年,被譽為“計算機科學(xué)與人

    2024年02月12日
    瀏覽(18)
  • DP讀書:鯤鵬處理器 架構(gòu)與編程(九)鯤鵬920處理器片上系統(tǒng)

    停更了兩天,我做了一個本專業(yè)相關(guān)的孤島問題的論文復(fù)現(xiàn),可并沒有什么太大進展,就像當初最開始跑Aspen一樣,我要面對的是一個相當復(fù)雜的多參系統(tǒng),這種情況下只能啃著技術(shù)文檔一步一步的去調(diào)。 再次返回我的鯤鵬920處理器,無疑是舒服的所以我只能盡我所能的在做

    2024年02月12日
    瀏覽(41)
  • 第三十二章 開發(fā)Productions - ObjectScript Productions - 定義警報處理器 - 使用路由警報處理器

    如果需要通過多種輸出機制聯(lián)系用戶,警報處理器應(yīng)該是一個業(yè)務(wù)流程,用于確定如何在消息中路由 Ens.AlertReques 。在這種情況下, Productions 必須為每個輸出機制包含一個額外的業(yè)務(wù)操作,并且警報處理器將消息轉(zhuǎn)發(fā)到這些業(yè)務(wù)操作。 要將警報處理器定義為路由流程,請創(chuàng)建

    2024年02月08日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包