BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費
這篇具有很好參考價值的文章主要介紹了BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。
大家好,最近在鞏固JUC并發(fā)包,突然想到如果自己的應(yīng)用體量不大,但有需要消息隊列來實現(xiàn)應(yīng)用解耦和削峰來緩解服務(wù)器突增壓力,比如搶票時,突然有比較用戶同時搶票,就容易造成服務(wù)器同時連接數(shù)較多,拒絕其他用戶的使用,就想著可以用消息隊列來緩解,但是體量有不大,還沒必要用MQ框架,那就直接自己寫一個,這樣,搶票請求來了就直接丟給隊列處理器,然后再延遲查詢處理結(jié)果,這樣能減輕不少壓力,老樣子,先看下實現(xiàn)效果
:
然后看下測試代碼:
public class TestOptional {
@Test
public void doTestOptional(){
MxMQ<Message> mxMQ = MxMQ.getInstance();
/**
* 添加分區(qū) 無消息一直阻塞
*/
mxMQ.addPartion("test", new MQHandler<Message>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
/**
* 添加分區(qū) 無消息且等待時長超過20秒自動移除該分區(qū)
*/
mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
for(int index = 0;index < 20;index++){
int finalIndex = index;
Message message = new Message("test_" + finalIndex);
Message message2 = new Message("test2_" + finalIndex);
try {
mxMQ.sendMessage("test",message);
mxMQ.sendMessage("test2",message2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
while (true){}
}
}
還可以自定義不同分區(qū)不同的處理器,邏輯自由定義,下面看下幾個關(guān)鍵類:
MxMQRunnable:
package com.mx.mxmq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MxMQRunnable<T> implements Runnable{
boolean isRun = false;
ArrayBlockingQueue<T> arrayBlockingQueue = null;
MQHandler<T> mqHandler = null;
int state = 0;
MxMQ.QueueEmpty queueEmpty = null;
public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
this.queueEmpty = queueEmpty;
}
public MxMQRunnable(MQHandler<T> mqHandler){
isRun = true;
arrayBlockingQueue = new ArrayBlockingQueue(50);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public MxMQRunnable(int number,MQHandler<T> mqHandler){
arrayBlockingQueue = new ArrayBlockingQueue(number);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public void setState(int state) {
this.state = state;
}
@Override
public void run() {
while (isRun){
try {
T t = null;
if(state == MxMQ.STATE_WAIT){
t = arrayBlockingQueue.take();
} else {
t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
if(t == null){
close();
queueEmpty.empty(this);
break;
}
}
if(mqHandler != null){
mqHandler.hand(t);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean sendMessage(T t) throws InterruptedException {
return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
}
public boolean removeMessage(T t){
return arrayBlockingQueue.remove(t);
}
public void close(){
isRun = false;
}
}
MxMQ:
package com.mx.mxmq;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MxMQ<T> {
public static final int STATE_WAIT = 0;
public static final int STATE_REMOVE = 1;
private MxMQ(){
executors = Executors.newCachedThreadPool();
partionRunMap = new ConcurrentHashMap<>();
}
public static MxMQ getInstance() {
if(instance == null){
synchronized (MxMQ.class){
if(instance == null){
instance = new MxMQ();
}
}
}
return instance;
}
private static volatile MxMQ instance = null;
private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;
private ExecutorService executors = null;
/**
* 添加分區(qū)
* @param partion 分區(qū)
* @param mxHandler 處理器
* @return
*/
public boolean addPartion(String partion,MQHandler<T> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
/**
* 當分區(qū)里面沒有任務(wù)超過20秒后就會自動移除分區(qū)
* @param partion 分區(qū)
* @param mxHandler 處理器
* @return
*/
public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
curMxMQRunnable.setState(STATE_REMOVE);
curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
@Override
public void empty(MxMQRunnable mxMQRunnable) {
removePartion(partion);
}
});
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
public boolean removePartion(String partion){
if(partionRunMap.get(partion) != null){
MxMQRunnable<T> remove = partionRunMap.remove(partion);
remove.close();
System.out.println(partion+"被移除");
return true;
}
return false;
}
public boolean sendMessage(String partion,T t) throws InterruptedException {
MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
tMxMQRunnable.sendMessage(t);
return true;
}
return false;
}
public boolean removeMessage(String partion,T t){
MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
return tMxMQRunnable.removeMessage(t);
}
return false;
}
interface QueueEmpty{
void empty(MxMQRunnable mxMQRunnable);
}
}
MQHandler:
package com.mx.mxmq;
public interface MQHandler<T> {
void hand(T t);
}
Message:文章來源:http://www.zghlxwxcb.cn/news/detail-743357.html
package com.mx.mxmq;
public class Message {
String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Message(String message){
this.message = message;
}
}
好了,收,大概就是這樣子,主要應(yīng)用場景為:需要輕量級的順序隊列消費 應(yīng)用場景文章來源地址http://www.zghlxwxcb.cn/news/detail-743357.html
到了這里,關(guān)于BlockingQueue實現(xiàn)簡易消息隊列處理器 可分區(qū)順序消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!
本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!