引言
上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池
还留了几个小问题。。建议看这篇文章之前可先看下前面那篇文章。这篇文章我们就来分析下上篇文章的几个小问题
线程池是否区分核心线程和非核心线程?
如何保证核心线程不被销毁?
线程池的线程是如何做到复用的?
我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()
只能调用一次,一旦这个调用结束,则该线程就到了stop
状态,不能再次调用start
。如果你对一个已经启动的线程对象再调用一次start
方法的话,会产生:IllegalThreadStateException
异常,但是Thread
的run
方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:Thread类中run()和start()方法的有什么区别?
下面我们就从jdk
的源码来一起看看如何实现线程复用的:
线程池执行任务的ThreadPoolExecutor
#execute
方法为入口public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态 int recheck = ctl.get(); // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队 if (! isRunning(recheck) && remove(command)) reject(command); // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务 // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
excute*方法主要业务逻辑
如果当前的线程池运行线程小于coreSize,则创建新线程来执行任务。
如果当前运行的线程等于coreSize或多余coreSize(动态修改了coreSize才会出现这种情况),把任务放到阻塞队列中。
如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。
如果新创建线程已经达到了最大线程数,任务将会被拒绝。
addWorker 方法
上述方法的核心主要就是addWorker方法,
private boolean addWorker(Runnable firstTask, boolean core) { // 前面还有一部分就省略了。。。。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这个方法我们先看看这个work类吧
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
work*类实现了Runnable接口,然后run方法里面调用了runWorker方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 新增创建 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 判断 task 是否为空,如果不为空直接执行 // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这个
runwork
方法中会优先取worker
绑定的任务,如果创建这个worker的时候没有给worker
绑定任务,worker
就会从队列里面获取任务来执行,执行完之后worker
并不会销毁,而是通过while
循环不停的执行getTask
方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。while (task != null || (task = getTask()) != null)
这个循环条件只要getTask
返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。
那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?
答案就在这个getTask()
方法里面private Runnable getTask() { // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true // 这个标记非常之重要,下面会说到 boolean timedOut = false; for (;;) { // 获取ctl变量值 int c = ctl.get(); int rs = runStateOf(c); // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP // 则操作AQS减少工作线程数量,并且返回null,线程被回收 // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 操作AQS将线程池中的线程数量减一 decrementWorkerCount(); return null; } // 获取线程池中的有效线程数量 int wc = workerCountOf(c); // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的 // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收 // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里说明了两点销毁线程的条件: // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize, // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了 // 以上两点满足其一,都可以触发线程超时回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 尝试用AQS将线程池线程数量减一 if (compareAndDecrementWorkerCount(c)) // 减一成功后返回null,线程被回收 return null; // 否则循环重试 continue; } try { // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果poll超时获取任务超时了, 将timeOut设置为true // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
所以保证线程不被销毁的关键代码就是这一句代码
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
只要
timed
为false
这个workQueue.take()
就会一直阻塞,也就保证了线程不会被销毁。timed
的值又是通过allowCoreThreadTimeOut
和正在运行的线程数量是否大于coreSize
控制的。只要
getTask
方法返回null
我们的线程就会被回收(runWorker
方***调用processWorkerExit
)这个方法的源码也就解释了为什么我们在创建线程池的时候设置了
allowCoreThreadTimeOut
=true
的话,核心线程也会进行销毁。通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。
结束
由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
感谢您的阅读,十分欢迎并感谢您的关注。
巨人的肩膀摘苹果
http://objcoding.com/2019/04/25/threadpool-running/
全部评论
(1) 回帖