博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor 源码分析
阅读量:5100 次
发布时间:2019-06-13

本文共 9274 字,大约阅读时间需要 30 分钟。

public class ThreadPoolExecutor extends AbstractExecutorService {}

ThreadPoolExecutor继承了AbstractExecutorService,该抽象类为线程池提供了默认实现。后面讲到线程池代码时详细说明。

构造函数

ThreadPoolExecutor有很多重载的构造函数,所有构造函数最终都调用了一个构造函数,只是有些构造函数有默认参数而已,看下最终调用的构造函数

public ThreadPoolExecutor(     int corePoolSize,     int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}

corePoolSize:核心线程数,当提交一个新的任务到线程池,如果当前线程池运行的线程数(包括闲置的线程)小于核心线程数,则会创建一个新的线程作为核心线程来执行该任务。

  maximumPoolSize:线程池允许最大的线程数,当提交一个新的任务到线程池,如果当前线程池运行的线程数(包括闲置的线程)大于corePoolSize,小于maximumPoolSize,并且等待队列满的时候,会创建一个新的线程来处理该任务。
  keepAliveTime:当线程池中线程数量大于corePoolSize时,闲置线程最长可以存活的时间。
  unit:时间单位。
  workQueue:保存任务的队列,当池中线程数大于corePoolSize时,新来的任务保存到该队列。
  threadFactory:线程工厂,线程池中的线程都是通过这个工厂创建的。
  handler:任务拒绝执行策略,当线程池无法处理新来任务时的处理策略。

线程池设计思路

 

1 当一个任务通过submit或者execute方法提交到线程池的时候,如果当前池中线程数(包括闲置线程)小于coolPoolSize,则创建一个线程执行该任务。

  2 如果当前池中线程数大于等于coolPoolSize,则将该任务加入到等待队列。
  3 如果任务不能入队,说明等待队列已满,若当前池中线程数小于maximumPoolSize,则创建一个临时线程(非核心线程)执行该任务。
  4 如果当前池中线程数已经等于maximumPoolSize,此时无法执行该任务,根据拒绝执行策略处理,后面还会详细讲解具体的拒绝执行策略。
注意:当池中线程数大于coolPoolSize,超过keepAlive时间的闲置线程会被回收掉。注意,回收的是非核心线程,核心线程一般是不会回收的。如果设置allowCoreThreadTimeOut(true),则核心线程在闲置keepAlive时间后也会被回收。

任务队列是一个阻塞队列,线程执行完任务后会去队列取任务来执行,如果队列为空,线程就会阻塞,直到取到任务。

主要方法

线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    //COUNT_BITS计算后等于29,活动线程数占用的位数    private static final int COUNT_BITS = Integer.SIZE - 3;    //活动线程最大数量    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //线程池5种运行状态,保存在ctl高3位    //11111111 11111111 11111111 11111111左移29位后只保留高位3个1即:    //11100000 00000000 00000000 00000000    private static final int RUNNING    = -1 << COUNT_BITS;    //0左移29位后    //00000000 00000000 00000000 00000000    private static final int SHUTDOWN   =  0 << COUNT_BITS;    //1左移29位后    //00100000 00000000 00000000 00000000    private static final int STOP       =  1 << COUNT_BITS;    //2左移29位后    //01000000 00000000 00000000 00000000    private static final int TIDYING    =  2 << COUNT_BITS;    //3左移29位后    //01100000 00000000 00000000 00000000    private static final int TERMINATED =  3 << COUNT_BITS;

线程池维护了一个int原子变量ctl,表示线程池当前状态。通过这一个字段表示线程池当前活动线程数和线程池的运行状态。其中低29位用来表示活动线程数,高3位用来表示线程池的运行状态。

线程池的状态

RUNNING:该状态下的线程池可以接受新任务,并且可以处理等待队列中的任务。

  SHUTDOWN:该状态下的线程池不再接受新任务,但是可以处理等待队列中的任务。
  STOP:该状态下的线程池不再接受新任务,不再处理等待队列中的任务,会中断正在执行的任务。
  TIDYING:所有的任务都已经中止,活动线程数为0,此状态下的线程池即将转移到TERMINATED状态。
  TERMINATED:terminated()执行完后到达此状态。

线程池的状态转移包括如下几个:

RUNNING -> SHUTDOWN,在执行shutdown()方法时,线程池经历了这种状态转移过程。

  RUNNING -> STOP或者SHUTDOWN -> STOP,在执行shutdownNow()方法时,线程池经历了这种状态转移过程。
  SHUTDOWN -> TIDYING,当等待队列和池中的任务都为空时,经历了这种状态转移过程。
  STOP -> TIDYING,池中任务为空时,经历这种状态转移过程。
  TIDYING -> TERMINATED,执行terminated()方法时经历这个状态转移过程。

Executor

public void execute(Runnable command) {    if (command == null)      throw new NullPointerException();    //取线程池当前状态    int c = ctl.get();    //线程数小于核心线程数,创建一个核心线程,并将任务作为该线程第一个任务    //如果创建线程失败,返回false    if (workerCountOf(c) < corePoolSize) {      if (addWorker(command, true))        return;      //重新获取状态      c = ctl.get();    }    //尝试将任务添加到等待队列    if (isRunning(c) && workQueue.offer(command)) {      int recheck = ctl.get();      //重新判断线程池是否处于RUNNING状态,若不处于RUNNING状态,删除等待队列中该任务并拒绝任务      if (! isRunning(recheck) && remove(command))        reject(command);      //如果没有线程则创建一个非核心线程      else if (workerCountOf(recheck) == 0)        addWorker(null, false);    }    //任务添加到等待队列失败,尝试创建一个非核心线程执行该任务,创建失败则拒绝执行任务    else if (!addWorker(command, false))      reject(command);}//当前活动线程数量private static int workerCountOf(int c)  {     //c & 00011111 11111111 11111111 11111111    //"与"运算取低29位的值    return c & CAPACITY; }

execute的执行逻辑其实前面已经提到了,这里根据代码再分析下:

1 对于空的任务,线程池会抛出NPE异常

  2 通过workerCountOf方法获取线程池的线程数,若线程数小于核心线程数,创建一个核心线程并将任务作为该核心线程的第一个任务。若创建线程失败,重新获取线程池状态
  3 尝试将任务添加到等待队列,需要注意的是,任务添加到等待队列成功后,需要进一步检查线程池状态,因为这个过程线程池的状态可能已经改变。
  4 尝试将任务添加到等待队列,添加失败拒绝执行任务。

使用prestartAllCoreThread方法可以提前创建好所有的核心线程。

PS : workCountOf方法很简单,通过”与”运算取ctl的低29位的值。

private static int workerCountOf(int c)  { return c & CAPACITY; }

addWoker

//firstTask:当池中线程数小于corePoolSize或者等待队列已满,创建的工作者线程执行的第一个任务//core:是否作为核心线程private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {      int c = ctl.get();      //线程池当前状态      int rs = runStateOf(c);      //这个条件看起来有点晕,不着急,我们仔细分析下      //原判断条件为:      //rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())      //如果线程池状态为RUNNING,或者      //线程池状态是SHUTDOWN,并且firstTask为空,并且等待队列不为空,可以接受任务。      //其他情况下,addWorker直接返回false。      //通俗点讲,就是线程池处于SHUTDOWN状态时,还可以处理等待队列中的任务,但是不可以接受新任务了。      //RUNNING状态下的线程池当然可以接受新的任务了      if (rs >= SHUTDOWN &&          ! (rs == SHUTDOWN &&             firstTask == null &&             ! workQueue.isEmpty()))        return false;      for (;;) {        int wc = workerCountOf(c);        //线程池中线程数量是否达到上限        //核心线程数的上限是coolPoolSize,非核心线程数的上限是maximumPoolSize        if (wc >= CAPACITY ||            wc >= (core ? corePoolSize : maximumPoolSize))          return false;        //增加线程数成功,结束retry对应的for循环        if (compareAndIncrementWorkerCount(c))          break retry;        //重新读取状态值        c = ctl.get();        //状态改变了,到retry处重新开始for循环        if (runStateOf(c) != rs)          continue retry;      }    }    //到这里说明CAS增加线程数成功了    boolean workerStarted = false;    boolean workerAdded = false;    //Worker是线程池实现的内部类,实现了AQS和Runnable,包装了需要执行的任务和执行的线程    //Worker就是线程池的工作线程,是干活的工人    Worker w = null;    try {      final ReentrantLock mainLock = this.mainLock;      //创建一个工作者线程      w = new Worker(firstTask);      final Thread t = w.thread;      if (t != null) {        mainLock.lock();        try {          //获取锁之后重新获取状态          int c = ctl.get();          int rs = runStateOf(c);          if (rs < SHUTDOWN ||              (rs == SHUTDOWN && firstTask == null)) {            //如果该线程已经启动了,抛出异常,因为我们稍后才会启动该线程            if (t.isAlive())              throw new IllegalThreadStateException();            //workers是线程池的私有属性,存储了Worker            workers.add(w);            int s = workers.size();            //更新线程池的最大数量            if (s > largestPoolSize)              largestPoolSize = s;            //添加成功了            workerAdded = true;          }        } finally {          mainLock.unlock();        }        if (workerAdded) {          //如果添加成功了,启动线程          t.start();          //启动成功了          workerStarted = true;        }      }    } finally {      //处理启动失败的情况,回滚,从workers中移除该worker,将wc减1      if (! workerStarted)        addWorkerFailed(w);    }    //返回添加的线程是否启动成功    return workerStarted;}
View Code

挺长的,需要的自己打开

步骤一:检查。只有线程池处理RUNNING状态,或者处于SHUTDOWN状态并且任务队列不为空的情况下才能继续添加线程。

  步骤二:检查。检查线程池数量是否超过上限,如果添加的是核心线程,上限即corePoolSize,如果不是核心线程,上限即为maximumPoolSize。
  步骤三:将工作线程数量原子加1,如果加成功,则继续步骤四添加工作线程。如果不成功,说明ctl字段已经被其他线程修改过了,所以要回到步聚一重新检查。
  步骤四:创建一个Worker对象(Worker是一个内部类,可以认为就是一个线程)
  步骤五:加锁 mainLock.lock()
  步骤六:重新检查线程池状态
  步骤七:把创建好的Worker对象加入到workers中,并且更新数量

这里再说下该方法的思路:

1 首先试图原子地增加线程数,这个过程需要检查ctl的状态,如果检查发现不能创建新worker,返回false。否则自旋CAS增加线程数,直到设置成功。

  2 线程数增加成功后,真正创建worker并添加到workers工作集合中。创建worker成功后,启动该工作者线程,返回是否启动成功。如果启动worker失败,需要做回滚操作,从workers中移除该worker,并将wc减1。

工作线程-worker

//Worker实现了AQS,提供了锁操作private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        private static final long serialVersionUID = 6138294804551838833L;        //运行任务的线程        final Thread thread;        //执行的第一个任务,第一个任务可能为空        Runnable firstTask;        //该工作者已经执行完成的任务        volatile long completedTasks;        Worker(Runnable firstTask) {            //设置锁的状态为-1            setState(-1);            this.firstTask = firstTask;            //通过线程工厂新建一个线程,要执行的任务就是本Worker            //前面讲到addWorker方法,线程创建成功后会启动线程,线程执行的的任务正是本Worker,也就是            //执行run方法            this.thread = getThreadFactory().newThread(this);        }        public void run() {            //addWorker方法启动的线程最终会执行runWorker方法,该方法线程会从队列中取出任务执行            //若队列中没有任务可以执行,线程会阻塞            runWorker(this);        }        //下面几个方法都是锁操作,这里不再介绍        protected boolean isHeldExclusively() {            return getState() != 0;        }        //……}
View Code

前面讲到的addWorker方法,该方法创建了Worker实例并将firstTask作为Worker构造函数的参数。firstTask作为Worker第一个运行的任务。Worker构造函数创建线程的时候将firstTask作为该线程的Runnable参数。启动该线程的时候执行本Worker的run方法。run方法会调用runWorker,当线程执行完它的firstTask后会从等待队列取任务来执行,若等待队列为空,该线程就会阻塞等待,直到等待队列不空。

  Worker是线程池的内部类,Worker它封装了Thread和Runnable,同时实现了AQS锁,这个锁主要是在中断线程时使用。

  Worker的主要作用是让线程不断循环,从任务队列中取任务执行

PS:

在FutureTask中,run()方法会判断有没有callable类型,有就执行call()。所以这里无论是runnable 或者 callable都可以变成worker,没影响。

runWorker 

  • 从队列中取任务
  • 检查线程池状态,检查线程是否被中断
  • 执行任务

方法太多了,先这样吧。

转载于:https://www.cnblogs.com/RobertLionLin/p/11426439.html

你可能感兴趣的文章
css scroll bug
查看>>
[编织消息框架][JAVA核心技术]动态代理应用8-IRpcReceive实现
查看>>
由一个经典布局问题引发的思考
查看>>
vue 字符串长度控制显示的字数超出显示省略号
查看>>
vim常用命令
查看>>
欧几里德算法(模板)
查看>>
oracle 11g 压缩数据文件
查看>>
opencv2411配置
查看>>
【洛谷P1061 Jam的计数法】搜索
查看>>
Android studio 安装apk时报错:INSTALL_FAILED_NO_MATCHING_ABIS: Failed to extract native libraries...
查看>>
20. 多态
查看>>
pip国内源
查看>>
docker 从本地拷贝文件
查看>>
VS 2013使用ReportViewer 提示An error occurred during local report processing异常处理
查看>>
protobuf 协议 windows 下 java 环境搭建
查看>>
hacker入门篇——相关书籍
查看>>
10大逆向思维的测试方法
查看>>
(转)VS2010中出现无法嵌入互操作类
查看>>
HTTP状态码
查看>>
转载一个关于javascript弹出窗口的详解
查看>>