本文共 49558 字,大约阅读时间需要 165 分钟。
读完本文你将了解:
线程池的概念大家应该都很清楚,帮我们重复管理线程,避免创建大量的线程增加开销。
除了降低开销以外,线程池也可以提高响应速度,了解点 JVM 的同学可能知道,一个对象的创建大概需要经过以下几步:
创建一个对象的开销需要经过这么多步,也是需要时间的嘛,那可以复用已经创建好的线程的线程池,自然也在提高响应速度上做了贡献。
创建线程池需要使用 ThreadPoolExecutor
类,它的构造函数参数如下:
public ThreadPoolExecutor( int corePoolSize, //核心线程的数量 int maximumPoolSize, //最大线程数量 long keepAliveTime, //超出核心线程数量以外的线程空余存活时间 TimeUnit unit, //存活时间的单位 BlockingQueueworkQueue, //保存待执行任务的队列 ThreadFactory threadFactory, //创建新线程使用的工厂 RejectedExecutionHandler handler // 当任务无法执行时的处理器 ) {...} 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8
参数介绍如注释所示,要了解这些参数左右着什么,就需要了解线程池具体的执行方法ThreadPoolExecutor.execute
:
public void execute (Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl. get(); //1.当前池中线程比核心数少,新建一个线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl. get(); } //2.核心池已满,但任务队列未满,添加到队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl. get(); if (! isRunning(recheck) && remove(command)) //如果这时被关闭了,拒绝任务 reject(command); else if (workerCountOf(recheck) == 0) //如果之前的线程已被销毁完,新建一个线程 addWorker( null, false); } //3.核心池已满,队列已满,试着创建一个新线程 else if (!addWorker(command, false)) reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务 } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
可以看到,线程池处理一个任务主要分三步处理,代码注释里已经介绍了,我再用通俗易懂的例子解释一下:
(线程比作员工,线程池比作一个团队,核心池比作团队中核心团队员工数,核心池外的比作外包员工)
结合这张图,这回流程你明白了吗?
由于 1 和 3 新建线程时需要获取全局锁,这将严重影响性能。因此 ThreadPoolExecutor
这样的处理流程是为了在执行 execute()
方法时尽量少地执行 1 和 3,多执行 2。
在
ThreadPoolExecutor
完成预热后(当前线程数不少于核心线程数),几乎所有的execute()
都是在执行步骤 2。
前面提到的 ThreadPoolExecutor
构造函数的参数,分别影响以下内容:
corePoolSize
:核心线程池数量 maximumPoolSize
:最大线程数量 keepAliveTime
:核心池以外的线程存活时间,即没有任务的外包的存活时间 allowCoreThreadTimeOut(true)
,则核心线程在空闲时头上也会响起死亡的倒计时workQueue
:保存待执行任务的阻塞队列 threadFactory
:每个线程创建的地方 handler
:饱和策略,大家都很忙,咋办呢,有四种策略 CallerRunsPolicy
:只要线程池没关闭,就直接用调用者所在线程来运行任务AbortPolicy
:直接抛出 RejectedExecutionException
异常DiscardPolicy
:悄悄把任务放生,不做了DiscardOldestPolicy
:把队列里待最久的那个任务扔了,然后再调用 execute()
试试看能行不RejectedExecutionHandler
接口自定义策略,比如如记录日志什么的当线程池中的核心线程数已满时,任务就要保存到队列中了。
线程池中使用的队列是 BlockingQueue
接口,常用的实现有如下几种:
关于阻塞队列的详细介绍请看这篇:
了解上面的内容后,我们就可以创建自己的线程池了。
①先定义线程池的几个关键属性的值:
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // 核心线程数为 CPU 数*2 private static final int MAXIMUM_POOL_SIZE = 64; // 线程池最大线程数 private static final int KEEP_ALIVE_TIME = 1; // 保持存活时间 1秒 1 2 3 1 2 3
②然后根据处理的任务类型选择不同的阻塞队列
如果是要求高吞吐量的,可以使用 SynchronousQueue
队列;如果对执行顺序有要求,可以使用 PriorityBlockingQueue
;如果最大积攒的待做任务有上限,可以使用 LinkedBlockingQueue
。
private final BlockingQueuemWorkQueue = new LinkedBlockingQueue<>(128); 1 1
③然后创建自己的 ThreadFactory
在其中为每个线程设置个名称:
private final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger( 1); public Thread newThread(Runnable r) { Thread thread = new Thread(r, TAG + " #" + mCount .getAndIncrement()); thread .setPriority( Thread .NORM_PRIORITY); return thread; } }; 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9
④然后就可以创建线程池了
private ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, mWorkQueue, DEFAULT_THREAD_FACTORY, new ThreadPoolExecutor.DiscardOldestPolicy()); 1 2 3 1 2 3
这里我们选择的饱和策略为 DiscardOldestPolicy
,你可以可以创建自己的。
⑤完整代码:
public class ThreadPoolManager { private final String TAG = this.getClass().getSimpleName(); private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // 核心线程数为 CPU数*2 private static final int MAXIMUM_POOL_SIZE = 64; // 线程队列最大线程数 private static final int KEEP_ALIVE_TIME = 1; // 保持存活时间 1秒 private final BlockingQueuemWorkQueue = new LinkedBlockingQueue<>( 128); private final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger( 1); public Thread newThread (Runnable r) { Thread thread = new Thread(r, TAG + " #" + mCount.getAndIncrement()); thread.setPriority(Thread.NORM_PRIORITY); return thread; } }; private ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, mWorkQueue, DEFAULT_THREAD_FACTORY, new ThreadPoolExecutor.DiscardOldestPolicy()); private static volatile ThreadPoolManager mInstance = new ThreadPoolManager(); public static ThreadPoolManager getInstance () { return mInstance; } public void addTask (Runnable runnable) { mExecutor.execute(runnable); } @Deprecated public void shutdownNow () { mExecutor.shutdownNow(); } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
这样我们就有了自己的线程池。
JDK 为我们内置了五种常见线程池的实现,均可以使用 Executors
工厂类创建。
public static ExecutorService newFixedThreadPool ( int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } 1 2 3 4 5 1 2 3 4 5
不招外包,有固定数量核心成员的正常互联网团队。
可以看到,FixedThreadPool
的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。
此外 keepAliveTime
为 0,也就是多余的空余线程会被立即终止(由于这里没有多余线程,这个参数也没什么意义了)。
而这里选用的阻塞队列是 LinkedBlockingQueue
,使用的是默认容量 Integer.MAX_VALUE
,相当于没有上限。
因此这个线程池执行任务的流程如下:
FixedThreadPool
用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。
public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } 1 2 3 4 5 6 1 2 3 4 5 6
不招外包,只有一个核心成员的创业团队。
从参数可以看出来,SingleThreadExecutor
相当于特殊的 FixedThreadPool
,它的执行流程如下:
听起来很可怜的样子 - -。
SingleThreadExecutor
用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。
public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } 1 2 3 4 5 1 2 3 4 5
全部外包,没活最多待 60 秒的外包团队。
可以看到,CachedThreadPool
没有核心线程,非核心线程数无上限,也就是全部使用外包,但是每个外包空闲的时间只有 60 秒,超过后就会被回收。
CachedThreadPool
使用的队列是 SynchronousQueue
,这个队列的作用就是传递任务,并不会保存。
因此当提交任务的速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
它的执行流程如下:
SynchronousQueue
中提交任务 由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool
不会占用任何资源。
CachedThreadPool
用于并发执行大量短期的小任务,或者是负载较轻的服务器。
public static ScheduledExecutorService newScheduledThreadPool ( int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor ( int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } private static final long DEFAULT_KEEPALIVE_MILLIS = 10 L; 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9
定期维护的 2B 业务团队,核心与外包成员都有。
ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
, 最多线程数为 Integer.MAX_VALUE
,使用 DelayedWorkQueue
作为任务队列。
ScheduledThreadPoolExecutor
添加任务和执行任务的机制与ThreadPoolExecutor
有所不同。
ScheduledThreadPoolExecutor
添加任务提供了另外两个方法:
scheduleAtFixedRate()
:按某种速率周期执行scheduleWithFixedDelay()
:在某个延迟后执行它俩的代码如下:
public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
可以看到,这两种方法都是创建了一个 ScheduledFutureTask
对象,调用 decorateTask()
方法转成 RunnableScheduledFuture
对象,然后添加到队列中。
看下 ScheduledFutureTask
的主要属性:
private class ScheduledFutureTaskextends FutureTask < V > implements RunnableScheduledFuture < V > { //添加到队列中的顺序 private final long sequenceNumber; //何时执行这个任务 private volatile long time; //执行的间隔周期 private final long period; //实际被添加到队列中的 task RunnableScheduledFuture outerTask = this; //在 delay queue 中的索引,便于取消时快速查找 int heapIndex; //... } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
DelayQueue
中封装了一个优先级队列,这个队列会对队列中的 ScheduledFutureTask
进行排序,两个任务的执行 time 不同时,time 小的先执行;否则比较添加到队列中的顺序 sequenceNumber ,先提交的先执行。
ScheduledThreadPoolExecutor
的执行流程如下:
具体执行任务的步骤也比较复杂:
DelayQueue.take()
DelayQueue.add()
ScheduledThreadPoolExecutor
用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
ExecutorService
提供了两种提交任务的方法:
execute()
:提交不需要返回值的任务submit()
:提交需要返回值的任务void execute(Runnable command); 1 1
execute()
的参数是一个 Runnable,也没有返回值。因此提交后无法判断该任务是否被线程池执行成功。
ExecutorService executor = Executors.newCachedThreadPool(); executor.execute( new Runnable() { @Override public void run () { //do something } }); 1 2 3 4 5 6 7 1 2 3 4 5 6 7
Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); 1 2 3 1 2 3
submit()
有三种重载,参数可以是 Callable
也可以是 Runnable
。
同时它会返回一个 Funture
对象,通过它我们可以判断任务是否执行成功。
获得执行结果调用 Future.get()
方法,这个方法会阻塞当前线程直到任务完成。
提交一个 Callable
任务时,需要使用 FutureTask
包一层:
FutureTask futureTask = new FutureTask( new Callable() { //创建 Callable 任务 @Override public String call () throws Exception { String result = ""; //do something return result; } }); Future submit = executor.submit(futureTask); //提交到线程池 try { Object result = submit.get(); //获取结果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
线程池即使不执行任务也会占用一些资源,所以在我们要退出任务时最好关闭线程池。
有两个方法关闭线程池:
`shutdown()
shutdownNow()
它们的共同点是:都是通过遍历线程池中的工作线程,逐个调用 Thread.interrup()
来中断线程,所以一些无法响应中断的任务可能永远无法停止(比如 Runnable
)。
了解 JDK 提供的几种线程池实现,在实际开发中如何选择呢?
根据任务类型决定。
前面已经介绍了,这里再小节一下:CachedThreadPool
用于并发执行大量短期的小任务,或者是负载较轻的服务器。FixedThreadPool
用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。SingleThreadExecutor
用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。ScheduledThreadPoolExecutor
用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。自定义线程池时,如果任务是 CPU 密集型(需要进行大量计算、处理),则应该配置尽量少的线程,比如 CPU 个数 + 1,这样可以避免出现每个线程都需要使用很长时间但是有太多线程争抢资源的情况;
如果任务是 IO密集型(主要时间都在 I/O,CPU 空闲时间比较多),则应该配置多一些线程,比如 CPU 数的两倍,这样可以更高地压榨 CPU。为了错误避免创建过多线程导致系统奔溃,建议使用有界队列。因为它在无法添加更多任务时会拒绝任务,这样可以提前预警,避免影响整个系统。
执行时间、顺序有要求的话可以选择优先级队列,同时也要保证低优先级的任务有机会被执行。
这篇文章简单介绍了 中线程池的工作原理和一些常见线程池的使用,在实际开发中最好使用线程池来统一管理异步任务,而不是直接 new 一个线程执行任务。
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
以下是本文的目录大纲:
若有不正之处请多多谅解,并欢迎批评指正。
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueueworkQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
下面解释下一下构造器中各个参数的含义:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
具体参数的配置与线程池的关系将在下一节讲述。
从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService { protectedRunnableFuture newTaskFor(Runnable runnable, T value) { }; protected RunnableFuture newTaskFor(Callable callable) { }; public Future submit(Runnable task) {}; public Future submit(Runnable task, T result) { }; public Future submit(Callable task) { }; private T doInvokeAny(Collection > tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public T invokeAny(Collection > tasks) throws InterruptedException, ExecutionException { }; public T invokeAny(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public List > invokeAll(Collection > tasks) throws InterruptedException { }; public List > invokeAll(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException { }; } AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。 我们接着看ExecutorService接口的实现: public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); List > invokeAll(Collection > tasks) throws InterruptedException; List > invokeAll(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException; T invokeAny(Collection > tasks) throws InterruptedException, ExecutionException; T invokeAny(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现: public interface Executor { void execute(Runnable command); }
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
execute() submit() shutdown() shutdownNow()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
1.线程池状态
在ThreadPoolExecutor中定义了一个变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueueworkQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小 //、runState等)的改变都要使用这个锁 private final HashSet workers = new HashSet (); //用来存放工作集 private volatile long keepAliveTime; //线程存活时间 private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数 private long completedTaskCount; //用来记录已经执行完毕的任务个数
每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:
首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;
接着是这句,这句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。
如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行:
addIfUnderCorePoolSize(command)
如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。
如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if (runState == RUNNING && workQueue.offer(command))
如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))
这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:
if (runState != RUNNING || poolSize == 0)
这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:
ensureQueuedTaskHandled(command)
进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。
我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //创建线程去执行firstTask任务 } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行
t = addThread(firstTask);
这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。
我们来看一下addThread方法的实现:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //创建一个线程,执行任务 if (t != null) { w.thread = t; //将创建的线程的引用赋值为w的成员变量 workers.add(w); int nt = ++poolSize; //当前线程数加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。
下面我们看一下Worker类的实现:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据 //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock();