国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式異步任務(wù)處理組件(七)

這篇具有很好參考價值的文章主要介紹了分布式異步任務(wù)處理組件(七)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

分布式異步任務(wù)處理組件底層網(wǎng)絡(luò)通信模型的設(shè)計--如圖:

分布式異步任務(wù)處理組件(七),分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-625297.html

  1. 使用Java原生NIO來實現(xiàn)TCP通信模型
  2. 普通節(jié)點維護(hù)一個網(wǎng)絡(luò)IO線程,負(fù)責(zé)和主節(jié)點的網(wǎng)絡(luò)數(shù)據(jù)通信連接--這里的網(wǎng)絡(luò)數(shù)據(jù)是指組件通信協(xié)議之下的直接面對字節(jié)流的數(shù)據(jù)讀寫,上層會有另一個線程負(fù)責(zé)網(wǎng)絡(luò)通信協(xié)議的實現(xiàn);---也就是說維護(hù)一個selector線程,負(fù)責(zé)處理socketchannel的IO事件;
  3. Leader節(jié)點網(wǎng)絡(luò)通信層有多個線程--一個selector線程負(fù)責(zé)接受其他節(jié)點的連接請求,然后為每個連接建立一個線程并分配單獨的selector來處理各自連接上的IO事件--如此設(shè)計的原因是各節(jié)點的狀態(tài)嚴(yán)格依賴與主節(jié)點的心跳和其他通信,防止主節(jié)點線程阻塞導(dǎo)致心跳失??;從而引發(fā)節(jié)點下線帶來的大量同步工作--后續(xù)會聊到;
  4. 各節(jié)點網(wǎng)絡(luò)通信線程之上會有一個線程專門負(fù)責(zé)組件的網(wǎng)絡(luò)通信協(xié)議,就是將網(wǎng)絡(luò)傳輸?shù)淖止?jié)流解碼成組件的通信協(xié)議包,因為NIO的buffer是數(shù)據(jù)塊,所以首先通過讀寫隊列將字節(jié)轉(zhuǎn)化為字節(jié)流,通過協(xié)議轉(zhuǎn)化為網(wǎng)絡(luò)通信命令包,同時解決粘包半包等問題;
  5. 網(wǎng)絡(luò)通信線程和協(xié)議實現(xiàn)線程之間通過讀寫兩個隊列來實現(xiàn)(網(wǎng)絡(luò)IO線程的讀隊列就是協(xié)議線程的寫隊列,反過來一樣,所以這里讀寫隊列是相對的;),為了保證性能,避免重復(fù)創(chuàng)建對象和對象回收,設(shè)計了ByteBuffer緩存機(jī)制和異步讀寫隊列數(shù)據(jù)結(jié)構(gòu)--詳細(xì)結(jié)構(gòu)如圖--分布式異步任務(wù)處理組件(七),分布式
  6. 說一下三個隊列--讀寫隊列和緩存隊列,用來實現(xiàn)IO通信線程和協(xié)議通信線程之間的數(shù)據(jù)通信--兩個線程基本上會輪訓(xùn)處理網(wǎng)絡(luò)IO事件,和上層協(xié)議事件,基本過程如下--
    1. 從網(wǎng)絡(luò)IO線程角度出發(fā)--
      1. 當(dāng)產(chǎn)生可讀事件時,網(wǎng)絡(luò)IO線程會從緩存隊列中獲取一個空的ByteBuffer,這里設(shè)計為當(dāng)沒有可用的緩存Buffer對象時會新建一個--具體在隊列實現(xiàn)里講,可能會產(chǎn)生寫擴(kuò)張現(xiàn)象,后期性能優(yōu)化時考慮加入回收機(jī)制;
      2. 將socket緩沖區(qū)中的網(wǎng)絡(luò)數(shù)據(jù)read進(jìn)Buffer中,然后將Buffer對象入隊到IO寫隊列中;
      3. 然后檢查IO讀隊列不為空時,對IO讀隊列出隊,獲取要發(fā)送的數(shù)據(jù)Buffer對象,發(fā)送到其他節(jié)點中;
  7. 異步多線程隊列,支持兩個線程同時出隊入隊操作;原理和代碼貼下來,基本實現(xiàn):
package org.example.web.buffer;

import org.example.web.api.SocketBufferQueue;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {
    //異步讀寫隊列實現(xiàn)原理;
    /*
    * 當(dāng)隊列中的元素個數(shù)>1時,讀線程和寫線程可以同時進(jìn)行,因為這時候不涉及操作共享變量
    *當(dāng)隊列中的元素個數(shù)<=1時,讀寫隊列中只能有一個線程操作讀或者寫,因為此時會涉及隊列頭尾指針的操作;
    * 實現(xiàn)原理,寫線程在獲取寫鎖時可以正常做寫操作:此時有兩種情況--
    *     1,獲取寫鎖之后隊列為空,此時不會有讀線程做讀操作,只有獲得寫鎖的該線程可以put,put完成之后將頭尾指針同時指向改為以元素即可;此時隊列元素個數(shù)為1;
    *     2,獲取寫鎖之后隊列中只有一個元素,這時也可以保證只有該線程在做寫入,因為只有一個元素的情況下,讀線程要讀取該元素必須同時獲得讀鎖和寫鎖;此時隊列元素個數(shù)為2;
    *     3,讀線程獲取讀鎖之后有三種情況;size>1;size=1;size=0;
    *     4, 重點是保證不能多個線程同時進(jìn)入隊列元素為零的狀態(tài);就是讀線程消費了最后一個元素,正好此時寫線程在隊列為空的時候?qū)懭耄x寫線程會同時操作頭尾指針,造成錯亂,所以在元素數(shù)量為1
    * 的時候就要進(jìn)行同步操作;原理:
    *           1.讀線程獲取讀鎖之后如果size=1,此時不會先消費,而是試圖獲取寫鎖,防止此時有寫線程同時操作,獲取寫鎖之后再判斷size是否為1,如果為1則做出隊操作,然后釋放寫鎖,如果為2則直接釋放寫鎖--再進(jìn)行出隊操作;
    *           2,這里讀線程獲取讀鎖之后判斷size=1,再獲取讀鎖成功之后有兩種情況--
    *                   1,有寫線程在讀線程之前獲取到了寫鎖,則讀線程獲取到寫鎖的時候size>=2了(可能不止一個),
    *                   2,判斷size=1之后直接獲取到了寫鎖,此時就應(yīng)該阻塞其他寫線程做入隊操作,等待自己完成出隊操作之后再釋放寫鎖;
    *     5,再說一下size怎么保證同步,
    *           1,在size<=1的時候嚴(yán)格保證線程同步操作,保證size;
    *           2,在size>1的時候,此時可以理解為隊列同時在出隊和入隊,size在兩個線程操作的時候先出隊-1還是先入隊+1其實是沒有關(guān)系的,因為原子操作保證了最后結(jié)果是沒有問題的就行;
    * */
    private AtomicInteger size;
    protected T head;
    protected T tail;
    private Object readLock;
    private Object writeLock;
    //這里考慮使用cas還是Synchronized


    AsynchronousQueue(){
        this.writeLock=new Object();
        this.readLock=new Object();
    }
    AsynchronousQueue(int initSize){
        this();
        this.size=new AtomicInteger(initSize);
    }
    //空隊列初始化要創(chuàng)建一個node
    AsynchronousQueue(T node){
        this(1);
        this.head=node;
        this.tail=this.head;
    }
    public boolean offerFirstOne(T node){
        synchronized (this.writeLock){
            if(this.size.get()>0){
                return false;
            }
            this.head=this.tail=node;
            return this.size.compareAndSet(0,1);
        }
    }

    public boolean offer(T node){
        preOfferElement(node);
        synchronized (this.writeLock){
            if(this.size.get()==0){
                return this.offerFirstOne(node);
            }else{
                T temp=this.head;
                node.next=temp;
                temp.pre=node;
                this.head=node;
            }
            return this.size.incrementAndGet() > 1;
        }
    }
    private void preOfferElement(T bufferNode){
        bufferNode.next=null;
        bufferNode.pre=null;
    }
    public T pollLastOne(){
        return this.size.compareAndSet(1,0)?this.tail:null;
    }

    public T poll(){
        synchronized (this.readLock){
            if(this.size.get()==0){
                return null;
            }
            if(this.size.get()==1){
                synchronized (this.writeLock){
                    if(this.size()>1){
                        return this.getTailElement();
                    }
                    if(this.size()==1){
                        this.pollLastOne();
                    }
                }
            }
            return this.getTailElement();
        }
    }

    private T getTailElement(){
        if(this.size()>1){
            this.tail= (T) this.tail.pre;
            this.size.decrementAndGet();
            return (T) this.tail.next;
        }
        return null;
    }

    public int size(){
        return this.size.get();
    }
    public int increamentSize(){
        return this.size.incrementAndGet();
    }
    public int decrementSize(){
        return this.size.decrementAndGet();
    }
    private class BufferNode{
        private ByteBuffer buffer;
        private BufferNode pre;
        private BufferNode next;
        BufferNode(ByteBuffer byteBuffer){
            this.buffer=byteBuffer;
        }
        BufferNode(){
        }
    }
}

到了這里,關(guān)于分布式異步任務(wù)處理組件(七)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Asynq: 基于Redis實現(xiàn)的Go生態(tài)分布式任務(wù)隊列和異步處理庫

    Asynq: 基于Redis實現(xiàn)的Go生態(tài)分布式任務(wù)隊列和異步處理庫

    Asynq [1] 是一個Go實現(xiàn)的分布式任務(wù)隊列和異步處理庫,基于redis,類似Ruby的 sidekiq [2] 和Python的 celery [3] 。Go生態(tài)類似的還有 machinery [4] 和goworker 同時提供一個WebUI asynqmon [5] ,可以源碼形式安裝或使用Docker image, 還可以和Prometheus集成 docker run --rm --name asynqmon -p 8080:8080 hibiken/as

    2024年02月14日
    瀏覽(22)
  • 分布式異步任務(wù)框架celery

    Celery是一個基于消息中間件的分布式任務(wù)隊列框架,專門用于處理異步任務(wù)。它允許生產(chǎn)者發(fā)送任務(wù)到消息隊列,而消費者則負(fù)責(zé)處理這些任務(wù)。Celery的核心特性包括異步執(zhí)行、實時操作支持以及強(qiáng)大的調(diào)度能力,使其每天可以處理數(shù)以百萬計的任務(wù)。 在Celery中,任務(wù)是以

    2024年04月10日
    瀏覽(23)
  • celery分布式異步任務(wù)隊列-4.4.7

    version 4.4.7 學(xué)習(xí)總結(jié) python實現(xiàn)、開源、遵循BSD許可的分布式任務(wù)隊列; 可以處理大量消息,簡單、靈活、可靠的分布式系統(tǒng),專注任務(wù)的 實時處理 和 定時調(diào)度 處理; 它是線程、進(jìn)程分配任務(wù)的一種機(jī)制,官方僅做支持linux開發(fā)。 五大部分: task,任務(wù) beat,定時調(diào)度管理器

    2024年02月07日
    瀏覽(46)
  • 分布式任務(wù)調(diào)度,定時任務(wù)的處理方案

    分布式任務(wù)調(diào)度,定時任務(wù)的處理方案

    適用場景: Spring 定時任務(wù)是 Spring 框架提供的一種輕量級的任務(wù)調(diào)度方案,它的特點是簡單易用、輕量級。Spring 定時任務(wù)的執(zhí)行是在 單個節(jié)點 上進(jìn)行的,如果需要分布式任務(wù)調(diào)度,需要自己實現(xiàn)相應(yīng)的解決方案。 1.導(dǎo)入依賴版本自己控制 2.啟動類加上@EnableScheduling 3.編寫業(yè)

    2023年04月14日
    瀏覽(46)
  • 4.4 媒資管理模塊 - 分布式任務(wù)處理介紹、視頻處理技術(shù)方案

    4.4 媒資管理模塊 - 分布式任務(wù)處理介紹、視頻處理技術(shù)方案

    視頻轉(zhuǎn)碼是指的對視頻文件的編碼格式進(jìn)行轉(zhuǎn)換 視頻上傳成功需要對視頻的格式進(jìn)行轉(zhuǎn)碼處理,比如:avi轉(zhuǎn)成mp4 一般做文件存儲的服務(wù)都需要對文件進(jìn)行處理,例如對視頻進(jìn)行轉(zhuǎn)碼處理,可能由于文件量較大需要使用多線程等技術(shù)進(jìn)行高效處理 文件格式 :是指.mp4、.avi、

    2024年02月02日
    瀏覽(33)
  • Celery分布式異步框架

    Celery分布式異步框架

    \\\"\\\"\\\" 1)可以不依賴任何服務(wù)器,通過自身命令,啟動服務(wù)(內(nèi)部支持socket) 2)celery服務(wù)為為其他項目服務(wù)提供異步解決任務(wù)需求的 注:會有兩個服務(wù)同時運(yùn)行,一個是項目服務(wù),一個是celery服務(wù),項目服務(wù)將需要異步處理的任務(wù)交給celery服務(wù),celery就會在需要時異步完成項目的

    2024年02月11日
    瀏覽(26)
  • 分布式定時任務(wù)

    分布式定時任務(wù)

    本文引用了谷粒商城的課程 定時任務(wù)是我們系統(tǒng)里面經(jīng)常要用到的一些功能。如每天的支付訂單要與支付寶進(jìn)行對賬操作、每個月定期進(jìn)行財務(wù)匯總、在服務(wù)空閑時定時統(tǒng)計當(dāng)天所有信息數(shù)據(jù)等。 定時任務(wù)有個非常流行的框架Quartz和Java原生API的Timer類。Spring框架也可以支持

    2023年04月15日
    瀏覽(25)
  • 分布式、鎖、延時任務(wù)

    分布式、鎖、延時任務(wù)

    Redis分布式鎖-這一篇全了解(Redission實現(xiàn)分布式鎖完美方案) ls / / 下有哪些子節(jié)點 get /zookeeper 查看某個子節(jié)點內(nèi)容 create /aa “test” delete /aa set /aa “test01” 模式 默認(rèn)創(chuàng)建永久 create -e 創(chuàng)建臨時 create -e /zz “hello zz” create -s 創(chuàng)建 有序節(jié)點 create -s -e 臨時序列化節(jié)點 一次性的監(jiān)

    2024年02月09日
    瀏覽(31)
  • 分布式任務(wù)調(diào)度系統(tǒng)分析

    分布式任務(wù)調(diào)度系統(tǒng)分析

    首先,我們來思考一些幾個業(yè)務(wù)場景: XX 信用卡中心,每月 28 日凌晨 1:00 到 3:00 需要完成全網(wǎng)用戶當(dāng)月的費用清單的生成 XX 電商平臺,需要每天上午 9:00 開始向會員推送送優(yōu)惠券使用提醒 XX 公司,需要定時執(zhí)行 Python 腳本,清理掉某文件服務(wù)系統(tǒng)中無效的 tmp 文件 最開始,

    2023年04月22日
    瀏覽(39)
  • ray-分布式計算框架-集群與異步Job管理

    ray-分布式計算框架-集群與異步Job管理

    0. ray 簡介 ray是開源分布式計算框架,為并行處理提供計算層,用于擴(kuò)展AI與Python應(yīng)用程序,是ML工作負(fù)載統(tǒng)一工具包 Ray AI Runtime ML應(yīng)用程序庫集 Ray Core 通用分布式計算庫 Task -- Ray允許任意Python函數(shù)在單獨的Python worker上運(yùn)行,這些異步Python函數(shù)稱為任務(wù) Actor -- 從函數(shù)擴(kuò)展到類

    2023年04月25日
    瀏覽(61)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包