国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用雙異步后,如何保證數(shù)據(jù)一致性?

這篇具有很好參考價(jià)值的文章主要介紹了使用雙異步后,如何保證數(shù)據(jù)一致性?。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

使用雙異步后,如何保證數(shù)據(jù)一致性?,搬磚工逆襲Java架構(gòu)師,java,高并發(fā),異步,線程池,學(xué)習(xí)

大家好,我是哪吒。

一、前情提要

在上一篇文章中,我們通過雙異步的方式導(dǎo)入了10萬行的Excel,有個(gè)小伙伴在評(píng)論區(qū)問我,如何保證插入后數(shù)據(jù)的一致性呢?

很簡(jiǎn)單,通過對(duì)比Excel文件行數(shù)和入庫(kù)數(shù)量是否相等即可。

那么,如何獲取異步線程的返回值呢?

使用雙異步后,如何保證數(shù)據(jù)一致性?,搬磚工逆襲Java架構(gòu)師,java,高并發(fā),異步,線程池,學(xué)習(xí)

二、通過Future獲取異步返回值

我們可以通過給異步方法添加Future返回值的方式獲取結(jié)果。

FutureTask 除了實(shí)現(xiàn) Future 接口外,還實(shí)現(xiàn)了 Runnable 接口。因此,F(xiàn)utureTask 可以交給 Executor 執(zhí)行,也可以由調(diào)用線程直接執(zhí)行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer實(shí)現(xiàn)的

AbstractQueuedSynchronizer簡(jiǎn)稱AQS,它是一個(gè)同步框架,它提供通用機(jī)制來原子性管理同步狀態(tài)、阻塞和喚醒線程,以及 維護(hù)被阻塞線程的隊(duì)列。
基于 AQS 實(shí)現(xiàn)的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS實(shí)現(xiàn)的同步器包含兩種操作:

  1. acquire,阻塞調(diào)用線程,直到AQS的狀態(tài)允許這個(gè)線程繼續(xù)執(zhí)行,在FutureTask中,get()就是這個(gè)方法;
  2. release,改變AQS的狀態(tài),使state變?yōu)榉亲枞麪顟B(tài),在FutureTask中,可以通過run()和cancel()實(shí)現(xiàn)。

2、FutureTask執(zhí)行流程

使用雙異步后,如何保證數(shù)據(jù)一致性?,搬磚工逆襲Java架構(gòu)師,java,高并發(fā),異步,線程池,學(xué)習(xí)

  1. 執(zhí)行@Async異步方法;
  2. 建立新線程async-executor-X,執(zhí)行Runnable的run()方法,(FutureTask實(shí)現(xiàn)RunnableFuture,RunnableFuture實(shí)現(xiàn)Runnable);
  3. 判斷狀態(tài)state;
    • 如果未新建或者不處于AQS,直接返回;
    • 否則進(jìn)入COMPLETING狀態(tài),執(zhí)行異步線程代碼;
  4. 如果執(zhí)行cancel()方法改變AQS的狀態(tài)時(shí),會(huì)喚醒AQS等待隊(duì)列中的第一個(gè)線程線程async-executor-1;
  5. 線程async-executor-1被喚醒后
    • 將自己從AQS隊(duì)列中移除;
    • 然后喚醒next線程async-executor-2;
    • 改變線程async-executor-1的state;
    • 等待get()線程取值。
  6. next等待線程被喚醒后,循環(huán)線程async-executor-1的步驟
    • 被喚醒
    • 從AQS隊(duì)列中移除
    • 喚醒next線程
    • 改變異步線程狀態(tài)
  7. 新建線程async-executor-N,監(jiān)聽異步方法的state
    • 如果處于EXCEPTIONAL以上狀態(tài),拋出異常;
    • 如果處于COMPLETING狀態(tài),加入AQS隊(duì)列等待;
    • 如果處于NORMAL狀態(tài),返回結(jié)果;

3、get()方法執(zhí)行流程

get()方法通過判斷狀態(tài)state觀測(cè)異步線程是否已結(jié)束,如果結(jié)束直接將結(jié)果返回,否則會(huì)將等待節(jié)點(diǎn)扔進(jìn)等待隊(duì)列自旋,阻塞住線程。

自旋直至異步線程執(zhí)行完畢,獲取另一邊的線程計(jì)算出結(jié)果或取消后,將等待隊(duì)列里的所有節(jié)點(diǎn)依次喚醒并移除隊(duì)列。

使用雙異步后,如何保證數(shù)據(jù)一致性?,搬磚工逆襲Java架構(gòu)師,java,高并發(fā),異步,線程池,學(xué)習(xí)

  1. 如果state小于等于COMPLETING,表示任務(wù)還在執(zhí)行中;
    • 如果線程被中斷,從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,拋出中斷異常;
    • 如果state大于COMPLETING;
      • 如果已有等待節(jié)點(diǎn)WaitNode,將線程置空;
      • 返回當(dāng)前狀態(tài);
    • 如果任務(wù)正在執(zhí)行,讓出時(shí)間片;
    • 如果還未構(gòu)造等待節(jié)點(diǎn),則new一個(gè)新的等待節(jié)點(diǎn);
    • 如果未入隊(duì)列,CAS嘗試入隊(duì);
    • 如果有超時(shí)時(shí)間參數(shù);
      • 計(jì)算超時(shí)時(shí)間;
      • 如果超時(shí),則從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,返回當(dāng)前狀態(tài)state;
      • 阻塞隊(duì)列nanos毫秒。
    • 否則阻塞隊(duì)列;
  2. 如果state大于COMPLETING;
    • 如果執(zhí)行完畢,返回結(jié)果;
    • 如果大于等于取消狀態(tài),則拋出異常。

很多小朋友對(duì)讀源碼,嗤之以鼻,工作3年、5年,還是沒認(rèn)真讀過任何源碼,覺得讀了也沒啥用,或者讀了也看不懂~

其實(shí),只要把源碼的執(zhí)行流程通過畫圖的形式呈現(xiàn)出來,你就會(huì)幡然醒悟,原來是這樣的~

簡(jiǎn)而言之:

1. 如果異步線程還沒執(zhí)行完,則進(jìn)入CAS自旋;
2. 其它線程獲取結(jié)果或取消后,重新喚醒CAS隊(duì)列中等待的線程;
3. 再通過get()判斷狀態(tài)state;
4. 直至返回結(jié)果或(取消、超時(shí)、異常)為止。

三、FutureTask源碼具體分析

1、FutureTask源碼

通過定義整形狀態(tài)值,判斷state大小,這個(gè)思想很有意思,值得學(xué)習(xí)。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {

	// 最初始的狀態(tài)是new 新建狀態(tài)
	private volatile int state;
    private static final int NEW          = 0; // 新建狀態(tài)
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常執(zhí)行完
    private static final int EXCEPTIONAL  = 3; // 異常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 正在中斷
    private static final int INTERRUPTED  = 6; // 已中斷

	public V get() throws InterruptedException, ExecutionException {
	    int s = state;
	    // 任務(wù)還在執(zhí)行中
	    if (s <= COMPLETING)
	        s = awaitDone(false, 0L);
	    return report(s);
	}
	
	private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        	// 線程被中斷,從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,拋出中斷異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任務(wù)已執(zhí)行完畢或取消
            if (s > COMPLETING) {
            	// 如果已有等待節(jié)點(diǎn)WaitNode,將線程置空
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任務(wù)正在執(zhí)行,讓出時(shí)間片
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 還未構(gòu)造等待節(jié)點(diǎn),則new一個(gè)新的等待節(jié)點(diǎn)
            else if (q == null)
                q = new WaitNode();
            // 未入隊(duì)列,CAS嘗試入隊(duì)
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果有超時(shí)時(shí)間參數(shù)
            else if (timed) {
            	// 計(jì)算超時(shí)時(shí)間
                nanos = deadline - System.nanoTime();
                // 如果超時(shí),則從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,返回當(dāng)前狀態(tài)state
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞隊(duì)列nanos毫秒
                LockSupport.parkNanos(this, nanos);
            }
            else
            	// 阻塞隊(duì)列
                LockSupport.park(this);
        }
    }
    
	private V report(int s) throws ExecutionException {
		// 獲取outcome中記錄的返回結(jié)果
        Object x = outcome;
        // 如果執(zhí)行完畢,返回結(jié)果
        if (s == NORMAL)
            return (V)x;
            // 如果大于等于取消狀態(tài),則拋出異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}
2、將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
    	// 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入數(shù)據(jù)異常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

3、通過Future<Integer>.get()獲取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) throws Exception{
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("獲取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在執(zhí)行---獲取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("獲取Future返回值異常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結(jié)果="+insertFlag);
    return insertFlag;
}

4、這里也可以通過新線程+Future獲取Future返回值

不過感覺多此一舉了,就當(dāng)練習(xí)Future異步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) throws Exception {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final boolean[] insertFlag = {false};
    service.execute(new Runnable() {
        public void run() {
            try {
                insertFlag[0] = getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("新線程+Future獲取Future返回值異常: ", e);
                insertFlag[0] = false;
            }
        }
    });
    service.shutdown();
    return new AsyncResult<>(insertFlag[0]);
}

獲取異步線程結(jié)果后,我們可以通過添加事務(wù)的方式,實(shí)現(xiàn)Excel入庫(kù)操作的數(shù)據(jù)一致性。

但Future會(huì)造成主線程的阻塞,這個(gè)就很不友好了,有沒有更優(yōu)解呢?


在BUG中磨礪,在優(yōu)化中成長(zhǎng)

使用雙異步后,從 191s 優(yōu)化到 2s

增加索引 + 異步 + 不落地后,從 12h 優(yōu)化到 15 min

使用懶加載 + 零拷貝后,程序的秒開率提升至99.99%

性能優(yōu)化2.0,新增緩存后,程序的秒開率不升反降


??文章收錄于:100天精通Java從入門到就業(yè)

全網(wǎng)最細(xì)Java零基礎(chǔ)手把手入門教程,系列課程包括:Java基礎(chǔ)、Java8新特性、Java集合、高并發(fā)、性能優(yōu)化等,適合零基礎(chǔ)和進(jìn)階提升的同學(xué)。

??哪吒多年工作總結(jié):Java學(xué)習(xí)路線總結(jié),搬磚工逆襲Java架構(gòu)師

華為OD機(jī)試 2023B卷題庫(kù)瘋狂收錄中,刷題點(diǎn)這里

刷的越多,抽中的概率越大,每一題都有詳細(xì)的答題思路、詳細(xì)的代碼注釋、樣例測(cè)試,發(fā)現(xiàn)新題目,隨時(shí)更新,全天CSDN在線答疑。文章來源地址http://www.zghlxwxcb.cn/news/detail-817533.html

到了這里,關(guān)于使用雙異步后,如何保證數(shù)據(jù)一致性?的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 如何保證緩存和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性

    如何保證緩存和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性

    若數(shù)據(jù)庫(kù)更新成功,刪除緩存操作失敗,則此后讀到的都是緩存中過期的數(shù)據(jù),造成不一致問題。 同刪除緩存策略一樣,若數(shù)據(jù)庫(kù)更新成功緩存更新失敗則會(huì)造成數(shù)據(jù)不一致問題。 若緩存更新成功數(shù)據(jù)庫(kù)更新失敗, 則此后讀到的都是未持久化的數(shù)據(jù)。因?yàn)榫彺嬷械臄?shù)據(jù)是易

    2023年04月19日
    瀏覽(40)
  • MySQL和Redis如何保證數(shù)據(jù)一致性

    MySQL和Redis如何保證數(shù)據(jù)一致性

    MySQL與Redis都是常用的數(shù)據(jù)存儲(chǔ)和緩存系統(tǒng)。為了提高應(yīng)用程序的性能和可伸縮性,很多應(yīng)用程序?qū)ySQL和Redis一起使用,其中MySQL作為主要的持久存儲(chǔ),而Redis作為主要的緩存。在這種情況下,應(yīng)用程序需要確保MySQL和Redis中的數(shù)據(jù)是同步的,以確保數(shù)據(jù)的一致性。 “數(shù)據(jù)一致

    2024年02月12日
    瀏覽(28)
  • MySQL和Redis如何保證數(shù)據(jù)一致性?

    MySQL和Redis如何保證數(shù)據(jù)一致性?

    由于緩存的高并發(fā)和高性能已經(jīng)在各種項(xiàng)目中被廣泛使用,在讀取緩存這方面基本都是一致的,大概都是按照下圖的流程進(jìn)行操作: 但是在更新緩存方面,是更新完數(shù)據(jù)庫(kù)再更新緩存還是直接刪除緩存呢?又或者是先刪除緩存再更新數(shù)據(jù)庫(kù)?在這一點(diǎn)上就值得探討了。 在實(shí)

    2024年02月01日
    瀏覽(26)
  • 如何保證ES和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性?

    如何保證ES和數(shù)據(jù)庫(kù)的數(shù)據(jù)一致性?

    在業(yè)務(wù)中,我們通常需要把數(shù)據(jù)庫(kù)中的數(shù)據(jù)變更同步到ES中,那么如何保證數(shù)據(jù)庫(kù)和ES的一致性呢?通常有以下幾種做法: 雙寫 在代碼中,對(duì)數(shù)據(jù)庫(kù)和ES進(jìn)行雙寫,并且先操作本地?cái)?shù)據(jù)庫(kù),后操作ES,而且還需要把兩個(gè)操作放到一個(gè)事務(wù)中: ?在以上邏輯中,如果寫數(shù)據(jù)庫(kù)成功

    2024年04月28日
    瀏覽(23)
  • flink如何利用checkpoint保證數(shù)據(jù)狀態(tài)一致性

    flink如何利用checkpoint保證數(shù)據(jù)狀態(tài)一致性

    這本質(zhì)上是一『盡力而為』的方法。保證數(shù)據(jù)或事件最多由應(yīng)用程序中的所有算子處理一次。 這意味著如果數(shù)據(jù)在被流應(yīng)用程序完全處理之前發(fā)生丟失,則不會(huì)進(jìn)行其他重試或者重新發(fā)送。下圖中的例子說明了這種情況。 應(yīng)用程序中的所有算子都保證數(shù)據(jù)或事件至少被處理

    2024年02月21日
    瀏覽(35)
  • Redis如何保證緩存和數(shù)據(jù)庫(kù)一致性?

    現(xiàn)在我們?cè)诿嫦蛟鰟h改查開發(fā)時(shí),數(shù)據(jù)庫(kù)數(shù)據(jù)量大時(shí)或者對(duì)響應(yīng)要求較快,我們就需要用到Redis來拿取數(shù)據(jù)。 Redis:是一種高性能的內(nèi)存數(shù)據(jù)庫(kù),它將數(shù)據(jù)以鍵值對(duì)的形式存儲(chǔ)在內(nèi)存中,具有讀寫速度快、支持多種數(shù)據(jù)類型、原子性操作、豐富的特性等優(yōu)勢(shì)。 優(yōu)勢(shì): 性能極高

    2024年01月16日
    瀏覽(41)
  • Redis---數(shù)據(jù)庫(kù)和緩存如何保證一致性?

    用「讀 + 寫」請(qǐng)求的并發(fā)的場(chǎng)景來分析: 假如某個(gè)用戶數(shù)據(jù)在緩存中不存在,請(qǐng)求 A 讀取數(shù)據(jù)時(shí)從數(shù)據(jù)庫(kù)中查詢到年齡為 20,在未寫入緩存中時(shí)另一個(gè)請(qǐng)求 B 更新數(shù)據(jù)。它更新數(shù)據(jù)庫(kù)中的年齡為 21,并且清空緩存。這時(shí)請(qǐng)求 A 把從數(shù)據(jù)庫(kù)中讀到的年齡為 20 的數(shù)據(jù)寫入到緩存

    2024年01月24日
    瀏覽(27)
  • 并發(fā)情況如何實(shí)現(xiàn)加鎖來保證數(shù)據(jù)一致性?

    ReentrantLock(可重入鎖),指的是一個(gè)線程再次對(duì)已持有的鎖保護(hù)的臨界資源時(shí),重入請(qǐng)求將會(huì)成功。 簡(jiǎn)單的與我們常用的Synchronized進(jìn)行比較: ReentrantLock Synchronized 鎖實(shí)現(xiàn)機(jī)制 依賴AQS 監(jiān)視器模式 靈活性 支持響應(yīng)超時(shí)、中斷、嘗試獲取鎖 不靈活 釋放形式 必須顯示調(diào)用unloc

    2024年02月05日
    瀏覽(29)
  • mysql和redis如何保證數(shù)據(jù)庫(kù)一致性

    如果對(duì)于小公司的單機(jī)服務(wù)器來說在更新和刪除mysql數(shù)據(jù)的同時(shí)對(duì)redis緩存進(jìn)行更新或者刪除就行,一般有兩個(gè)選擇,例如: 先更新MySQL,后刪除(或更新)Redis 先刪除(或更新)Redis,后更新MySQL 但是不管使用其中哪種方式,都存在兩個(gè)可能的問題: 由于第一步與第二步并不是原

    2023年04月24日
    瀏覽(25)
  • 如何保證數(shù)據(jù)庫(kù)和緩存雙寫一致性?

    如何保證數(shù)據(jù)庫(kù)和緩存雙寫一致性?

    如何保證數(shù)據(jù)庫(kù)和緩存雙寫一致性,是面試中經(jīng)常被問的一個(gè)技術(shù)問題,程序汪推薦大家有必要好好研究一波 數(shù)據(jù)庫(kù)和緩存(比如:redis)雙寫數(shù)據(jù)一致性問題,是一個(gè)跟開發(fā)語(yǔ)言無關(guān)的公共問題。尤其在高并發(fā)的場(chǎng)景下,這個(gè)問題變得更加嚴(yán)重。 我很負(fù)責(zé)的告訴大家,該問

    2024年01月18日
    瀏覽(41)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包