java并发编程:Fork/Join 框架

2020-05-08 0 By admin

Java 从问世之初就在语言层次支持线程的概念,从 JDK 1.5 版本开始开始提供并发编程类库;从 JDK 1.7 版本开始引入了 Fork/Join 框架编程模式,该框架可以适应多核时代并发/并行编程的要求。

一、概述

Fork/Join 框架是适用于多核处理器上并行编程的轻量级并行框架。
该框架主要由 Fork 和 Join 两个操作构成:

  1. Fork 操作主要用于对任务和数据进行划分,一般是将一个大问题划分为若干个小问题。
  2. Join 操作主要用于对各个部分的运行结果进行合并,相当于程序中的一个障栅。

两个操作与MapReduce 中的Map/Reduce 操作类似。

Fork/Join 框架实现了任务定义和任务处理功能的分离,使程序员在实现并行编程的同时,将更多的精力实现业务逻辑中。

在 Java 语言中,Fork/Join 框架实现了 ExecutorService 接口,与任何一个 ExecutorService 接口功能相同的是:Fork/Join 会自动地将任务分配给线程池中的线程,并负责线程管理等相关工作。与 ExecutorService 不同的是:Fork/Join 使用了工作窃取算法,从而使线程执行过程中的负载均衡。

二、相关知识

Fork/Join 框架通常被用于解决那些可以递归的分解为更小的任务的问题。该框架与负载均衡、分治方法和工作窃取算法有紧密联系。

2.1、负载均衡

负载均衡有利于加快程序的执行,减少资源的浪费。

2.2、分治方法

分支方法是简化问题的一种处理方法。该方法的基本思想是将一个复杂的任务分解为若干个小任务,然后分别解决这些小任务。
使用分治方法的步骤:分解问题、解决小问题和合并结果。
分治法可以用来解决合并排序、二分搜索和矩阵相乘等问题。
在使用 Fork/Join 框架的过程中,首先将对任务 ForkJoinTask 进行分解,任务分解的数目可以根据问题的特征以及CPU的核数进行设定,然后分解的任务交给 ForkJoinPool 处理,最后对处理结果进行收集。

2.3、工作窃取算法

工作窃取算法是提高程序性能、保证负载均衡的一种方法。该方法的基本思想是:当程序某些线程工作完成后,去查看其他线程是否有未处理完的工作;如果有,则窃取一部分工作来执行。

Fork/Join 框架采用双端队列作为任务的存储结构,该队列支持【后进先出】的数据 pop 和 push 操作,也支持【先进先出】的task 操作。
由某一工作线程创建的子任务仍然会被加入到线程的队列中,一般采用 push 操作;工作线程从自己的双端队列中取出任务执行,一般采用 pop 操作;当工作线程需要从其他线程的工作队列中窃取任务执行时,一般采用 take 操作。

三、Fork/Join 框架的编程模型

Java 语言中的 Fork/Join 框架比较适合解决那些具有递归性质、可以进行任务分解的程序。Fork/Join 框架需要对任务进行分解和合并的操作。

在分解前,首先查看问题的规模是否超过了预设的门槛值;在任务规模不大的情况下,采用串行的解决方法,由于该方法省去了分解和合并的操作,有时效果会更好;当问题的规模大于门槛值时,采用 Fork/Join 框架求解。
使用 Fork/Join 框架的编程模型:

if(问题规范 < 门槛值) {
// 使用串行模式解决或选择其他算法解决
}else {
//将任务 Task 进行分解,分解为若干个小任务
//将小任务提交给线程处理
//如果任务有返回结果,则收集结果
}

在编写使用 Fork/Join 框架的程序时,程序员不需要考虑线程操作的一些问题。如线程同步和通信等。
Fork/Join 框架中的 ForkJoinPool 类会自动地完成线程池的管理,程序员仅关注任务的划分和执行结果的收集。

四、ForkJoinPool 类

ForkJoinPool 类是 Fork/Join 框架的核心,也是 Fork/Join 框架执行的入口点,它实现了接口 ExecutorService。ForkJoinPool 类的任务是负责管理线程,并提供线程执行状态和任务处理的相关信息。

4.1、ForkJoinPool 的创建

ForkJoinPool 类从 AbstractExecutorService 类继承,主要用于处理 ForkJoinTask 中的任务。
ForkJoinPool 类构造方法有三种形式:
1、ForkJoinPool() : 创建一个线程池,线程池中的线程数目根据CPU的核数设定。
2、ForkJoinPool(int parallelism) :用户指定线程池中线程的数目。
3、public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler, boolean asyncMode) :指明线程工厂、异常处理的handler 和 工作模式。

4.2、ForkJoinPool 的使用

ForkJoinPool 的使用大致可以分为两种:

  1. 通过invoke、execute 和 submit 执行任务。
  2. 在程序执行过程中通过 fork 操作执行的任务。

ForkJoinPool 常用的方法

  1. invoke(ForkJoinTask<T> task) :是一个同步调用方法;处理给定任务并返回结果,返回结果的类型由 T 指定。
  2. invokeAll(Collection<? extendsCallable<T>> tasks) : 是一个同步调用方法;执行给定的任务列表,返回持有任务状态和任务完成时的结果的Future列表。
  3. execute(ForkJoinTask<?> task) : 安排(异步)执行给定的任务,一般没有返回结果。
  4. execute(Runnable task) : 把一个 Runnable 线程所代表的任务送到 ForkJoinPool 中;ForkJoinPool 不会对 Runnable(Callable) 对象使用工作窃取算法,该算法只会被应用到 ForkJoinTask 对象中。
  5. submit(Callable<T> task) :提交一个有返回值的任务用于执行,并返回一个可在未来获取任务结果的 Future。
  6. submit(ForkJoinTask<T> task) :提交一个ForkJoinTask 用于执行。
  7. submit(Runnable task) :提交一个Runnable 任务用于执行,并返回一个该任务的Future表示。
  8. submit(Runnable task, T result) :提交一个 Runnable 任务用于执行,并返回一个该任务的 Future表示。

以上是三类执行任务的方法,对于三种方法的具体使用

  在外部对 Fork/Join 操作调用 在 Fork/Join 框架范围内调用
异步执行 execute(ForkJoinTask) ForkJoinTask.fork()
同步执行(等待子任务完成) invoke(ForkJoinTask) ForkJoinTask.invoke()
执行并获得结果 submit(ForkJoinTask) ForkJoinTask.fork()

以下方法可以用于监视 Fork/Join 框架的操作情况:

  1. awaitQuiescence(long timeout,TimeUnit unit): 如果被此池中的ForkJoinTask调用,等价于此方法:
  2. awaitTermination(long timeout,TimeUnit unit) :阻塞直到所有任务执行完毕。当任意以下情况出现则停止阻塞,以先发生为准:收到关闭请求、超时、线程中断。
  3. commonPool()返回common pool 实例。
  4. drainTasksTo(Collection<? superForkJoinTask<?>> c): 移除任务调度队列中所有未有效提交和已分解的任务,并将它们添加到给定的集合,但不改变其执行状态。
  5. getActiveThreadCount(): 返回当前正在窃取或执行任务的线程数的估计值。
  6. getAsyncMode(): 当使用先进先出模式时返回true
  7. getCommonPoolParallelism(): 返回common pool的目标并行级别
  8. getFactory() :返回线程工厂用于构造新的线程。
  9. getParallelism() :返回这个线程池的目标并行度
  10. getPoolSize() :返回已启动但尚未终止的工作线程数。
  11. getQueuedSubmissionCount() :返回还未开始运行的任务数的估计值。
  12. getQueuedTaskCount(): 返回工作线程的任务队列中总任务数的估计值(但不包括提交到池中但未开始执行的任务)。
  13. getRunningThreadCount() :返回处于未阻塞状态等待加入任务或其它同步管理的工作线程数的估计值。
  14. getStealCount(): 返回从其它线程任务队列中窃取的任务数估计值。
  15. getUncaughtExceptionHandler(): 返回内部工作线程在执行任务时遇到不可恢复的错误时的处理程序。
  16. hasQueuedSubmissions() :当存在已提交到当前线程池但尚未执行的任务时返回true。
  17. isQuiescent():当所有工作线程处于空闲时返回true。
  18. isShutdown() :此线程池已被关闭时返回true。
  19. isTerminated() :如果此池关闭后所有任务已停止,返回true。
  20. isTerminating() :如果此线程池正在终止任务但尚未全部终止,返回true。
  21. managedBlock(ForkJoinPool.ManagedBlocker blocker) :运行给定的可能阻塞任务。
  22. newTaskFor(Callable<T> callable) :返回给定callable任务的RunnableFuture。
  23. newTaskFor(Runnable runnable, T value) :返回给定runnable任务和给定值的RunnableFuture
  24. pollSubmission() :如果存在下一个未运行的可用的已提交任务,将其返回并在任务队列中删除
  25. shutdown() :按此前提交任务的执行顺序来有序关闭任务,但不再接受新任务。
  26. shutdownNow() :尝试取消、停止所有任务,并拒绝所有随后提交的任务。
  27. toString() :返回可识别此线程池的字符串及它的状态,包括运行状态、并行级别、工作线程和任务数量。

五、Fork/Join 框架中的任务

ForkJoinTask 类是所有在 Fork/Join 框架中执行的任务的基类,它提供了一系列机制来实现 Fork 和 Join 操作。该类有两个子类,分别是 RecursiveAction 和 RecursiveTask。

  1. 从 RecursiveAction 类继承的子类方法一般没有返回值。
  2. 从 RecursiveTask 类继承的子类方法则有返回值。

5.1、任务的创建

在创建任务时,最好不要从 ForkJoinTask 类直接继承,而是从该类的子类 RecursiveAction 或 RecursiveTask 继承。
1、从 RecursiveAction 类继承创建任务
继承后的新类需要重写该类的 compute() 方法。
常用的方法:

  1. isDone() :用于判断任务是否完成。
  2. cancel(boolean manyInterruptIfRunning):用于取消一个任务的执行。

2、从 RecursiveTask 类继承创建任务
从 RecursiveTask 类继承时通常要指明一个特定的数据类型,例如:

public class ExecTask extends RecursiveTask<Integer> {}

ExecTask 类继承自 RecursiveTask ,并对整型数据进行操作。
从 RecursiveTask 类继承的子类需要重写 protected <T> compute() 方法,该方法有返回值,通过泛型 T 指明返回值的类型。
获取返回值有两种方法:

  1. join():该方法与 Thread.join() 方法不同,用于获取执行结果。
  2. get():当任务结束后返回任务的计算结果。

5.2、任务的运行方式

Fork/Join 框架提供了一种更为有效的任务管理方法,当 ForkJoinPool 执行 ForkJoinTask 任务时可以采用同步或者异步的运行方式。
当采用同步运行方式时,把任务交给 ForkJoinPool 处理后不会立即返回,而是等待任务全部结束才能够返回继续执行;采用异步的运行方式时,把任务发送到 ForkJoinPool 后会马上返回并继续执行。
采用不同的运行方式时,任务调用的方法是不同的。
前面说的 invokeAll() 方法会使任务被挂起等待,直到被提交到 ForkJoinPool 的任务处理完毕后才继续执行;可见 invokeAll() 方法使任务运行在同步方式中。
如果要运行在异步方式中,可以采用 fork() 方法,处于该方式中的 ForkJoinPool 不会使用工作窃取算法来提高程序的性能;在这种情况下,需要调用 join() 和 get() 方法来等待任务的结束,这个时候 ForkJoinPool 才会使用工作窃取算法。

5.3、任务的取消

在一个任务开始运行之前,可以取消该任务的执行。类 ForkJoinTask 提供了 cancel() 方式实现任务的取消,但该方法只能取消那些还没有开始运行的任务,ForkJoinPool 类没有提供取消那些正在运行或者正在等待运行任务的方法。

六、Fork/Join 框架的限制

Fork/Join 框架执行的任务有如下限制:

  1. 仅可以使用 fork 和 join 操作作为同步机制;如果使用其他的同步机制,当处于同步操作模式下时任务无法被执行。例如:如果想让一个任务在 Fork/Join 框架内休眠一段时间,执行这个任务的工作线程在休眠期间不会去执行其他任务。
  2. 任务不能抛出异常。
  3. 任务不能执行I/O操作。

七、几种线程机制的比较

截止到 Fork/Join 框架,已经学习了多种线程机制,如 Thread、Executors 和 Fork/Join;这几种方式的比较:

  1. Thread 类和接口从 Java 诞生之初就提供支持,它的特点是直观、易于理解和使用、使用范围广,程序员可以参与线程的详情管理过程中,如线程的创建、启动、执行、结束等操作,但这些操作也增加了程序员编写并发线程的复杂性。
  2. Executors 从 JDK 1.5 版本开始支持,它在线程类 Thread 的基础上提供了自动线程管理的功能,提供了线程取消的操作;使用 Executors,用户无须关心线程的启动、结束等一些微观操作。它的优点是适用范围广,缺点是缺乏面向多核处理器的相关优化。
  3. Fork/Join 框架从 JDK 1.7 开始提供支持,它在 Executors 的基础上提供了工作窃取算法,使得程序在多核处理器上执行时更加高效。但是在很多情况下,Fork/Join 框架的编程模型决定了它更适合于那些具有递归操作的程序。