// Usage sketch: fetch the pool, submit a task, then wait for its result with a timeout.
// The identifiers "xxxxxx" and "xxxxxxx" are placeholders left by the author for the
// real utility class and task class.
ExecutorService executorServer = xxxxxx.getThreadPool();
Future future = executorServer.submit(new xxxxxxx(Param, funcId));
Object obj = future.get(100, TimeUnit.MILLISECONDS); // overall timeout: wait at most 100 ms for the result
其中,future.get 的超時時間是從調用 get 方法的那一刻開始計算的,而不是從 Future 生成(submit)時開始計算,即什麼時候調用 get,什麼時候才開始計時。
線程池創建線程的規則:如果核心線程數不為 0,則有任務提交時會一直創建核心線程,直到達到核心線程數;之後的任務開始放入隊列中;最後,隊列放不下時才開始開闢新線程,直到達到最大線程數。
執行任務時,首先由線程池開闢線程,線程 start(execute 方法 -> addWorker -> t.start())之後才開始執行 call 方法。但從 t.start() 到真正執行 call 方法,需要 CPU 進行線程的上下文切換。可以根據下文重寫的 ThreadPoolExecutor 方法來跟蹤這一過程。
在高並發下,如果線程池沒有調用 executorServer.prestartAllCoreThreads();
則在 1500 個線程的並發下,新創建的線程在 start 之前狀態為 NEW,start 之後真正運行時才是 RUNNABLE;從 start 到 call 被真正調用會有耗時,原因是 CPU 的線程切換。
如果不加,從 start 到 call 會有 12 到 160 多 ms 的消耗;
如果加上 prestartAllCoreThreads(),性能會好很多,從 start 到 call 最大才 3 ms。
同時,使用 hutool-all-5.3.8.jar 的 ThreadUtil.execAsync 方法效果也比較好,但沒有 prestartAllCoreThreads() 這麼好,測試結果有 9-14 ms 的消耗。文章來源:http://www.zghlxwxcb.cn/news/detail-430489.html
/**
 * Returns the shared executor, building a new one when none exists or the
 * current one has been shut down or terminated.
 *
 * <p>A freshly built pool uses a {@link SynchronousQueue}, a thread factory
 * that logs how long each {@code Thread} construction took, and has all core
 * threads pre-started before it is first used. The pool's current statistics
 * are logged on every call.
 *
 * <p>NOTE(review): the unsynchronized first check is only safe if the
 * {@code executorServer} field is declared {@code volatile}; its declaration
 * is outside this listing - confirm.
 *
 * @return the shared pool held in {@code executorServer}
 */
public static ThreadPoolExecutor getThreadPool() {
    boolean usable = executorServer != null && !executorServer.isShutdown() && !executorServer.isTerminated();
    if (!usable) {
        synchronized (JzPreCheckUtil.class) {
            // Re-check under the lock: another caller may have rebuilt the pool meanwhile.
            if (executorServer == null || executorServer.isShutdown() || executorServer.isTerminated()) {
                XxxConfig.isJzPrecheckEnabled(); // load the related configuration
                log.info("corePoolSize="+XxxConfig.corePoolSize+" maximumPoolSize="+XxxConfig.maximumPoolSize+" keepAliveTime="+XxxConfig.keepAliveTime);

                // Factory that names each thread after the runnable's hash code and
                // logs the time spent constructing the Thread object.
                ThreadFactory timingFactory = new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        final long startedAt = System.currentTimeMillis();
                        final String name = "jzThreadPool" + r.hashCode();
                        final Thread created = new Thread(r, name);
                        final long finishedAt = System.currentTimeMillis();
                        log.info("創(chuàng)建線程="+name+" t1="+startedAt+" t2="+finishedAt+" 時間差="+(finishedAt-startedAt));
                        return created;
                    }
                };

                ThreadPoolExecutor freshPool = new ThreadPoolExecutor(
                        XxxConfig.corePoolSize,
                        XxxConfig.maximumPoolSize,
                        XxxConfig.keepAliveTime,
                        TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        timingFactory);
                executorServer = freshPool;
                // Start every core thread now so the first tasks do not pay thread start-up cost.
                freshPool.prestartAllCoreThreads();
                log.info(describeJzPool("prestartAllCoreThreads 之后=", freshPool));
            }
        }
    }
    log.info(describeJzPool("queue 長度 后=", executorServer));
    return executorServer;
}

/**
 * Builds the pool-statistics log line: the given prefix followed by queue size,
 * active count, core size, maximum size and current pool size, in that order.
 */
private static String describeJzPool(String prefix, ThreadPoolExecutor pool) {
    return prefix+pool.getQueue().size()+" 線程活著的數(shù)量="+pool.getActiveCount()+
            " 核心線程="+pool.getCorePoolSize()+" 最大線程="+pool.getMaximumPoolSize()+
            " 線程數(shù)="+pool.getPoolSize();
}
/**
 * Task body of the submitted callable; per the surrounding article this is the
 * method whose start latency (from {@code t.start()} to entry here) is measured.
 *
 * @return the task result held in {@code out}
 * @throws Exception if the task fails
 */
@Override
public Object call() throws Exception {
// Placeholder: the real task logic was elided by the author of the listing.
xxx
return out;
}
重寫 ThreadPoolExecutor 的參考實現如下(文章來源地址:http://www.zghlxwxcb.cn/news/detail-430489.html):
package com.stock.framework.precheck.test2;
import com.stock.framework.jzprecheck.JzConfig;
import org.slf4j.Logger;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Diagnostic {@link ThreadPoolExecutor} that re-implements {@code execute} and
 * {@code addWorker} with timestamp logging around every step, so the latency
 * between task submission, {@code Thread.start()} and the task actually running
 * can be traced.
 *
 * <p>The re-implementation reaches into {@code ThreadPoolExecutor}'s private
 * state ({@code ctl}, {@code mainLock}, {@code workers}, {@code largestPoolSize},
 * the inner {@code Worker} class and {@code addWorkerFailed}) via reflection.
 * It therefore depends on the JDK's internal field and method names.
 *
 * <p>NOTE(review): on JDKs that strongly encapsulate internals this presumably
 * needs {@code --add-opens java.base/java.util.concurrent=ALL-UNNAMED};
 * verify on the target runtime.
 */
public class MyThreadPool2 extends ThreadPoolExecutor {
    // Layout of the packed "ctl" word as used by runStateOf/workerCountOf below:
    // the high 3 bits hold the run state, the low COUNT_BITS bits the worker count.
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;

    /** Binary name of the JDK's private worker class instantiated reflectively. */
    private static final String WORKER_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor$Worker";

    public static Logger log = XXConfig.log;

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyThreadPool2(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Returns the pool whose internal state the reflective helpers operate on.
     *
     * <p>This is the instance itself: {@code execute}/{@code addWorker} must read
     * and update the {@code ctl}, lock and worker set of the very pool they run
     * in. Returning {@code null} here makes every reflective lookup fail.
     *
     * @return this pool, never {@code null}
     */
    public MyThreadPool2 getThreadPool() {
        return this;
    }

    /**
     * Logging re-implementation of {@code ThreadPoolExecutor.execute}.
     *
     * <p>Proceeds in three steps: (1) below core size, try to start a new core
     * worker with {@code command} as its first task; (2) otherwise try to queue
     * the task, then re-check the pool state and roll back or add a worker if
     * needed; (3) if queuing fails, try a non-core worker and reject the task
     * when that fails too.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        log.info("execute-11="+System.currentTimeMillis());
        if (command == null) {
            throw new NullPointerException();
        }
        AtomicInteger ctl = getCtl();
        int c = ctl.get();
        if (workerCountOf(c) < getCorePoolSize()) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        log.info("execute-22="+System.currentTimeMillis());
        // getQueue() exposes the pool's work queue; no reflection needed.
        BlockingQueue<Runnable> workQueue = getQueue();
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {
            reject(command);
        }
    }

    /**
     * Hands {@code command} to this pool's rejected-execution handler.
     *
     * @param command the task that could not be accepted
     */
    public void reject(Runnable command) {
        getRejectedExecutionHandler().rejectedExecution(command, this);
    }

    /** Extracts the run-state bits (everything above the worker count) from {@code c}. */
    public int runStateOf(int c) { return c & ~CAPACITY; }

    /** Returns whether the packed control value {@code c} is below {@code SHUTDOWN}. */
    boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /** Attempts to CAS-increment the worker count stored in {@code ctl}. */
    private boolean compareAndIncrementWorkerCount(int expect) {
        AtomicInteger ctl = getCtl();
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Reflectively instantiates {@code className} through its first declared
     * constructor, passing {@code instance} and {@code object} as the two
     * arguments (for an inner class: the enclosing instance and the single
     * constructor parameter).
     *
     * @param instance  first constructor argument (the enclosing instance)
     * @param className binary name of the class to instantiate
     * @param object    second constructor argument
     * @return the new instance
     */
    public Object getInstance(Object instance, String className, Object object) throws ClassNotFoundException, IllegalAccessException, InvocationTargetException, InstantiationException {
        Class<?> target = Class.forName(className);
        Constructor<?> constructor = target.getDeclaredConstructors()[0];
        constructor.setAccessible(true);
        return constructor.newInstance(instance, object);
    }

    /**
     * Reads the field {@code propName} declared directly on {@code obj}'s runtime
     * class (inherited fields are not searched), ignoring access modifiers.
     *
     * @param obj      object to read from
     * @param propName name of the declared field
     * @return the field value, or {@code null} if the lookup or read fails
     */
    public Object getPrivateValue(Object obj, String propName) {
        try {
            Field field = obj.getClass().getDeclaredField(propName);
            // Allow reading a private field.
            field.setAccessible(true);
            return field.get(obj);
        } catch (Exception e) {
            // Reflection boundary: log with the stack trace and report "no value".
            log.error(e.toString(), e);
            return null;
        }
    }

    /**
     * Logging re-implementation of {@code ThreadPoolExecutor.addWorker}: reserves
     * a worker slot in {@code ctl}, creates a {@code Worker} reflectively,
     * registers it under the main lock and starts its thread.
     *
     * @param firstTask task the new worker runs first, or {@code null}
     * @param core      {@code true} to bound by core size, {@code false} by maximum size
     * @return {@code true} if the worker thread was started
     */
    public boolean addWorker(Runnable firstTask, boolean core) {
        log.info("addWorker-1="+System.currentTimeMillis());
        final AtomicInteger ctl = getCtl();
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !getQueue().isEmpty())) {
                return false;
            }
            log.info("addWorker-retry-1="+System.currentTimeMillis());
            for (;;) {
                log.info("addWorker-retry-2="+System.currentTimeMillis());
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? getCorePoolSize() : getMaximumPoolSize())) {
                    return false;
                }
                if (compareAndIncrementWorkerCount(c)) {
                    break retry;
                }
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs) {
                    continue retry;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        log.info("addWorker-Worker-="+System.currentTimeMillis());
        boolean workerStarted = false;
        boolean workerAdded = false;
        Object w = null;
        try {
            // Equivalent of "new Worker(firstTask)" on this pool.
            w = getInstance(getThreadPool(), WORKER_CLASS_NAME, firstTask);
            // Equivalent of "w.thread".
            final Thread t = (Thread) getPrivateValue(w, "thread");
            log.info("addWorker-t-="+System.currentTimeMillis());
            if (t != null) {
                final ReentrantLock mainLock = (ReentrantLock) getPrivateValue(getThreadPool(), "mainLock");
                log.info("addWorker-lock-="+System.currentTimeMillis());
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) { // precheck that t is startable
                            throw new IllegalThreadStateException();
                        }
                        // The JDK declares this set with the private Worker type; Object is the
                        // only element type nameable from here, and erasure makes the cast safe.
                        @SuppressWarnings("unchecked")
                        HashSet<Object> workers = (HashSet<Object>) getPrivateValue(getThreadPool(), "workers");
                        workers.add(w);
                        int s = workers.size();
                        if (s > getLargestPoolSize()) {
                            // Equivalent of "largestPoolSize = s".
                            setPrivateValue(getThreadPool(), "largestPoolSize", s);
                        }
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                log.info("addWorker-workerAdded="+workerAdded+" "+System.currentTimeMillis());
                if (workerAdded) {
                    log.info("addWorker-t.start()= start before 狀態(tài)="+t.getState()+" 優(yōu)先級別="+t.getPriority()+" "+System.currentTimeMillis());
                    t.start();
                    log.info("addWorker-t.start()= start after 狀態(tài)="+t.getState()+" 優(yōu)先級別="+t.getPriority()+" "+System.currentTimeMillis());
                    workerStarted = true;
                }
            }
        } catch (InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException e) {
            log.error(e.toString(), e);
        } finally {
            if (!workerStarted) {
                log.info("!workerStarted-t-="+System.currentTimeMillis());
                // Equivalent of "addWorkerFailed(w)"; w may be null when the worker could
                // not be created, and the reserved worker-count slot must still be released.
                getPrivateMethodValue(getThreadPool(), "addWorkerFailed", w);
            }
        }
        return workerStarted;
    }

    /** Decrements the worker count in {@code ctl}, retrying until the CAS succeeds. */
    private void decrementWorkerCount() {
        do {} while (!compareAndDecrementWorkerCount(getCtl().get()));
    }

    /** Attempts to CAS-decrement the worker count stored in {@code ctl}. */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return getCtl().compareAndSet(expect, expect - 1);
    }

    /**
     * Returns the pool's private packed control word ({@code ctl}).
     *
     * @return the {@code ctl} field of {@link #getThreadPool()}, or {@code null}
     *         if it could not be read reflectively
     */
    public AtomicInteger getCtl() {
        return (AtomicInteger) getPrivateValue(getThreadPool(), "ctl");
    }

    /** Extracts the worker-count bits from the packed control value {@code c}. */
    int workerCountOf(int c) { return c & CAPACITY; }

    /**
     * Reads a private field declared by {@link ThreadPoolExecutor} itself.
     *
     * @param executorServer pool to read from; {@code null} means {@link #getThreadPool()}
     * @param propName       name of the field declared on {@code ThreadPoolExecutor}
     * @return the field value, or {@code null} if the lookup or read fails
     */
    public Object getPrivateValue(ThreadPoolExecutor executorServer, String propName) {
        try {
            if (executorServer == null) {
                executorServer = getThreadPool();
            }
            // Look the field up on ThreadPoolExecutor directly so the result does not
            // depend on how many subclass levels separate the runtime class from it.
            Field field = ThreadPoolExecutor.class.getDeclaredField(propName);
            // Allow reading a private field.
            field.setAccessible(true);
            return field.get(executorServer);
        } catch (Exception e) {
            // Reflection boundary: log with the stack trace and report "no value".
            log.error(e.toString(), e);
            return null;
        }
    }

    /**
     * Writes a private field declared by {@link ThreadPoolExecutor} itself.
     * Failures are logged and otherwise ignored.
     *
     * @param executorServer pool to write to; {@code null} means {@link #getThreadPool()}
     * @param propName       name of the field declared on {@code ThreadPoolExecutor}
     * @param value          new value (boxed values are unwrapped for primitive fields)
     */
    public void setPrivateValue(ThreadPoolExecutor executorServer, String propName, Object value) {
        try {
            if (executorServer == null) {
                executorServer = getThreadPool();
            }
            Field field = ThreadPoolExecutor.class.getDeclaredField(propName);
            // Allow writing a private field.
            field.setAccessible(true);
            field.set(executorServer, value);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    /**
     * Invokes a non-public single-argument method inherited by {@code obj}.
     *
     * <p>The method is matched by name and by the argument being assignable to
     * its parameter, searching from {@code obj}'s superclass upwards. Matching on
     * the parameter instead of {@code val.getClass()} lets a {@code null}
     * argument be passed through.
     *
     * @param obj      target object
     * @param propName name of the method to call
     * @param val      the single argument, may be {@code null}
     * @return the method's result, or {@code null} if lookup or invocation fails
     */
    public Object getPrivateMethodValue(Object obj, String propName, Object val) {
        try {
            Method method = findSingleArgMethod(obj.getClass().getSuperclass(), propName, val);
            method.setAccessible(true);
            return method.invoke(obj, new Object[] {val});
        } catch (Exception e) {
            log.error(e.toString(), e);
            return null;
        }
    }

    /** Finds a declared one-parameter method named {@code name} that accepts {@code arg}, walking up from {@code start}. */
    private static Method findSingleArgMethod(Class<?> start, String name, Object arg) throws NoSuchMethodException {
        for (Class<?> type = start; type != null; type = type.getSuperclass()) {
            for (Method candidate : type.getDeclaredMethods()) {
                Class<?>[] params = candidate.getParameterTypes();
                if (!candidate.getName().equals(name) || params.length != 1) {
                    continue;
                }
                boolean accepts = (arg == null) ? !params[0].isPrimitive() : params[0].isInstance(arg);
                if (accepts) {
                    return candidate;
                }
            }
        }
        throw new NoSuchMethodException(name);
    }
}
到了這裡,關於線程池創建線程、異步獲取 Future 超時的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索 TOY模板網 以前的文章,或繼續瀏覽下面的相關文章,希望大家以後多多支持 TOY模板網!