点进ThreadPoolExectutor源码ThreadPoolExecutor.java可以看到线程池的核心实现;
ThreadPoolExecutor
首先是ThreadPoolExecutor()里面可以看到线程池核心参数- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.acc = System.getSecurityManager() == null ?
- null :
- AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
复制代码 wc记录的是工作线程数,timed 标记的是“当前这个线程是否受超时限制”。
再看下里面try内逻辑:
- 如果 timed == true(通常是因为 wc > corePoolSize),说明当前线程属于“多出来的非核心线程”。
- 那么它去队列拿任务时,用 poll(keepAliveTime)。
- 关键点:如果在 keepAliveTime 时间内没拿到任务(返回 null),下一次循环时 timedOut 就会变为 true,进而导致返回 null,最终导致这个 Worker 退出循环被销毁。
总结:是先“等待超时拿到 null”,然后才“被回收”。
execute()
再看下execute()方法- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- // 省略部分方法和变量
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- /** Delegates main run loop to outer runWorker */
- public void run() {
- runWorker(this);
- }
- }
复制代码 无论是看注释还是看if-else那块代码都能理解:
如果任务数小于核心线程数,那就创建核心线程
如果线程池正在运行,尝试将任务加入队列(workQueue.offer(command))
成功后需要二次检查:
- 如果线程池已关闭,移除任务并拒绝
- 如果没有线程了,创建新线程
你可能会疑惑,为什么任务已经入队了,还要判断 workerCount == 0 并可能创建一个空任务线程?
- 原因:假设核心线程数(Core)设为 0,或者在任务入队的瞬间,现有的线程刚好都挂了(抛异常)或者都超时销毁了。
- 后果:如果这里不检查,任务孤零零地躺在队列里,永远没人去取它,导致“死锁”般的假死状态。
- 作用:兜底策略,确保只要队列里有任务,就至少有一个线程活着去处理它。
如果队列满了
- 尝试创建非核心线程(addWorker(command, false))
- 如果失败(达到最大线程数),拒绝任务
再关注一下另外两个的重要的方法;
prestartCoreThread()
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
复制代码 prestartAllCoreThreads()
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
复制代码
- 核心机制:private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先注意到这两个方法内部都调用了:
Java- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
复制代码
- null: 这里的 firstTask 是空。
- 回忆一下之前看的 runWorker 源码:如果 firstTask 为空,线程启动后就不会立即执行任务,而是直接进入 while 循环调用 getTask()。
- getTask() 会调用 workQueue.take(),导致该线程在队列上阻塞等待。
- true: 表示创建的是核心线程。
结论:private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
} 的作用就是——招一个工人,让他没事干先去仓库门口等着,随时准备干活。
2. prestartCoreThread() —— 启动一个
Java- public boolean prestartCoreThread() { // 1. 检查当前线程数是否小于核心数 // 2. 尝试创建一个空任务的核心线程 return workerCountOf(ctl.get()) < corePoolSize && private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }; }
复制代码
- 理解:如果核心线程还没满,就提前启动一个核心线程。
- 返回值:如果成功启动了一个线程,返回 true;如果核心线程早已满了,返回 false。
3. prestartAllCoreThreads() —— 全部启动
Java- public int prestartAllCoreThreads() { int n = 0; // 只要 addWorker 返回 true(说明还没满),就一直循环创建 while (private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }) ++n; return n;}
复制代码
- 理解:不管现在有多少线程,只要没达到 corePoolSize,就一口气把剩下的坑位全填满,让所有核心线程全部就位待命。
- 返回值:返回这次一共新启动了多少个线程。
典型应用场景:
- 高并发系统的启动时刻:比如“双11”零点,流量瞬间铺天盖地。如果这时候再去创建线程,线程创建的开销可能会导致系统卡顿(Jitter)。使用 prestartAllCoreThreads() 可以在流量到达前先把线程池填满。
- 低延迟敏感系统:为了避免请求第一次处理时的抖动(Latency Spike),提前预热。
总结
- 入口:ThreadPoolExecutor 构造参数(配置)。
- 调度:execute()(决策:是建线程还是入队)。
- 载体:Worker(封装了 Thread 和 Runnable)。
- 引擎:runWorker()(死循环:取任务 -> 执行 -> 统计)。
- 油箱:getTask()(阻塞队列取货,决定线程生死)。
程池的核心本质:
线程池__不仅仅是 new Thread() 的集合,它更是一个 “生产者-消费者” 模型。
- 生产者:__execute() 方法,负责把任务“生产”出来并推送到队列或直接交给工人。
- 消费者:__Worker 线程,负责从队列这个“缓冲区”里不断 getTask() 并消费。
- 管理者:__ThreadPoolExecutor 持有 ctl 状态,动态控制工人的数量(招人/裁员)。
1、如果核心线程数设置为0会发生啥?
当 corePoolSize = 0 _时,线程池__就像一个“全兼职”_的机构。
- 任务来了先尝试进队列。
- 如果进了队列,必须兜底检查是否有线程活着,如果没有,通过 addWorker(null, false) 创建一个临时工(非核心线程)来处理队列里的活。注意这里必须用 false__,因为核心编制(Core)是0,只能招临时工(Max)。
- 这个临时工线程在 keepAliveTime _超时后会被销毁,_线程池__最终会变回空状态。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |