前言
ThreadPoolExecutor用于管理和控制线程,提供了线程池的功能,可以执行Runnable或Callable任务,它具有以下特性
- 线程池管理:
ThreadPoolExecutor
可以帮助您管理线程池,包括创建、启动、关闭线程池以及管理线程的生命周期。
- 任务执行:它可以执行提交给线程池的任务,这些任务可以是
Runnable
或 Callable
接口的实现。
- 灵活的配置选项:
ThreadPoolExecutor
提供了丰富的配置选项,可以通过设置核心线程数、最大线程数、任务队列、拒绝策略等参数来适应不同的场景和需求。
- 任务队列:线程池使用任务队列来存储等待执行的任务。
ThreadPoolExecutor
支持多种类型的任务队列,例如无界队列、有界队列、同步移交队列等。
- 拒绝策略:当任务无法被接受执行时,
ThreadPoolExecutor
提供了多种拒绝策略,例如抛出异常、丢弃任务、阻塞等待等,可以根据需要选择合适的策略。
- 线程池状态管理:
ThreadPoolExecutor
提供了方法来管理线程池的状态,例如启动线程池、关闭线程池、等待线程池中所有任务执行完成等。
- 监控和调优:可以通过监控线程池的状态、记录任务执行时间等方式来调优线程池的性能,以确保线程池的稳定运行和高效利用。
- 可扩展性:
ThreadPoolExecutor
是一个抽象类,可以通过继承并重写其方法来实现自定义的线程池行为,例如自定义任务调度策略、任务执行行为等。
类图如下:
关键属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
|
构造器
我们直接看全参数的构造器,其它构造器底层也是调用的这个全参构造器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
|
关键方法
execute(Runnable command)
这个方法用于提交一个新的任务给线程池执行。可以看到内部会先判断核心线程数是否已满,否的话就创建一个核心线程的worker去执行任务,同时将这个worker加入到wokers集合中;
如果核心线程已满并且线程池还是在运行中,此时会尝试将任务加入到workQueue中去,然后回来再次检查线程池的状态:如果线程池已经不是运行中就从workQueue中移除这个节点,而这个操作也失败的话,就调用拒绝策略;
如果核心线程数满了,加入队列也失败了,此时会尝试构造一个非核心线程的woker去执行任务,而如果这个也失败了,也会调用失败策略去处理这个任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
现在我们看下addWorker方法,它负责向线程池添加新的工作线程。此方法接收两个参数:firstTask,新线程首先应运行的任务,以及core,一个布尔值,决定添加新线程时应使用corePoolSize还是maximumPoolSize作为边界。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false;
for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
|
submit(Runnable task)
submit也是提交一个任务给线程池执行,它底层调用的也是execute方法,只是它与execute有一些区别:
- 返回值:execute方法没有返回值,而submit方法返回一个Future对象。这个Future对象可以用来获取任务的执行结果,或者取消任务的执行。
- 异常处理:如果你提交给execute方法的任务在执行过程中抛出了未检查的异常,那么这个异常会被传递给未捕获异常处理器(如果有的话),并且会导致当前线程终止。而submit方法则不同,如果你提交给submit方法的任务在执行过程中抛出了异常,那么这个异常会被吞掉,除非你调用了Future.get()方法来获取任务的执行结果,这时候会抛出ExecutionException。
- 任务类型:execute方法只能接受Runnable任务,而submit方法既可以接受Runnable任务,也可以接受Callable任务
1 2 3 4 5 6
| public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
|
remove(Runnable task)
从队列中移除指定任务,并尝试终止线程池。
1 2 3 4 5
| public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); return removed; }
|
shutdown()
这个方法用于平滑地关闭线程池。已经提交的任务会继续执行,但是不会接受新的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
|
shutdownNow()
立即关闭线程池。尝试停止所有正在执行的任务,停止处理还在队列中的任务,并返回队列中等待的任务列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
|
工作线程的运行机制
主要是runWorker方法,可以看到这里面主要是一循环,里面会调用线程的run方法执行任务,同时会调用getTask()方法获取队列中的任务。在执行任务时会先做加锁处理,处理完任务后会累加已完成任务数,然后释放锁并再次调用getTask方法获取队列中的任务,直到队列中没有任务才会终止掉这个循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
|
任务执行完毕后就要处理当前工作线程了,这段逻辑维护在processWorkerExit方法中。在获取锁后,会从workers集合中移除掉这个worker,然后尝试终止线程池。然后可以分为以下几种情况
- 如果allowCoreThreadTimeOut为false,且线程池中工作者线程总量小于核心线程数,添加一个新的worker到线程池中;
- 如果allowCoreThreadTimeOut为false,且线程池中工作者线程总量大于等于核心线程数,说明核心线程数已满,直接return;
- 如果allowCoreThreadTimeOut为true,且等待队列中有任务待执行且workers已空,则添加一个新的worker到线程池中;
- 如果allowCoreThreadTimeOut为true,且等待队列中有任务待执行且workers至少有一个工作者线程存货,直接return;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate();
int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
|
拒绝策略
当线程池因某些原因不能处理任务时,会拒绝处理任务,主要策略有以下几种
- AbortPolicy:这是默认的策略。当任务被拒绝时,它会抛出一个RejectedExecutionException异常。
- CallerRunsPolicy:这个策略下,如果线程池未关闭,任务会在调用者的线程中运行。如果线程池已关闭,则丢弃任务。
- DiscardPolicy:这个策略直接丢弃无法处理的任务,不给任何反馈。
- DiscardOldestPolicy:此策略将丢弃工作队列中最旧的未处理任务,然后尝试重新提交当前任务。