java并发编程:线程执行器

2020-05-07 0 By admin

前面介绍了创建 Java 线程和操作线程的基本方法。程序员在编写并发程序时,除了要创建程序外,还需要对线程的运行进行控制。为了减轻负担,Java 提供了线程执行器用于对线程的运行进行管理。本章介绍了几种常用的线程执行器。

一、线程池

前面介绍的内容,都是通过创建线程的方法使程序并行运行的。采用的方法一般是:创建一个 Runnable 对象,然后封装为 Thread 对象,通过 start() 方法启动线程,并在线程运行过程中通过 sleep()、interrupt() 等方法来控制线程的运行。
1、之前的程序中,线程的创建、运行、休眠和终止都是要手动完成的,如果需要创建的线程过多的话,这样就很麻烦了。
2、创建的线程数量过多或者太少,都不能很好的提高执行效率。
3、创建一个新的线程需要的代价很高,尤其是创建多个生命周期较短的线程时,程序执行的时间有可能浪费在线程的创建和注销上。

线程池说明

线程池包括了若干个准备运行的空闲线程。线程在程序运行的开始创建,可以把创建的 Runnable 对象交给线程池中的线程运行;运行完成后,如果没有其他任务,线程转入休眠状态,等有任务的时候再唤醒,直到所有任务都执行结束再关闭线程池,从而减少了重复创建和销毁线程的开销。
线程池机制分离了任务的创建和执行。使用线程池执行器,仅需要实现 Runnable 对象,并将该对象交给执行器。执行器会使用线程池中的线程执行,避免额外创建线程的开销。线程池执行器起到了维护和管理线程的作用,从而将程序员从繁琐的线程管理任务中解放出来。

从 JDK1.5 开始,Java 并发库中引入了 Executor 框架。该框架包括接口 Executor 及其子接口 ExecutorService,以及实现了上面两个接口的类 ThreadPoolExecutor。

1.1、Executor 接口

Executor 接口的对象可以接受提交到线程池的 Runnable 任务。该接口实现了任务的提交和任务的执行分离。
该接口定义了一个 execute() 方法,形式如下:
void execute(Runnable command)
该方法用于异步地(在未来某个时间)执行给定的 Runnable 对象。
其使用方法如下:

Executor executor = ...;
executor.execute(new RunnableTask1());

可以看出,使用 executor 不用显式地创建线程,也不用显式地调用线程的 start() 方法。

1.2、ExecutorService 接口

ExecutorService 接口从父接口 Executor 继承,定义的形式如下:
public interface ExecutorService extends Executor
ExecutorService 接口提供了关闭线程池的 shutdown() 方法,关闭后的线程池将不再接受新的任务。

1.3、ThreadPoolExecutor 类

ThreadPoolExecutor 类可以用来创建一个线程池,定义的形式如下:
public class ThreadPoolExecutor extends AbstractExecutorService
它有以下 4 个构造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}
  1. corePoolSize: 线程池核心线程数
  2. maximumPoolSize:线程池最大数
  3. keepAliveTime: 空闲线程存活时间
  4. unit: 时间单位
  5. workQueue: 线程池所使用的缓冲队列
  6. threadFactory:线程池创建线程使用的工厂
  7. handler: 线程池对拒绝任务的处理策略

ThreadPoolExecutor 类的实例可以通过它的构造方法来创建,也可以通过工厂类 Executor 的相关方法来创建。
当创建了 ThreadPoolExecutor 对象后,就可以将 Runnable 和 Callable 对象交给该线程池运行。每一个 ThreadPoolExecutor 都维护了一些基本的统计信息,例如已完成的线程数。
ThreadPoolExecutor 类的常用方法:

  1. protected void afterExecute(Runnable r,Throwable t) : 在执行给定的 Runnable 对象 r 之后调用的方法。
  2. protected void beforeExecute(Thread t,Runnable r) : 在执行给定的 Runnable 对象 r 之前调用的方法。
  3. void execute(Runnable command) : 执行给定的 Runnable 对象。
  4. int getActiveCount() : 获的处于活动状态的线程数。
  5. int getMaximumPoolSize() : 获得线程池的最大容量。
  6. BlockingQueue<Runnable> getQueue() : 获得阻塞队列。
  7. boolean isShutdown() : 判断线程池是否关闭。
  8. void shutdown() : 关闭线程池。
  9. List<Runnable> shutdownNow() : 尝试停止所有处于活动状态的线程,返回等待执行的任务列表。

1.4、工厂类 Executors

Executors 类提供了线程池创建的工厂方法:

  1. newFixedThreadPool() : 创建一个固定大小的线程池,空闲线程会一直保留。
  2. newSingleThreadExecutor() : 创建只有一个线程的线程池;显然,该线程池将顺序执行每个提交的任务。
  3. newCachedThreadPool() : 创建一个线程池,该线程池在需要时创建新的线程,而且会复用线程;适合执行生命周期短的异步任务。
  4. newSingleThreadScheduledExecutor() : 创建只有一个线程的线程池,可以在经过指定的时间间隔或周期性地执行。
  5. newScheduledThreadPool() : 创建一个线程池,可以在经过指定时间的时间间隔或周期性执行。

上面列出的方法均为静态方法,可以直接调用。

二、固定数目的线程执行器

使用 Executors 类的 newFixedThreadPool() 方法可以创建固定数目的线程执行器,线程的数目可以由用户指定;也可以不指定,默认为当前处理器可以处理的最大线程数。
ExecutorService executor = Executors.newFixedThreadPool(4);
当提交到线程池的 Runnable 对象数大于执行器可以处理的线程数时,将有部分 Runnable 对象等待。

三、使用线程执行器处理有返回值的线程

有返回值的线程定义需要继承 Callable 接口,然后将线程交给执行器执行,提交线程需要使用 submit() 方法。

四、延迟执行、周期性执行的执行器

如果想在某一段时间之后执行线程操作,或者周期性地重复执行线程操作,则可以使用工厂类 Executors 的 newScheduledThreadPool() 方法或 newSingleThreadScheduledExecutor() 方法。
newScheduledThreadPool() 方法使用给定数目的线程来调度执行任务;而 newSingleThreadExecutor() 方法在一个单独的线程中调度执行任务。这两个方法都将返回一个 ScheduledExecutorService 线程池对象。

五、取消任务的执行

线程在执行的过程中,也可能因为各种各样的原因被取消执行,JDK 考虑到这种情况,提供了 Future 接口的 cancel() 方法,该方法可以取消线程的执行。

六、任务装载和结果处理的分离

当使用 Executor 处理并发任务时,通常会把 Callable 对象提交给 Executor 处理,在通过类 FutureTask 的 get() 方法获取结果。如果当前任务的结果还没有计算出来,线程将会阻塞在此处等待,即使后面的结果计算出来了,也不能处理。很显然,这会导致性能的下降。
CompletionService 接口整合了 Executor 和阻塞队列的功能,它的定义形式如下:

public interface CompletionService<V>

该接口提供了一种服务,该服务可以分离异步任务的产生和已完成任务的结果;可以通过 submit 提交任务,通过 task() 方法取得已完成的任务的结果,并且按完成的先后顺序处理它们的结果。

七、管理被拒绝的任务

可以通过方法 shutdown() 关闭执行器,在调用该方法后,执行器将等待那些正在运行的或等待的任务的完成,然后才能真正地关闭执行器。关闭后的执行器将不再接收新的任务。
由于关闭执行器有一定的时间延迟,很可能在提交了关闭申请但还没有真正关闭执行器之前,新的任务被提交到该执行器,对于这样的任务,可以通过接口 RejectedExecutionHandler 来处理。