前言

ThreadPoolExecutor用于管理和控制线程,提供了线程池的功能,可以执行Runnable或Callable任务,它具有以下特性

  1. 线程池管理ThreadPoolExecutor 可以帮助您管理线程池,包括创建、启动、关闭线程池以及管理线程的生命周期。
  2. 任务执行:它可以执行提交给线程池的任务,这些任务可以是 RunnableCallable 接口的实现。
  3. 灵活的配置选项ThreadPoolExecutor 提供了丰富的配置选项,可以通过设置核心线程数、最大线程数、任务队列、拒绝策略等参数来适应不同的场景和需求。
  4. 任务队列:线程池使用任务队列来存储等待执行的任务。ThreadPoolExecutor 支持多种类型的任务队列,例如无界队列、有界队列、同步移交队列等。
  5. 拒绝策略:当任务无法被接受执行时,ThreadPoolExecutor 提供了多种拒绝策略,例如抛出异常、丢弃任务、阻塞等待等,可以根据需要选择合适的策略。
  6. 线程池状态管理ThreadPoolExecutor 提供了方法来管理线程池的状态,例如启动线程池、关闭线程池、等待线程池中所有任务执行完成等。
  7. 监控和调优:可以通过监控线程池的状态、记录任务执行时间等方式来调优线程池的性能,以确保线程池的稳定运行和高效利用。
  8. 可扩展性ThreadPoolExecutor 是一个抽象类,可以通过继承并重写其方法来实现自定义的线程池行为,例如自定义任务调度策略、任务执行行为等。

类图如下:

image-20240402104814011

关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 用于记录线程池的控制状态。控制状态包括线程池中活跃的线程数、线程池运行状态等信息,是线程池中重要的状态指标。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 线程池的状态,这些状态的数值顺序很重要,因为它们允许进行有序的比较.
// 线程池处于运行状态,可以接受新任务,也可以处理队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 线程池处于关闭状态,不再接受新任务,但会继续处理队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 线程池处于停止状态,不再接受新任务,也不会处理队列中的任务,同时会中断正在处理的任务。
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务都已终止,workerCount为0,线程池将会转变到TIDYING状态,并将运行钩子函数terminated()
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法已经完成,此时线程池处于最终状态。
private static final int TERMINATED = 3 << COUNT_BITS;
// 这是一个阻塞队列,用于存储待执行的任务。
private final BlockingQueue<Runnable> workQueue;
// 用于对线程池的状态和任务队列进行加锁操作,确保线程池的线程安全性。
// 主要用在三个地方:1、保护workers集合的访问和修改;2、保护线程池状态的修改;3、保护线程池统计数据的修改
private final ReentrantLock mainLock = new ReentrantLock();

// 用于存储线程池中的工作线程(Worker对象)。每个 Worker 对象代表一个工作线程,负责从任务队列中取出任务并执行。
private final HashSet<Worker> workers = new HashSet<>();

// 用于在线程池关闭时通知等待的线程。当线程池处于 TERMINATED 状态时,将会使用这个条件变量通知等待的线程
private final Condition termination = mainLock.newCondition();

// 用于跟踪线程池达到的最大线程数。只有持有mainLock的时候才能被访问。
private int largestPoolSize;

// 用于记录已完成的任务数。工作线程终止时会更新这个值。
private long completedTaskCount;

// 线程工厂,所有的线程都通过这个工厂创建。
private volatile ThreadFactory threadFactory;

// 拒绝策略,当线程池饱和或者在执行过程中关闭时被调用。
private volatile RejectedExecutionHandler handler;

// 定义空闲线程等待新任务的。超时时间,当线程池中的线程数超过corePoolSize或者allowCoreThreadTimeOut为true时使用。否则,它们会无限期地等待新的任务
private volatile long keepAliveTime;

// 属性为false时,即使空闲,核心线程也会保持活动状态。否则,核心线程会使用keepAliveTime作为等待新任务的超时时间。
private volatile boolean allowCoreThreadTimeOut;

// 线程池中的核心线程数。
private volatile int corePoolSize;

// 定义线程池的最大数量。当工作队列满了,且已经创建的线程数小于maximumPoolSize,则会创建新的线程来执行任务。
private volatile int maximumPoolSize;
// 默认的拒绝策略,直接丢弃掉任务。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

构造器

我们直接看全参数的构造器,其它构造器底层也是调用的这个全参构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 /**
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 定义空闲线程等待新任务的时间
* @param unit 时间单位,秒或毫秒
* @param workQueue 队列
* @param threadFactory 指定线程工厂
* @param handler 指定拒绝策略
* @throws IllegalArgumentException
* @throws NullPointerException
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

关键方法

execute(Runnable command)

这个方法用于提交一个新的任务给线程池执行。可以看到内部会先判断核心线程数是否已满,否的话就创建一个核心线程的worker去执行任务,同时将这个worker加入到wokers集合中;

如果核心线程已满并且线程池还是在运行中,此时会尝试将任务加入到workQueue中去,然后回来再次检查线程池的状态:如果线程池已经不是运行中就从workQueue中移除这个节点,而这个操作也失败的话,就调用拒绝策略;

如果核心线程数满了,加入队列也失败了,此时会尝试构造一个非核心线程的woker去执行任务,而如果这个也失败了,也会调用失败策略去处理这个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果正在运行的线程数少于核心线程数
if (addWorker(command, true)) // 尝试添加一个新的线程来执行提交的任务,添加成功就直接返回
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 如果线程池正在运行并且任务可以成功地加入到工作队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 如果线程池已经停止,从工作队列中移除这个任务并拒绝它
reject(command);
else if (workerCountOf(recheck) == 0) // 如果没有线程在运行,就启动一个新的线程
addWorker(null, false);
}
else if (!addWorker(command, false)) // 如果任务不能加入到工作队列,尝试添加一个新的线程
reject(command); // 如果添加失败,拒绝这个任务
}

现在我们看下addWorker方法,它负责向线程池添加新的工作线程。此方法接收两个参数:firstTask,新线程首先应运行的任务,以及core,一个布尔值,决定添加新线程时应使用corePoolSize还是maximumPoolSize作为边界。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
// 如果线程的数量超过了阈值,直接返回false。这里阈值的判定是根据core参数来的。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c)) // 累加ctl成功就跳出最外层循环,继续后续逻辑
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN)) // 为true,说明线程池已关闭或正在关闭,此时笔应该添加新的线程了。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下面内容为构造工作者线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 如果线程池处于运行状态,或者线程池状态小于STOP并且没有第一个任务,就继续后续逻辑
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 线程状态不是NEW,抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize) // 赋值largestPoolSize
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {// 添加成功就开始执行这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 添加失败的逻辑
// 内部逻辑大致是从workqueues中移除这个worker,减少workerCOunt,并尝试关闭线程池
addWorkerFailed(w);
}
return workerStarted;
}

submit(Runnable task)

submit也是提交一个任务给线程池执行,它底层调用的也是execute方法,只是它与execute有一些区别:

  1. 返回值:execute方法没有返回值,而submit方法返回一个Future对象。这个Future对象可以用来获取任务的执行结果,或者取消任务的执行。
  2. 异常处理:如果你提交给execute方法的任务在执行过程中抛出了未检查的异常,那么这个异常会被传递给未捕获异常处理器(如果有的话),并且会导致当前线程终止。而submit方法则不同,如果你提交给submit方法的任务在执行过程中抛出了异常,那么这个异常会被吞掉,除非你调用了Future.get()方法来获取任务的执行结果,这时候会抛出ExecutionException。
  3. 任务类型:execute方法只能接受Runnable任务,而submit方法既可以接受Runnable任务,也可以接受Callable任务
1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

remove(Runnable task)

从队列中移除指定任务,并尝试终止线程池。

1
2
3
4
5
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

shutdown()

这个方法用于平滑地关闭线程池。已经提交的任务会继续执行,但是不会接受新的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
// 获取mainLock的锁。这是为了确保在修改线程池状态时不会有其他线程同时进行修改。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查当前线程是否有权限关闭线程池。
checkShutdownAccess();
// 将线程池的状态设置为SHUTDOWN。
advanceRunState(SHUTDOWN);
// 中断所有空闲的工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试关闭线程池
}

shutdownNow()

立即关闭线程池。尝试停止所有正在执行的任务,停止处理还在队列中的任务,并返回队列中等待的任务列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
// 移除并返回所有等待中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

工作线程的运行机制

主要是runWorker方法,可以看到这里面主要是一循环,里面会调用线程的run方法执行任务,同时会调用getTask()方法获取队列中的任务。在执行任务时会先做加锁处理,处理完任务后会累加已完成任务数,然后释放锁并再次调用getTask方法获取队列中的任务,直到队列中没有任务才会终止掉这个循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池正在停止,确保线程被中断;如果没有,确保线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
// 执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
// 完成任务数加1
w.completedTasks++;
// 解锁工作线程
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理工作线程退出
processWorkerExit(w, completedAbruptly);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 无限循环,直到获取到任务或者需要退出
for (;;) {
int c = ctl.get();

// 如果线程池的状态至少是SHUTDOWN,并且(线程池的状态是STOP或者工作队列为空),那么减少工作线程的数量并返回null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

任务执行完毕后就要处理当前工作线程了,这段逻辑维护在processWorkerExit方法中。在获取锁后,会从workers集合中移除掉这个worker,然后尝试终止线程池。然后可以分为以下几种情况

  • 如果allowCoreThreadTimeOut为false,且线程池中工作者线程总量小于核心线程数,添加一个新的worker到线程池中;
  • 如果allowCoreThreadTimeOut为false,且线程池中工作者线程总量大于等于核心线程数,说明核心线程数已满,直接return;
  • 如果allowCoreThreadTimeOut为true,且等待队列中有任务待执行且workers已空,则添加一个新的worker到线程池中;
  • 如果allowCoreThreadTimeOut为true,且等待队列中有任务待执行且workers至少有一个工作者线程存货,直接return;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果工作线程是突然结束的,那么工作线程的数量还没有被调整,所以这里需要减少工作线程的数量
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 增加已完成任务的数量,并从工作线程集合中移除这个工作线程
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) { // 这里结果为true,说明工作线程不是突然结束的
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 工作者线程数量大于1,直接返回掉
}
addWorker(null, false);
}
}

拒绝策略

当线程池因某些原因不能处理任务时,会拒绝处理任务,主要策略有以下几种

  1. AbortPolicy:这是默认的策略。当任务被拒绝时,它会抛出一个RejectedExecutionException异常。
  2. CallerRunsPolicy:这个策略下,如果线程池未关闭,任务会在调用者的线程中运行。如果线程池已关闭,则丢弃任务。
  3. DiscardPolicy:这个策略直接丢弃无法处理的任务,不给任何反馈。
  4. DiscardOldestPolicy:此策略将丢弃工作队列中最旧的未处理任务,然后尝试重新提交当前任务。