分布式異步任務(wù)處理組件底層網(wǎng)絡(luò)通信模型的設(shè)計--如圖:文章來源:http://www.zghlxwxcb.cn/news/detail-625297.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-625297.html
- 使用Java原生NIO來實現(xiàn)TCP通信模型
- 普通節(jié)點維護(hù)一個網(wǎng)絡(luò)IO線程,負(fù)責(zé)和主節(jié)點的網(wǎng)絡(luò)數(shù)據(jù)通信連接--這里的網(wǎng)絡(luò)數(shù)據(jù)是指組件通信協(xié)議之下的直接面對字節(jié)流的數(shù)據(jù)讀寫,上層會有另一個線程負(fù)責(zé)網(wǎng)絡(luò)通信協(xié)議的實現(xiàn);---也就是說維護(hù)一個selector線程,負(fù)責(zé)處理socketchannel的IO事件;
- Leader節(jié)點網(wǎng)絡(luò)通信層有多個線程--一個selector線程負(fù)責(zé)接受其他節(jié)點的連接請求,然后為每個連接建立一個線程并分配單獨的selector來處理各自連接上的IO事件--如此設(shè)計的原因是各節(jié)點的狀態(tài)嚴(yán)格依賴與主節(jié)點的心跳和其他通信,防止主節(jié)點線程阻塞導(dǎo)致心跳失??;從而引發(fā)節(jié)點下線帶來的大量同步工作--后續(xù)會聊到;
- 各節(jié)點網(wǎng)絡(luò)通信線程之上會有一個線程專門負(fù)責(zé)組件的網(wǎng)絡(luò)通信協(xié)議,就是將網(wǎng)絡(luò)傳輸?shù)淖止?jié)流解碼成組件的通信協(xié)議包,因為NIO的buffer是數(shù)據(jù)塊,所以首先通過讀寫隊列將字節(jié)轉(zhuǎn)化為字節(jié)流,通過協(xié)議轉(zhuǎn)化為網(wǎng)絡(luò)通信命令包,同時解決粘包半包等問題;
- 網(wǎng)絡(luò)通信線程和協(xié)議實現(xiàn)線程之間通過讀寫兩個隊列來實現(xiàn)(網(wǎng)絡(luò)IO線程的讀隊列就是協(xié)議線程的寫隊列,反過來一樣,所以這里讀寫隊列是相對的;),為了保證性能,避免重復(fù)創(chuàng)建對象和對象回收,設(shè)計了ByteBuffer緩存機(jī)制和異步讀寫隊列數(shù)據(jù)結(jié)構(gòu)--詳細(xì)結(jié)構(gòu)如圖--
- 說一下三個隊列--讀寫隊列和緩存隊列,用來實現(xiàn)IO通信線程和協(xié)議通信線程之間的數(shù)據(jù)通信--兩個線程基本上會輪訓(xùn)處理網(wǎng)絡(luò)IO事件,和上層協(xié)議事件,基本過程如下--
- 從網(wǎng)絡(luò)IO線程角度出發(fā)--
- 當(dāng)產(chǎn)生可讀事件時,網(wǎng)絡(luò)IO線程會從緩存隊列中獲取一個空的ByteBuffer,這里設(shè)計為當(dāng)沒有可用的緩存Buffer對象時會新建一個--具體在隊列實現(xiàn)里講,可能會產(chǎn)生寫擴(kuò)張現(xiàn)象,后期性能優(yōu)化時考慮加入回收機(jī)制;
- 將socket緩沖區(qū)中的網(wǎng)絡(luò)數(shù)據(jù)read進(jìn)Buffer中,然后將Buffer對象入隊到IO寫隊列中;
- 然后檢查IO讀隊列不為空時,對IO讀隊列出隊,獲取要發(fā)送的數(shù)據(jù)Buffer對象,發(fā)送到其他節(jié)點中;
- 從網(wǎng)絡(luò)IO線程角度出發(fā)--
- 異步多線程隊列,支持兩個線程同時出隊入隊操作;原理和代碼貼下來,基本實現(xiàn):
package org.example.web.buffer;
import org.example.web.api.SocketBufferQueue;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {
//異步讀寫隊列實現(xiàn)原理;
/*
* 當(dāng)隊列中的元素個數(shù)>1時,讀線程和寫線程可以同時進(jìn)行,因為這時候不涉及操作共享變量
*當(dāng)隊列中的元素個數(shù)<=1時,讀寫隊列中只能有一個線程操作讀或者寫,因為此時會涉及隊列頭尾指針的操作;
* 實現(xiàn)原理,寫線程在獲取寫鎖時可以正常做寫操作:此時有兩種情況--
* 1,獲取寫鎖之后隊列為空,此時不會有讀線程做讀操作,只有獲得寫鎖的該線程可以put,put完成之后將頭尾指針同時指向改為以元素即可;此時隊列元素個數(shù)為1;
* 2,獲取寫鎖之后隊列中只有一個元素,這時也可以保證只有該線程在做寫入,因為只有一個元素的情況下,讀線程要讀取該元素必須同時獲得讀鎖和寫鎖;此時隊列元素個數(shù)為2;
* 3,讀線程獲取讀鎖之后有三種情況;size>1;size=1;size=0;
* 4, 重點是保證不能多個線程同時進(jìn)入隊列元素為零的狀態(tài);就是讀線程消費了最后一個元素,正好此時寫線程在隊列為空的時候?qū)懭耄x寫線程會同時操作頭尾指針,造成錯亂,所以在元素數(shù)量為1
* 的時候就要進(jìn)行同步操作;原理:
* 1.讀線程獲取讀鎖之后如果size=1,此時不會先消費,而是試圖獲取寫鎖,防止此時有寫線程同時操作,獲取寫鎖之后再判斷size是否為1,如果為1則做出隊操作,然后釋放寫鎖,如果為2則直接釋放寫鎖--再進(jìn)行出隊操作;
* 2,這里讀線程獲取讀鎖之后判斷size=1,再獲取讀鎖成功之后有兩種情況--
* 1,有寫線程在讀線程之前獲取到了寫鎖,則讀線程獲取到寫鎖的時候size>=2了(可能不止一個),
* 2,判斷size=1之后直接獲取到了寫鎖,此時就應(yīng)該阻塞其他寫線程做入隊操作,等待自己完成出隊操作之后再釋放寫鎖;
* 5,再說一下size怎么保證同步,
* 1,在size<=1的時候嚴(yán)格保證線程同步操作,保證size;
* 2,在size>1的時候,此時可以理解為隊列同時在出隊和入隊,size在兩個線程操作的時候先出隊-1還是先入隊+1其實是沒有關(guān)系的,因為原子操作保證了最后結(jié)果是沒有問題的就行;
* */
private AtomicInteger size;
protected T head;
protected T tail;
private Object readLock;
private Object writeLock;
//這里考慮使用cas還是Synchronized
AsynchronousQueue(){
this.writeLock=new Object();
this.readLock=new Object();
}
AsynchronousQueue(int initSize){
this();
this.size=new AtomicInteger(initSize);
}
//空隊列初始化要創(chuàng)建一個node
AsynchronousQueue(T node){
this(1);
this.head=node;
this.tail=this.head;
}
public boolean offerFirstOne(T node){
synchronized (this.writeLock){
if(this.size.get()>0){
return false;
}
this.head=this.tail=node;
return this.size.compareAndSet(0,1);
}
}
public boolean offer(T node){
preOfferElement(node);
synchronized (this.writeLock){
if(this.size.get()==0){
return this.offerFirstOne(node);
}else{
T temp=this.head;
node.next=temp;
temp.pre=node;
this.head=node;
}
return this.size.incrementAndGet() > 1;
}
}
private void preOfferElement(T bufferNode){
bufferNode.next=null;
bufferNode.pre=null;
}
public T pollLastOne(){
return this.size.compareAndSet(1,0)?this.tail:null;
}
public T poll(){
synchronized (this.readLock){
if(this.size.get()==0){
return null;
}
if(this.size.get()==1){
synchronized (this.writeLock){
if(this.size()>1){
return this.getTailElement();
}
if(this.size()==1){
this.pollLastOne();
}
}
}
return this.getTailElement();
}
}
private T getTailElement(){
if(this.size()>1){
this.tail= (T) this.tail.pre;
this.size.decrementAndGet();
return (T) this.tail.next;
}
return null;
}
public int size(){
return this.size.get();
}
public int increamentSize(){
return this.size.incrementAndGet();
}
public int decrementSize(){
return this.size.decrementAndGet();
}
private class BufferNode{
private ByteBuffer buffer;
private BufferNode pre;
private BufferNode next;
BufferNode(ByteBuffer byteBuffer){
this.buffer=byteBuffer;
}
BufferNode(){
}
}
}
到了這里,關(guān)于分布式異步任務(wù)處理組件(七)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!