ThreadPoolExecutor 创建线程池执行逻辑

2021-11-17 0 By admin

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

类关系图
类关系图

一、ThreadPoolExecutor线程池核心参数

真正创建的线程池是ThreadPoolExecutor类,ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { 
  if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

他的核心参数为;

  1. corePoolSize:核心线程数,线程池的基本大小,任务队列满之前的最多的线程数量。
  2. maximumPoolSize:最大线程数,任务队列已满时,并且核心线程都被占用,线程池会创建新线程来处理任务,最多不超过maximumPoolSize。
  3. keepAliveTime:除核心线程外的线程存活时间,线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,空闲线程会退出,直到线程数量=corePoolSiz。
  4. unit:空闲存活时间单位,如TimeUnit.SECONDS(秒)。
  5. workQueue:任务队列,任务调度时从队列中取出任务。例如ArrayBlockingQueue(有界队列),LinkedBlockingQuene(Integer.MAX_VALUE长度),SynchronousQueue(转手队列,容量为0),DelayedWorkQueue(优先级队列)。
  6. threadFactory:线程工厂,创建线程时使用的工厂,可以用来设定线程名、是否为守护线程等。
  7. handler:当核心线程正在占用,最大线程数也是都被占用,任务队列并且满了,对新来任务的处理方式。

二、threadFactory:线程工厂介绍

Executors提供的DefaultThreadFactory:

static class DefaultThreadFactory implements ThreadFactory {
 private static final AtomicInteger poolNumber = new AtomicInteger(1);
 private final ThreadGroup group;
 private final AtomicInteger threadNumber = new AtomicInteger(1);
 private final String namePrefix;

 DefaultThreadFactory() {
  SecurityManager s = System.getSecurityManager();
  group = (s != null) ? s.getThreadGroup() :
         Thread.currentThread().getThreadGroup();
  namePrefix = "pool-" +
       poolNumber.getAndIncrement() +
      "-thread-";
 }

 public Thread newThread(Runnable r) {
  Thread t = new Thread(group, r,
         namePrefix + threadNumber.getAndIncrement(),
         0);
  if (t.isDaemon())
   t.setDaemon(false);
  if (t.getPriority() != Thread.NORM_PRIORITY)
   t.setPriority(Thread.NORM_PRIORITY);
  return t;
 }
}

三、handler 新任务处理方法

当核心线程正在占用,最大线程数也是都被占用,任务队列并且满了,对新来任务的处理方式。
JDK中提供了4中拒绝策略,ThreadPoolExecutor类中几个内部实现类来处理这类情况。

3.1、AbortPolicy 丢弃任务

丢弃任务,抛运行时异常RejectedExecutionException。

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
 /**
  * Creates an {@code AbortPolicy}.
  */
 public AbortPolicy() { }
 /**
  * Always throws RejectedExecutionException.
  *
  * @param r the runnable task requested to be executed
  * @param e the executor attempting to execute this task
  * @throws RejectedExecutionException always
  */
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  throw new RejectedExecutionException("Task " + r.toString() +
            " rejected from " +
            e.toString());
 }
}

3.2、DiscardPolicy 直接放弃

直接放弃,什么都不做。

 /**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
 /**
  * Creates a {@code DiscardPolicy}.
  */
 public DiscardPolicy() { }

 /**
  * Does nothing, which has the effect of discarding task r.
  *
  * @param r the runnable task requested to be executed
  * @param e the executor attempting to execute this task
  */
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 }
}

3.3、DiscardOldestPolicy 腾出空间

丢弃最旧的未处理请求,将本次请求入队。

 /**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 /**
  * Creates a {@code DiscardOldestPolicy} for the given executor.
  */
 public DiscardOldestPolicy() { }

 /**
  * Obtains and ignores the next task that the executor
  * would otherwise execute, if one is immediately available,
  * and then retries execution of task r, unless the executor
  * is shut down, in which case task r is instead discarded.
  *
  * @param r the runnable task requested to be executed
  * @param e the executor attempting to execute this task
  */
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
   e.getQueue().poll();
   e.execute(r);
  }
 }
}

3.4、CallerRunsPolicy 谁调用谁负责

谁调用线程池执行方法,谁去执行本次任务。

 /**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
 /**
  * Creates a {@code CallerRunsPolicy}.
  */
 public CallerRunsPolicy() { }

 /**
  * Executes task r in the caller's thread, unless the executor
  * has been shut down, in which case the task is discarded.
  *
  * @param r the runnable task requested to be executed
  * @param e the executor attempting to execute this task
  */
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
   r.run();
  }
 }
}

3.5、自定义处理器和默认处理器

实现RejectedExecutionHandler接口,可自定义处理器。
如果没有设置默认是AbortPolicy。

/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
 new AbortPolicy();

四、线程池接受新任务的执行逻辑

线程池接受新任务的执行逻辑
线程池接受新任务的执行逻辑