国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

線程池創(chuàng)建線程異步獲取Future超時

這篇具有很好參考價值的文章主要介紹了線程池創(chuàng)建線程異步獲取Future超時。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

ExecutorService executorServer = xxxxxx.getThreadPool();

Future future = executorServer.submit(new xxxxxxx(Param, funcId));


 Object obj = future.get(100, TimeUnit.MILLISECONDS); //總超時時間設置

其中,future.get是從開始進行get方法時進行計算的時間,非future生成開始計算的,即什么時候get什么時候開始計時。

線程池從生成線程,如果核心線程不為0,則有任務時一直生成核心線程,直至到核心線程,之后開始方隊列中,最后任務多就開始開辟新線程到最大線程數(shù)。

執(zhí)行任務時,首先線程池開辟線程,之后 線程start( execute方法->addWorker-->t.start())后開始執(zhí)行call方法。但從.start 到 執(zhí)行call方法,需要CPU進行線程的上下文切換。 可以根據(jù)重寫的

ThreadPoolExecutor方法來跟蹤。

在高并發(fā)下,如果線程池不加?executorServer.prestartAllCoreThreads();

則在線程為1500的并發(fā)下,即新創(chuàng)建線程start狀態(tài)為NEW,真正運行時為RUNNABLE(start后),到call真正的調(diào)用會耗時,因CPU切換。

如不加,則會有12-160多ms的消耗,

如果加上prestartAllCoreThreads()則性能會好很多,最大從start到call才3ms.

同時,如使用hutool-all-5.3.8.jar的ThreadUtil.execAsyn方法效果也比較好,但沒有restartAllCoreThreads()這個好,測試效果有9-14ms的消耗。

public static ThreadPoolExecutor getThreadPool() {
if (executorServer == null || executorServer.isShutdown() || executorServer.isTerminated()) {
			synchronized (JzPreCheckUtil.class) {
				if (executorServer == null || executorServer.isShutdown() || executorServer.isTerminated()) {
					XxxConfig.isJzPrecheckEnabled(); //加載相關配置
					log.info("corePoolSize="+XxxConfig.corePoolSize+"  maximumPoolSize="+XxxConfig.maximumPoolSize+" keepAliveTime="+XxxConfig.keepAliveTime);
					ThreadFactory threadFactory = new ThreadFactory() {
						@Override
						public Thread newThread(Runnable r) {
							long t1=System.currentTimeMillis();
							String threadName="jzThreadPool" + r.hashCode();
							Thread newThread=new Thread(r, threadName);
							long t2=System.currentTimeMillis();
							log.info("創(chuàng)建線程="+threadName+" t1="+t1+"  t2="+t2+"  時間差="+(t2-t1));
							return newThread;
						}
					};

				
					executorServer = new ThreadPoolExecutor(XxxConfig.corePoolSize, XxxConfig.maximumPoolSize, XxxConfig.keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
					executorServer.prestartAllCoreThreads();

					log.info("prestartAllCoreThreads 之后="+executorServer.getQueue().size()+"  線程活著的數(shù)量="+executorServer.getActiveCount()+
							"  核心線程="+executorServer.getCorePoolSize()+"  最大線程="+executorServer.getMaximumPoolSize()+
							"  線程數(shù)="+executorServer.getPoolSize());
					
				}
			}
		}	
		log.info("queue 長度  后="+executorServer.getQueue().size()+"  線程活著的數(shù)量="+executorServer.getActiveCount()+
				"  核心線程="+executorServer.getCorePoolSize()+"  最大線程="+executorServer.getMaximumPoolSize()+
				"  線程數(shù)="+executorServer.getPoolSize());

		return executorServer;
	}
   @Override
    public Object call() throws Exception {
      
        xxx
        return out;
      
    }

重寫文章來源地址http://www.zghlxwxcb.cn/news/detail-430489.html

ThreadPoolExecutor參考:
package com.stock.framework.precheck.test2;

import com.stock.framework.jzprecheck.JzConfig;
import org.slf4j.Logger;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPool2 extends ThreadPoolExecutor {

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    public static Logger log = XXConfig.log;

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    public MyThreadPool2 getThreadPool(){

      //  return XXXXUtil.getMyThreadPool();

        return null;
    }

    @Override
    public void execute(Runnable command) {

        log.info("execute-11="+System.currentTimeMillis());
        if (command == null){
            throw new NullPointerException();
        }


        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        AtomicInteger ctl=getCtl();
        int c = ctl.get();
        if (workerCountOf(c) < getCorePoolSize()) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        log.info("execute-22="+System.currentTimeMillis());
        BlockingQueue<Runnable> workQueue= (BlockingQueue<Runnable>) getPrivateValue(null,"workQueue");

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    public void reject(Runnable command) {
        RejectedExecutionHandler handler= (RejectedExecutionHandler) getPrivateValue(null,"handler");;
        handler.rejectedExecution(command, this);
    }

    public int runStateOf(int c)     { return c & ~CAPACITY; }

    boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


    private boolean compareAndIncrementWorkerCount(int expect) {
        AtomicInteger ctl=getCtl();
        return ctl.compareAndSet(expect, expect + 1);
    }

    public Object getInstance(Object instance,String className,Object object) throws ClassNotFoundException, IllegalAccessException, InvocationTargetException, InstantiationException {

        Class<?> enclosingClass = Class.forName(className);
        Constructor constructor = enclosingClass.getDeclaredConstructors()[0];

        constructor.setAccessible(true);
        return  constructor.newInstance(instance,object);
    }
    public  Object getPrivateValue(Object obj ,String propName) {
        Object value = null;
        try {

            // 通過屬性獲取對象的屬性
            //.getDeclaredFields() 獲得某個類的所有聲明的字段,即包括public、private和proteced但不包括父類申明字段
            //.getClass() 是?個對象實例的?法,只有對象實例才有這個?法,具體的類是沒有的

            Field field = obj.getClass().getDeclaredField(propName);
            // 對象的屬性的訪問權限設置為可訪問
            //允許獲取實體類private的參數(shù)信息
            field.setAccessible(true);
            // 獲取屬性的對應的值
            value = field.get(obj);
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.toString());
            return null;
        }
        return value;
    }

    public boolean addWorker(Runnable firstTask, boolean core) {
        log.info("addWorker-1="+System.currentTimeMillis());

        retry:
        for (;;) {
            AtomicInteger ctl=getCtl();
            int c = ctl.get();
            int rs = runStateOf(c);

            BlockingQueue<Runnable> workQueue= (BlockingQueue<Runnable>) getPrivateValue(null,"workQueue");

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            log.info("addWorker-retry-1="+System.currentTimeMillis());
            for (;;) {
                log.info("addWorker-retry-2="+System.currentTimeMillis());
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? getCorePoolSize() : getMaximumPoolSize()))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        log.info("addWorker-Worker-="+System.currentTimeMillis());
        boolean workerStarted = false;
        boolean workerAdded = false;
       // Worker w = null;
        Object w=null;
        try {
           // w = new Worker(firstTask);

            w = getInstance(getThreadPool(),"java.util.concurrent.ThreadPoolExecutor$Worker",firstTask);

            //final Thread t = w.thread;
            final Thread t =  (Thread)getPrivateValue(w,"thread");

            log.info("addWorker-t-="+System.currentTimeMillis());
            if (t != null) {
                ReentrantLock mainLock11= (ReentrantLock) getPrivateValue(null,"mainLock");
                final ReentrantLock mainLock = mainLock11;
                log.info("addWorker-lock-="+System.currentTimeMillis());
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    AtomicInteger ctl=getCtl();
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        HashSet workers= (HashSet) getPrivateValue(null,"workers");
                        workers.add(w);

                        int s = workers.size();
                        if (s > getLargestPoolSize()){
                           // largestPoolSize = s;
                            setPrivateValue(null,"largestPoolSize",s);
                        }

                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                log.info("addWorker-workerAdded="+workerAdded+"   "+System.currentTimeMillis());
                if (workerAdded) {
                    log.info("addWorker-t.start()= start before 狀態(tài)="+t.getState()+"  優(yōu)先級別="+t.getPriority()+"  "+System.currentTimeMillis());

                    t.start();
                    log.info("addWorker-t.start()= start after 狀態(tài)="+t.getState()+"  優(yōu)先級別="+t.getPriority()+"  "+System.currentTimeMillis());

                    workerStarted = true;
                }
            }
        } catch (InstantiationException e) {
            log.error(e.toString());
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            log.error(e.toString());
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            log.error(e.toString());
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            log.error(e.toString());
            e.printStackTrace();
        } finally {
            if (! workerStarted)
            {
                log.info("!workerStarted-t-="+System.currentTimeMillis());
                // addWorkerFailed(w);
                getPrivateMethodValue(getThreadPool(),"addWorkerFailed",w);

            }

        }
        return workerStarted;
    }

    

    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(getCtl().get()));
    }
    private boolean compareAndDecrementWorkerCount(int expect) {
        return getCtl().compareAndSet(expect, expect - 1);
    }

    public AtomicInteger getCtl() {

        /*if(executorServer==null){
            // executorServer = JzPreCheckUtil.getThreadPool();
            executorServer   = getThreadPool();
        }*/
        AtomicInteger ctl= (AtomicInteger) getPrivateValue(getThreadPool(),"ctl");
        return ctl;
    }



    int workerCountOf(int c)  { return c & CAPACITY; }


    public  Object getPrivateValue(ThreadPoolExecutor executorServer ,String propName) {
        Object value = null;
        try {
            if(executorServer==null){

                // executorServer = JzPreCheckUtil.getThreadPool();
                executorServer = getThreadPool();
            }
            // 通過屬性獲取對象的屬性
            //.getDeclaredFields() 獲得某個類的所有聲明的字段,即包括public、private和proteced但不包括父類申明字段
            //.getClass() 是?個對象實例的?法,只有對象實例才有這個?法,具體的類是沒有的

            Field field = executorServer.getClass().getSuperclass().getDeclaredField(propName);
            // 對象的屬性的訪問權限設置為可訪問
            //允許獲取實體類private的參數(shù)信息
            field.setAccessible(true);
            // 獲取屬性的對應的值
            value = field.get(executorServer);
        } catch (Exception e) {
            log.error(e.toString());
            e.printStackTrace();
            return null;
        }
        return value;
    }


    public  void setPrivateValue(ThreadPoolExecutor executorServer, String propName,Object value) {

        try {
            if(executorServer==null){
                // executorServer = JzPreCheckUtil.getThreadPool();
                executorServer = getThreadPool();
            }
            // 通過屬性獲取對象的屬性
            //.getDeclaredFields() 獲得某個類的所有聲明的字段,即包括public、private和proteced但不包括父類申明字段
            //.getClass() 是?個對象實例的?法,只有對象實例才有這個?法,具體的類是沒有的
            Field field = executorServer.getClass().getSuperclass().getDeclaredField(propName);
            // 對象的屬性的訪問權限設置為可訪問
            //允許獲取實體類private的參數(shù)信息
            field.setAccessible(true);
            // 獲取屬性的對應的值
            field.set(executorServer,value);

        } catch (Exception e) {
            log.error(e.toString());
            e.printStackTrace();

        }
    }


    

    public Object getPrivateMethodValue(Object obj, String propName,Object val) {
        //Object value = null;
        try {
            Class c = obj.getClass().getSuperclass();

            // https://blog.csdn.net/qq_34626094/article/details/122687833
            //getDeclaredMethod java.lang.NoSuchMethodException的異常原因在于調(diào)用getDeclaredMethod時要同時指定方法名和參數(shù)名,這兩個不能錯誤。
            Method method = c.getDeclaredMethod(propName,val.getClass());
            method.setAccessible(true);
            Object[] values = new Object[1];
            values[0] = val;
            return method.invoke(obj,values);
        } catch (Exception e) {
            log.error(e.toString());
            return null;
        }
    }
}

到了這里,關于線程池創(chuàng)建線程異步獲取Future超時的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 異步編程 - 04 基于JDK中的Future實現(xiàn)異步編程(上)_Future & FutureTask 源碼解析

    異步編程 - 04 基于JDK中的Future實現(xiàn)異步編程(上)_Future & FutureTask 源碼解析

    這里我們主要探討如何使用JDK中的Future實現(xiàn)異步編程,這包含 如何使用FutureTask實現(xiàn)異步編程及其內(nèi)部實現(xiàn)原理; 如何使用CompletableFuture實現(xiàn)異步編程及其內(nèi)部實現(xiàn)原理, 以及CompletableFuture與JDK Stream如何完美結合的。 在Java并發(fā)包(JUC包)中Future代表著異步計算結果,F(xiàn)uture中

    2024年02月09日
    瀏覽(20)
  • 【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務接口獲取海量數(shù)據(jù)

    前段時間在做一個數(shù)據(jù)同步工具,其中一個服務的任務是調(diào)用A服務的接口,將數(shù)據(jù)庫中指定數(shù)據(jù)請求過來,交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。 剛開始的設計思路是,,我創(chuàng)建多個服務同時去請求A服務的接口,每個服務都請求到全量數(shù)據(jù),由于這些

    2024年02月13日
    瀏覽(31)
  • java多線程異步處理并獲取處理后的返回值

    示例部分代碼:

    2024年02月06日
    瀏覽(18)
  • 單線程、同步、異步、預解析、作用域、隱式全局變量、對象創(chuàng)建、new

    單線程、同步、異步、預解析、作用域、隱式全局變量、對象創(chuàng)建、new

    cpu 資源分配的最小單位 一個進程可以有多個線程 cpu 調(diào)度的最小單位 線程建立在進程的建立基礎上的一次程序的運行單位 線程分為:單線程 多線程 單線程:js是單線程 (同一個時間只能完成一個任務) 多線程:百度是多線程 同步任務是指在主線程上排隊的任務,只有當前

    2024年01月22日
    瀏覽(55)
  • 【linux c多線程】線程的創(chuàng)建,線程信息的獲取,獲取線程返回值

    ? 專欄內(nèi)容 : 參天引擎內(nèi)核架構 本專欄一起來聊聊參天引擎內(nèi)核架構,以及如何實現(xiàn)多機的數(shù)據(jù)庫節(jié)點的多讀多寫,與傳統(tǒng)主備,MPP的區(qū)別,技術難點的分析,數(shù)據(jù)元數(shù)據(jù)同步,多主節(jié)點的情況下對故障容災的支持。 手寫數(shù)據(jù)庫toadb 本專欄主要介紹如何從零開發(fā),開發(fā)的

    2024年02月04日
    瀏覽(17)
  • 異步編程 - 06 基于JDK中的Future實現(xiàn)異步編程(中)_CompletableFuture源碼解析

    異步編程 - 06 基于JDK中的Future實現(xiàn)異步編程(中)_CompletableFuture源碼解析

    CompletableFuture實現(xiàn)了CompletionStage接口 。 1)一個CompletionStage代表著一個異步計算節(jié)點,當另外一個CompletionStage計算節(jié)點完成后,當前CompletionStage會執(zhí)行或者計算一個值;一個節(jié)點在計算終止時完成,可能反過來觸發(fā)其他依賴其結果的節(jié)點開始計算。 2)一個節(jié)點(CompletionStag

    2024年02月09日
    瀏覽(22)
  • 從 Future 到 CompletableFuture:簡化 Java 中的異步編程

    在并發(fā)編程中,我們經(jīng)常需要處理多線程的任務,這些任務往往具有依賴性,異步性,且需要在所有任務完成后獲取結果。Java 8 引入了 CompletableFuture 類,它帶來了一種新的編程模式,讓我們能夠以函數(shù)式編程的方式處理并發(fā)任務,顯著提升了代碼的可讀性和簡潔性。 在這篇

    2024年02月11日
    瀏覽(20)
  • 并發(fā)編程 | 從Future到CompletableFuture - 簡化 Java 中的異步編程

    在并發(fā)編程中,我們經(jīng)常需要處理多線程的任務,這些任務往往具有依賴性,異步性,且需要在所有任務完成后獲取結果。Java 8 引入了 CompletableFuture 類,它帶來了一種新的編程模式,讓我們能夠以函數(shù)式編程的方式處理并發(fā)任務,顯著提升了代碼的可讀性和簡潔性。 在這篇

    2024年02月13日
    瀏覽(27)
  • 多線程系列(十九) -Future使用詳解

    多線程系列(十九) -Future使用詳解

    在前幾篇線程系列文章中,我們介紹了線程池的相關技術,任務執(zhí)行類只需要實現(xiàn) Runnable 接口,然后交給線程池,就可以輕松的實現(xiàn)異步執(zhí)行多個任務的目標,提升程序的執(zhí)行效率,比如如下異步執(zhí)行任務下載。 而實際上 Runnable 接口并不能滿足所有的需求,比如有些場景下

    2024年03月14日
    瀏覽(40)
  • C++11并發(fā)與多線程筆記(10) future其他成員函數(shù)、shared_future、atomic

    C++11并發(fā)與多線程筆記(10) future其他成員函數(shù)、shared_future、atomic

    status = result.wait_for(std::chrono::seconds(幾秒)); 卡住當前流程,等待std::async()的異步任務運 行一段時間,然后返回其狀態(tài)std::future_status 。如果std::async()的參數(shù)是std::launch::deferred(延遲執(zhí)行),則不會卡住主流程。 std::future_status是枚舉類型,表示異步任務的執(zhí)行狀態(tài)。類型的取值

    2024年02月12日
    瀏覽(45)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包