Disruptor 是蘋國外廠本易公司LMAX開發(fā)的一個(gè)高件能列,研發(fā)的初夷是解決內(nèi)存隊(duì)列的延識(shí)問顧在性能測試中發(fā)現(xiàn)竟然與10操作處于同樣的數(shù)量級(jí)),基于Disruptor開發(fā)的系統(tǒng)單線程能支撐每秒600萬訂單,2010年在QCn演講后,獲得了業(yè)界關(guān)注,201年,企業(yè)應(yīng)用軟件專家Martin Fower專門撰寫長文介紹。同年它還獲得了Oradle官方的Duke大獎(jiǎng)。目前,包括Apache StomCame、 L0g4 2在內(nèi)的很多知名項(xiàng)目都應(yīng)用了Disrupior以獲取高性能。注意,這里所說的隊(duì)列是系統(tǒng)內(nèi)部的內(nèi)存隊(duì)列,而不是Kaka這樣的分布式隊(duì)列。
前兩篇介紹了Disruptor,【數(shù)據(jù)結(jié)構(gòu)】Disruptor環(huán)形數(shù)組無鎖并發(fā)框架閱讀_wenchun001的博客-CSDN博客
【并發(fā)編程】ShenyuAdmin里面數(shù)據(jù)同步用到的無鎖環(huán)形隊(duì)列LMAX Disruptor并發(fā)框架_wenchun001的博客-CSDN博客
今天開始依次從引用包到編碼步驟說明如下
引用依賴
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies>
1.構(gòu)建消息載體(事件)
單生產(chǎn)者單消費(fèi)者模式 1,創(chuàng)建Event(消息載體/事件)和EventFactory (事件工廠) 2,創(chuàng)建 OrderEvent類,這個(gè)類將會(huì)被放入環(huán)形隊(duì)列中作為消息內(nèi)容。創(chuàng)建OrderEventFactory類,用于創(chuàng)建OrderEvent 事件
@Data
public class OrderEvent2 {
private long value;
private String name:
}
public class OrderEventFactory implements EventFactory<orderEvent> {
@Override
public OrderEvent newInstance()
{
return new OrderEvent();
}
}
2.構(gòu)建生產(chǎn)者
創(chuàng)建 OrderEventProducer 類,它將作為生產(chǎn)者使 用
public class OrderEventProducer {
//事件隊(duì)列
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer)( this,ringBuffer =ringBuffer;}
public void onData(long value,String name) {
// 獲取事件隊(duì)列 的下一個(gè)槽
long sequence = ringBuffer.next();
try {
//獲取消息 (事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 寫入消息數(shù)據(jù)
orderEvent.setValue(value):
orderEvent.setName(name);
}catch (Exception e){
//異常
}finally {
//發(fā)布事件
rringBuffer.publish(sequence);
}
}
}
3.構(gòu)建消費(fèi)者
4.生產(chǎn)消息,消費(fèi)消息的測試
文章來源:http://www.zghlxwxcb.cn/news/detail-634180.html
public static void main(String[] args) throws Exception {
//創(chuàng)建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory()ringBufferSize:124 * 124
Executors.defaultThreadFactory(), ProducerType.SINGLE,//單生產(chǎn)者
new YieldingwaitStrategy() //等待策略
);
//設(shè)置消費(fèi)者用于處理RingBuffer的事件
disruptor.handleEventswith(new OrderEventHandler());
//設(shè)置多消費(fèi)者,消息會(huì)被重復(fù)消費(fèi)
//disruptor.handleEventswith(new OrderEventHandler(),new OrderEventHandler());
//設(shè)置多消費(fèi)者 消費(fèi)者要實(shí)現(xiàn)workHandLer接口,一條消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)
//disruptor.handleEventsWithworkerPool(new OrderEventHandler(), new OrderEventHandler());
//啟動(dòng)disruptor
disruptor.start();
//創(chuàng)建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//創(chuàng)建生產(chǎn)者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 發(fā)送消息
for(int i=0;i<10;i++){
eventProduceronData(i, "消息"+1);
}
disruptor.shutdown();
}
多生產(chǎn)者的案例
//創(chuàng)建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory()ringBufferSize:124 * 124
Executors.defaultThreadFactory(), ProducerType.MULIT,//多生產(chǎn)者
new YieldingwaitStrategy() //等待策略
);
消費(fèi)者優(yōu)先級(jí)模式
在實(shí)際場景中,我們通常會(huì)因?yàn)闃I(yè)務(wù)邏而形成一條消費(fèi)鏈,比如一個(gè)消息必須由 消費(fèi)者A->消費(fèi)者B->消費(fèi)者C 的順序依次進(jìn)行消費(fèi)。在配置消費(fèi)者時(shí),可以通過.then 方法去實(shí)現(xiàn)順序消費(fèi)。
I disruptor.handleEventswith(new OrderEventHandler())
then(new OrderEventHandler())
then(new OrderEventHandler());
handleEventsWith 與 handleEventsWithworkerPool 都是支持hen 的,它們可以結(jié)合使用。比可以按照消費(fèi)者A 消費(fèi)者B 消費(fèi)者C)->消費(fèi)者D 的消費(fèi)項(xiàng)序
1 disruptor.handleEventswith(new OrderEventHandler())
thenHandleEventsWithworkerPool(new OrderEventHandler(), new OrderEventHandler())
then(new OrderEventHandler());文章來源地址http://www.zghlxwxcb.cn/news/detail-634180.html
到了這里,關(guān)于【并發(fā)編程】無鎖環(huán)形隊(duì)列Disruptor并發(fā)框架使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!