国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

深入理解Java線程池ThreadPoolExcutor實現(xiàn)原理、數(shù)據(jù)結構和算法(源碼解析)

這篇具有很好參考價值的文章主要介紹了深入理解Java線程池ThreadPoolExcutor實現(xiàn)原理、數(shù)據(jù)結構和算法(源碼解析)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

什么是線程池?

????????線程池主要是為了解決執(zhí)行新任務執(zhí)行時,應用程序為減少為任務創(chuàng)建一個新線程和任務執(zhí)行完畢時銷毀線程所帶來的開銷。通過線程池,可以在項目初始化時就創(chuàng)建一個線程集合,然后在需要執(zhí)行新任務時重用這些線程而不是每次都新建一個線程,一旦任務已經(jīng)完成了,線程回到線程池中并等待下一次分配任務,達到資源復用的效果。

線程池主要優(yōu)勢?

  • 降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
  • 提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行。
  • 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調優(yōu)和監(jiān)控。
  • 提供更多更強大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執(zhí)行或定期執(zhí)行。

如何創(chuàng)建線程?

通過Executors

創(chuàng)建線程池

newSingleThreadExecutor:創(chuàng)建一個只有一個線程的線程池,串行執(zhí)行所有任務,即使空閑時也不會被關閉??梢员WC所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。

適用場景:需要保證順序地執(zhí)行各個任務;并且在任意時間點,不會有多個線程活動的應用場景。

newFixedThreadPool:創(chuàng)建一個固定線程數(shù)量的線程池(corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列)。初始化時線程數(shù)量為零,之后每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大容量。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結束,那么線程池會補充一個新線程。

適用場景:為了滿足資源管理的需求,而需要限制當前線程數(shù)量的應用場景,它適用于負載比較重的服務器。

newCachedThreadPool:創(chuàng)建一個可緩存的線程池,線程的最大數(shù)量為Integer.MAX_VALUE??臻e線程會臨時緩存下來,線程會等待60s還是沒有任務加入的話就會被關閉。

適用場景:適用于執(zhí)行很多的短時間異步任務的小程序,或者是負載較輕的服務器。

newScheduledThreadPool:創(chuàng)建一個支持執(zhí)行延遲任務或者周期性執(zhí)行任務的線程池。

ThreadPoolExecutor

阿里巴巴開發(fā)手冊并發(fā)編程有一條規(guī)定:線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這是為什么呢?主要是因為這樣的可以避免資源耗盡的風險,因為使用Executors返回線程池對象的弊端有:

FixedThreadPool 和 SingleThreadPool 允許的阻塞隊列長度為 Integer.MAX_VALUE,這樣會導致堆積大量的請求,從而導致OOM;

CachedThreadPool 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致 OOM。

所以創(chuàng)建線程池,最好是根據(jù)線程池的用途,然后自己創(chuàng)建線程池。

目錄

一、線程池系列相關文章

二、繼承實現(xiàn)關系圖

三、低層數(shù)據(jù)存儲結構

3.1 核心屬性

3.1.1 說明

3.1.2 線程池五種狀態(tài)

3.2 構造器

四、工作原理

五、源碼解析

5.1?核心方法execute()

5.2?添加任務addWork()

5.3?執(zhí)行任務runWork()

5.4?阻塞隊列取任務getTask()

5.5 Worker無任務最后處理processWorkerExit()

5.6 關閉線程池

5.6.1?關閉線程池shutdown()

5.6.2?關閉線程池shutdownNow()

5.6.3?嘗試結束線程池tryTerminate()

5.6.4?判斷線程池是否關閉awaitTermination()


一、線程池系列相關文章

? ? ? ? 1.1?Java線程池ThreadPoolExcutor01-參數(shù)說明

? ? ? ? 1.2?Java線程池ThreadPoolExcutor02-阻塞隊列之ArrayBlockingQueue

? ? ? ? 1.3?Java線程池ThreadPoolExcutor03-阻塞隊列之LinkedBlockingQueue

? ? ? ? 1.4?Java線程池ThreadPoolExcutor04-阻塞隊列之PriorityBlockingQueue原理及擴容機制詳解

? ? ? ? 1.5?Java線程池ThreadPoolExcutor05-阻塞隊列之DelayQueue原理及擴容機制詳解

? ? ? ? 1.6?Java線程池ThreadPoolExcutor06-阻塞隊列之SynchronousQueue

? ? ? ? 1.7?Java線程池ThreadPoolExcutor07-阻塞隊列之LinkedTransferQueue

? ? ? ? 1.8?Java線程池ThreadPoolExcutor07-阻塞隊列之LinkedBlockingDeque

? ? ? ? 1.9?Java線程池ThreadPoolExcutor08-4種拒絕策略

二、繼承實現(xiàn)關系圖

java線程池threadpool,線程池,多線程,java,數(shù)據(jù)結構,算法,面試

三、低層數(shù)據(jù)存儲結構

3.1 核心屬性

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<>();
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
...
}
3.1.1 說明
屬性名 說明
COUNT_BITS 用于計算線程池的狀態(tài)值、容量
workQueue

阻塞隊列。七個:

ArrayBlockingQueue:基于數(shù)組結構的有界阻塞隊列,按FIFO排序任務。詳見1.2鏈接。

LinkedBlockingQueue:?基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene。詳見1.3鏈接。

PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列。詳見1.4鏈接。

DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。詳見1.5鏈接。

SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài)。詳見1.6鏈接。

LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。與SynchronousQueue類似,還含有非阻塞方法。詳見1.7鏈接。

LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。詳見1.8鏈接。

threadFactory
線程工廠
handler

拒絕策略處理類。四種:AbortPolicy策略(默認)、DiscardPolicy策略、DiscardOldestPolicy策略 和 CallerRunsPolicy策略。詳見1.9鏈接。

keepAliveTime
為多余的空閑線程等待新任務的最長時間, 超過這個時間后多余的線程將被終止。這里把keepAliveTime設置為0L,意味著多余 的空閑線程會被立即終止。
allowCoreThreadTimeOut

如果為true,核心線程使用keepAliveTime超時等待工作。

如果為false(默認),核心線程即使在空閑時也保持活動。

corePoolSize 核心線程數(shù)
maximumPoolSize 最大線程數(shù)
3.1.2 線程池五種狀態(tài)

屬性ctl 是ThreadPoolExecutor內部一個用來進行技術和狀態(tài)控制的控制變量,它使用了一個原子整形字段來實現(xiàn)兩個方面的管理:

  • 低29位記錄線程池的線程數(shù),
  • 高3位記錄線程池的工作狀態(tài)

五種狀態(tài):

  • RUNNING線程池一旦被創(chuàng)建處于RUNNING狀態(tài)。線程池處于RUNNING狀態(tài)時,能夠接收新任務以及對已添加的任務進行處理。
  • SHUTDOWN線程池處于SHUTDOWN狀態(tài)時,不接收新任務,但能處理已添加的任務。調用線程池的shutdown()接口時,線程池由RUNNING狀態(tài)轉變?yōu)镾HUTDOWN狀態(tài)。
  • STOP:?線程池處于STOP狀態(tài)時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。調用線程池的shutdownNow()接口時,線程池由RUNNING狀態(tài)或者SHUTDOWN狀態(tài)變?yōu)镾TOP狀態(tài)。
  • TIDYING: 當所有的任務已終止,ctl記錄的任務數(shù)為0,線程池的狀態(tài)會變?yōu)門IDYING狀態(tài)。當線程池狀態(tài)為SHUTDOWN時,阻塞隊列為空并且線程池中執(zhí)行的任務也為空時,就會由SHUTDOWN狀態(tài)變?yōu)?TIDYING狀態(tài);當線程池為STOP時,線程池中執(zhí)行的任務為空時,就會又STOP狀態(tài)變?yōu)?TIDYING狀態(tài)。
  • TERMINATED:?線程池徹底終止,就會變成TERMINATED狀態(tài)。線程池處于TIDYING狀態(tài)時,調用terminated()就會由TIDYING狀態(tài)變?yōu)門ERMINATED狀態(tài)。

狀態(tài)轉換如下圖:

java線程池threadpool,線程池,多線程,java,數(shù)據(jù)結構,算法,面試

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 圖3.1.2-1

3.2 構造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...
}
參數(shù)名 說明
corePoolSize 核心線程數(shù)
maximumPoolSize 最大線程數(shù)
keepAliveTime 為多余的空閑線程等待新任務的最長時間, 超過這個時間后多余的線程將被終止。
unit keepAliveTime的單位
workQueue 阻塞隊列
threadFactory 線程工廠
handler 拒絕策略處理類

詳見1.1鏈接。

四、工作原理

java線程池threadpool,線程池,多線程,java,數(shù)據(jù)結構,算法,面試

  • 如果任務null直接退出,否則執(zhí)行步驟2;
  • 若工作線程數(shù)小于核心線程數(shù),執(zhí)行步驟3創(chuàng)建工作線程并執(zhí)行任務,否則執(zhí)行步驟4。
  • 若阻塞隊列已滿
    • 工作線程數(shù)小于最大線程數(shù),執(zhí)行步驟6創(chuàng)建工作線程并執(zhí)行任務,否則執(zhí)行步驟10?拒絕任務(執(zhí)行拒絕策略)
  • 若阻塞隊列未滿,添加任務到阻塞隊列,若線程狀態(tài)不為運行中,則任務從隊列中取出,并執(zhí)行步驟10 拒絕任務(執(zhí)行拒絕策略)

五、源碼解析

5.1?核心方法execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
   
    int c = ctl.get();
    //通過位運算,獲取當前線程數(shù),如果線程數(shù)小于corePoolSize,則執(zhí)行addWorker(),創(chuàng)建新線程執(zhí)行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 若添加工作任務執(zhí)行任務不成功,嘗試添加到阻塞隊列中
        c = ctl.get();
    }
    // 線程數(shù)超過核心線程數(shù),任務添加到阻塞隊列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 添加到阻塞隊列,但若線程池狀態(tài)不是運行中,則從隊列中取出
        if (! isRunning(recheck) && remove(command))
            // 并且執(zhí)行拒絕策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 阻塞隊列已滿,則若工作線程數(shù)小于最大線程數(shù) 創(chuàng)建線程執(zhí)行任務
    else if (!addWorker(command, false))
        // 若線程數(shù)已達最大線程數(shù),否則執(zhí)行拒絕策略
        reject(command);
}

從源碼中可以清晰看出線程池執(zhí)行任務的邏輯與“四、工作原理”所述一致。

5.2?添加任務addWork()

THreadPoolExecutor內部類Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    //當前Worker所處于的線程
    final Thread thread;
    //待執(zhí)行的任務
    Runnable firstTask;
    //任務計數(shù)器
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

它是對Runnable進行了封裝,主要功能是對待執(zhí)行的任務進行中斷處理和狀態(tài)監(jiān)控。Worker還繼承了AQS,在每個任務執(zhí)行時進行了加鎖的處理??梢詫orker簡單理解為可中斷的、可進行鎖處理的Runnable。

創(chuàng)建工作線程并執(zhí)行是addWork()方法,源碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋,判斷線程池狀態(tài),并對線程數(shù)量執(zhí)行原子+1操作
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取線程池狀態(tài)
        int rs = runStateOf(c);
       	// 如果線程池已經(jīng)關閉,則直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 判斷線程數(shù)是否已達上限,根據(jù)傳入?yún)?shù)core的不同,判斷corePoolSize或者maximumPoolSize。
            // 如果線程數(shù)已達上限,直接返回false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 執(zhí)行原子操作,對線程數(shù)+1
            if (compareAndIncrementWorkerCount(c))
                // 執(zhí)行原子操作成功,則退出自旋 并 創(chuàng)建工作線程執(zhí)行任務
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // ctl變量操作成功,執(zhí)行Worker相關邏輯
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 創(chuàng)建一個新的Worker,傳入待執(zhí)行的任務
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加鎖后,再次判斷線程池狀態(tài)
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果創(chuàng)建了新的Worker,則調用其start方法立即執(zhí)行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWork()方法首先在一個自旋中做三個判斷:

  • 線程是否關閉,若關閉則直接返回false退出
  • 通過參數(shù)core來確定工作線程數(shù)與核心線程數(shù)比較?還是?與最大線程數(shù)比較,若工作線程數(shù)大,則返回false退出
  • CAS嘗試將線程數(shù)加1,若成功則創(chuàng)建一個辨析的Worker并立即執(zhí)行其start()方法執(zhí)行該任務。

5.3?執(zhí)行任務runWork()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 指向線程池調execute添加的任務
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 首先釋放鎖,允許中斷
    w.unlock(); 
    boolean completedAbruptly = true;
    
    try {
        // 從worker中取第1個任務,若任務為空則從阻塞隊列中取任務,直到返回null,這里達到線程復用的效果,實現(xiàn)線程處理多個任務。
        while (task != null || (task = getTask()) != null) {
            // 執(zhí)行任務前先加鎖
            w.lock();
            // 如果線程池已經(jīng)終止,則中斷該線程。保存了線程池在STOP狀態(tài)下線程中斷的,非STOP狀態(tài)下線程沒有被中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 記錄正在運行的任務
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行任務(調任務的run方法)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執(zhí)行完任務,清除記錄信息
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 當前Worker計數(shù)器+1,統(tǒng)計worker執(zhí)行了多少任務,最后累加進completedTaskCount變量,可以調用相應方法返回一些統(tǒng)計信息。
                w.completedTasks++;
                // 釋放鎖
                w.unlock();
            }
        }
        // 表示worker是否異常終止,執(zhí)行到這里代表執(zhí)行正常,后續(xù)的方法需要這個變量
        completedAbruptly = false;
    } finally {
        // completedTasks累加到completedTaskCount變量中
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()的主要邏輯就是進行線程池的關閉檢查,然后執(zhí)行任務,并將計數(shù)器+1。

注意這行代碼?while (task != null || (task = getTask()) != null) ,當task = w.firstTask?的值為null時執(zhí)行task = getTask(),?getTask是從任務列隊是取任務。也就是說,Worker在執(zhí)行完提交給自己的任務后,會執(zhí)行任務隊列中的任務。

5.4?阻塞隊列取任務getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

說明:

  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))?這行代碼體現(xiàn)出了 SHUTDOWN狀態(tài)和 STOP狀態(tài)的區(qū)別。若線程池狀態(tài)為 SHUDOWN 狀態(tài),則條件為 false,取任務執(zhí)行;而如果線程池的狀態(tài)為 STOP 狀態(tài),則條件為 true,不管隊列是否還有任務,不再處理了。
  • timed后在的判斷邏輯有點復雜,以下幾種情況為true,CAS嘗試將線程數(shù)減1
    • 工作線程數(shù)大于最大線程數(shù)(后面wc>1||workQueue.isEmpty()應該自然滿足)(可能是在運行中調用setMaximumPoolSize)
    • 設置了allowCoreThreadTimeOut為true且隊列中取的任務為null,說明沒任務了
    • 工作線程數(shù)大于核心線程數(shù)?且隊列中取的任務為null(后面wc>1||workQueue.isEmpty()應該自然滿足)
  • try后面邏輯
    • 延時取任務:allowCoreThreadTimeOut為true 或者 wc > corePoolSize
    • 直接取任務(若沒任務則阻塞等待):allowCoreThreadTimeOut為false?或者 wc <= corePoolSize

結論:

  • allowCoreThreadTimeOut設置為true時,工作線程數(shù)達最大之后,因無新任務而線程減少,工作線程總數(shù)最小值可以為0
  • allowCoreThreadTimeOut設置為false時,只有wc大于核心線程數(shù),才去做CAS減線程數(shù)操作,所以工作線程數(shù)達到最大之后,因無新任務而線程減少,工作線程總數(shù)最小為核心線程數(shù)

5.5 Worker無任務最后處理processWorkerExit()

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
   
    tryTerminate();

    int c = ctl.get();
    // 判斷狀態(tài)是否小于STOP
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

說明:

  • decrementWorkerCount():如果為異常結束,則工作線程數(shù)減1
  • try?邏輯:加鎖累加完成任務數(shù)
  • tryTerminate():?嘗試終止線程池
  • 判斷狀態(tài)是否小于STOP為true
    • allowCoreThreadTimeOut設置為true
      • 若隊列不為空:至少保留一個worker
      • 若隊列為空:直接退出,線程池的worker數(shù)減少,最終可能為0
    • allowCoreThreadTimeOut設置為false:?則保持worker數(shù)不少于corePoolSize(若線程數(shù)小于corePoolSize,則添加 null任務的worker

總結worker:線程池啟動后,worker在池內創(chuàng)建,包裝了提交的Runnable任務并執(zhí)行,執(zhí)行完就等待下一個任務,不再需要時就結束。

5.6 關閉線程池

從圖3.1.2-1看出有兩種方法關閉線程池:

  • shutdown:?不能再提交任務,已經(jīng)提交的任務可繼續(xù)執(zhí)行;
  • shutdownNow:?不能再提交任務,已經(jīng)提交的任務未執(zhí)行的任務不再執(zhí)行,正在執(zhí)行的任務可繼續(xù)執(zhí)行,但會中斷,返回已提交未執(zhí)行的任務
5.6.1?關閉線程池shutdown()
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

說明:

  • checkShutdownAccess():?安裝策略機構
  • advanceRunState(SHUTDOWN):?線程池狀態(tài)切換到SHUTDOWN狀態(tài)
  • interruptIdleWorkers():?中斷所有空閑的worker
  • ?tryTerminate():?嘗試結束線程池
5.6.2?關閉線程池shutdownNow()
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

說明:

  • checkShutdownAccess():?安裝策略機構
  • advanceRunState(STOP):?線程池狀態(tài)切換到STOP狀態(tài)
  • interruptWorkers():?中斷所有空閑的worker
  • drainQueue():?取出等待隊列里未執(zhí)行的任務
  • tryTerminate():?嘗試結束線程池
5.6.3?嘗試結束線程池tryTerminate()
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

說明:

  • ctl.get():獲取ctl值判斷線程池狀態(tài),以下狀態(tài)不處理直接return
    • RUNNING狀態(tài),正在運行狀態(tài)肯定不能停
    • TIDYING或TERMINATED狀態(tài),已經(jīng)沒有正在運行的worker了
    • SHUTDOWN狀態(tài)且阻塞隊列不為空,執(zhí)行完才能停
    • 工作線程數(shù)不為0。又調了一次interruptIdleWorkers(ONLY_ONE),可能疑惑在調tryTerminate之前時已經(jīng)調用過了,為什么又調用,而且每次只中斷一個空閑worker?我們需要知道,shutdown時worker可能在執(zhí)行中,執(zhí)行完阻塞在隊列的take,不知道要結束,所有要補充調用interruptIdleWorkers。每次只中斷一個是因為processWorkerExit時,還會執(zhí)行tryTerminate,自動中斷下一個空閑的worker。
  • try邏輯:加鎖CAS嘗試將線程池狀態(tài)切換成TIDYING,再切換成TERMINATED狀態(tài),terminated是空方法供子類來實現(xiàn)。
5.6.4?判斷線程池是否關閉awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

說明:接收人timeout和TimeUnit兩個參數(shù),用于設定超時時間及單位。當?shù)却^設定時間時,會監(jiān)測ExecutorService是否已經(jīng)關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。

ps:?以上是研讀源碼加上翻閱許多文獻理解的總結,如有錯誤或不足的地方,歡迎指出,歡迎留言交流。我會繼續(xù)努力學習和分享更多有干貨的內容。文章來源地址http://www.zghlxwxcb.cn/news/detail-728213.html

到了這里,關于深入理解Java線程池ThreadPoolExcutor實現(xiàn)原理、數(shù)據(jù)結構和算法(源碼解析)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 深入理解Java線程間通信

    合理的使用Java多線程可以更好地利用服務器資源。一般來講,線程內部有自己私有的線程上下文,互不干擾。但是當我們需要多個線程之間相互協(xié)作的時候,就需要我們掌握Java線程的通信方式。本文將介紹Java線程之間的幾種通信原理。 在Java中,鎖的概念都是基于對象的,

    2024年02月09日
    瀏覽(23)
  • Java-多線程-深入理解ConcurrentHashMap

    Java-多線程-深入理解ConcurrentHashMap

    ????ConcurrentHashMap(Concurrent: 并存的,同時發(fā)生的 ;) ????ConcurrentHashMap是Java中的一個線程安全的哈希表實現(xiàn),它可以在多線程環(huán)境下高效地進行并發(fā)操作。 ????HashMap線程不安全,在多線程操作下可能會導致數(shù)據(jù)錯亂 ????使用HashMap和ConcurrentHashMap分別實

    2024年02月14日
    瀏覽(60)
  • “深入理解Java的多線程編程“

    多線程編程是指在一個程序中同時運行多個線程,以提高程序的并發(fā)性和性能。Java是一門支持多線程編程的強大編程語言,提供了豐富的多線程相關類和接口。 在Java中,可以通過以下方式實現(xiàn)多線程編程: 繼承Thread類:創(chuàng)建一個繼承自Thread類的子類,并重寫run()方法,在

    2024年02月13日
    瀏覽(53)
  • 【多線程系列-03】深入理解java中線程的生命周期,任務調度

    【多線程系列-03】深入理解java中線程的生命周期,任務調度

    多線程系列整體欄目 內容 鏈接地址 【一】深入理解進程、線程和CPU之間的關系 https://blog.csdn.net/zhenghuishengq/article/details/131714191 【二】java創(chuàng)建線程的方式到底有幾種?(詳解) https://blog.csdn.net/zhenghuishengq/article/details/127968166 【三】深入理解java中線程的生命周期,任務調度 ht

    2024年02月17日
    瀏覽(27)
  • 深入理解 Java 多線程、Lambda 表達式及線程安全最佳實踐

    線程使程序能夠通過同時執(zhí)行多個任務而更有效地運行。 線程可用于在不中斷主程序的情況下在后臺執(zhí)行復雜的任務。 創(chuàng)建線程 有兩種創(chuàng)建線程的方式。 擴展Thread類 可以通過擴展Thread類并覆蓋其run()方法來創(chuàng)建線程: 實現(xiàn)Runnable接口 另一種創(chuàng)建線程的方式是實現(xiàn)Runnable接口

    2024年03月15日
    瀏覽(29)
  • 深入理解數(shù)據(jù)結構:隊列的實現(xiàn)及其應用場景

    深入理解數(shù)據(jù)結構:隊列的實現(xiàn)及其應用場景

    隊列(Queue)是一種具有先進先出(FIFO)特性的數(shù)據(jù)結構。在隊列中,數(shù)據(jù)的插入和刪除操作分別在隊列的兩端進行。插入操作在隊列的尾部進行,而刪除操作則在隊列的頭部進行。這種特性使得隊列在很多實際應用中非常有用,比如任務調度、緩沖區(qū)管理等。 線性表是一種

    2024年04月28日
    瀏覽(30)
  • Java開發(fā) - 深入理解Redis哨兵機制原理

    Java開發(fā) - 深入理解Redis哨兵機制原理

    Redis的主從、哨兵模式、集群模式,在前文中都已經(jīng)有了詳細的搭建流程,可謂是手把手教程,也得到了很多朋友的喜歡。由于前文偏向于應用方面,就導致了理論知識的匱乏,我們可能會用了,但卻不明所以,所以今天,博主就通過接下里的幾篇博客給大家分別講解Redis哨兵

    2024年02月17日
    瀏覽(29)
  • 華為云出品《深入理解高并發(fā)編程:Java線程池核心技術》電子書發(fā)布

    華為云出品《深入理解高并發(fā)編程:Java線程池核心技術》電子書發(fā)布

    系統(tǒng)拆解線程池核心源碼的開源小冊 透過源碼看清線程池背后的設計和思路 詳細解析AQS并發(fā)工具類 點擊下方鏈接進入官網(wǎng),右上角搜索框搜索“《深入理解高并發(fā)編程:Java線程池核心技術》” 即可獲取下載。 https://auth.huaweicloud.com/authui/login.html?locale=zh-cnservice=https%3A%2F%2F

    2024年02月16日
    瀏覽(41)
  • Java開發(fā) - 深入理解Redis Cluster的工作原理

    Java開發(fā) - 深入理解Redis Cluster的工作原理

    前面我們講過Redis Cluster的搭建方式,也是本著應用優(yōu)先的原則,所以對其基礎概念和原理幾乎沒有涉及,但當學會了Redis集群的搭建方式之后,對于其原來我們還是要知道一些的,所以這篇博客,我們將一起來學習Redis Cluster的一些相關知識。 在開始Redis Cluster的講解之前,還

    2024年02月15日
    瀏覽(27)
  • “深入理解JVM:Java虛擬機的工作原理揭秘“

    標題:深入理解JVM:Java虛擬機的工作原理揭秘 摘要:本文將深入解析Java虛擬機(JVM)的工作原理,包括JVM的組成部分、類加載過程、運行時數(shù)據(jù)區(qū)域、垃圾回收機制等。通過詳細的代碼示例,幫助讀者更好地理解JVM的內部機制。 正文: 一、JVM的組成部分 Java虛擬機是Java語

    2024年02月13日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包