java 并发编程:线程同步障栅

2020-05-07 0 By admin

在多线程程序中,很多时候需要让多个线程相互合作完成一个任务,这要求线程间能够进行协调。
例如:任务A和B是完成一项工作的两个划分,只有A任务计算出结果后,任务B才能开始计算。我们再将任务A划分为4个子任务,交给4个线程并行执行,由于子任务有大小区分,处理小任务的线程有可能很快就执行完毕了,因此该任务需要等待其他线程执行完成后,才能继续向下执行任务B;这时候我们需要用到线程障栅。

一、障栅

CyclicBarrier 类是一个同步辅助类,实现了一个称为障栅的集合点,在不是所有线程都到达集合点前,线程之前可以相互等待。Cyclic 的含义是“循环的,周期的”,代表该障栅在所有等待的线程到达集合点并被释放后可以循环使用。
CyclicBarrier 类比较适合于线程数量固定的情况。

1.1、CyclicBarrier 类的构造方法:

1、CyclicBarrier(int parties) //创建障栅对象;parties为需要等待的线程个数。
2、CyclicBarrier(int parties,Runnable barrierAction) // barrierAction 定义最后一个进入障栅的线程要执行的动作。

1.2、CyclicBarrier 类的常用方法:

  1. int await() 在此障栅上的线程调用该方法后将等待
  2. int ​await(long timeout, TimeUnit unit)如果在指定时间内达到parties的数量则继续运行,否则抛出超时异常。
  3. int getParties() 障栅对象的parties个数。​​
  4. int getNumberWaiting() 在障栅处等待的线程个数。
  5. void reset() 重置障栅到初始状态。

在障栅的对象上可以调用 await() 方法,该方法需要放置到 try…catch…语句块中,并捕获 InterruptedException 和 BrokenBarrierException 异常。线程在完成自己的工作后调用 await() 方法等待。当最后一个线程调用 await() 方法后,将唤醒所有等待线程,并继续该障栅后的工作。

二、倒计时门闩

倒计时门闩就像一个带有计数器开关的门,只有在门前等待的线程到达一定数量时,门闩才会打开,线程才可以继续执行。
倒计时门闩由 CountDownLatch 类实现,该类从 Object 继承而来,可以通过一个给定的值进行初始化,通常在同步状态中保存的是当前的计数值,线程调用 await() 方法等待;方法 countDown() 会导致计数值递减,当计数值为零时,所有的倒计时门闩范围内的等待线程的阻塞状态将解除。

2.1、CountDownLatch 类的构造方法:

CountDownLatch(int count)
其中,count 为初始计数,必须为正数,否则将抛出 IllegalArgumentException 异常。

2.2、CountDownLatch 类常用的方法:

  1. public void await() //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  2. public boolean await(long timeout, TimeUnit unit) //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
  3. public void countDown() { }; //将count值减1

三、信号量

信号量机制通常用于限制对于某个资源同时访问的线程数量。
信号量机制是一种典型的同步机制,可以用于解决常用的线程同步问题。
在 Java 并发库中,类 Semaphore 可以实现信号量机制,其定义如下:
public class Semaphore extends Object implements Serializable
信号量管理了一个许可集合,可以通过方法 acquire() 获取一个许可,如果没有许可则可以等待。通过方法 release() 可以释放一个许可。

3.1、它的构造方法如下:

Semaphore(int permits) // 用给定的许可数创建一个 Semaphore 对象。
Semaphore(int permits,boolean fair) // 设置许可分配策略:公平分配/非公平分配

3.2、Semaphore 类常用的方法

  1. acquire() 获取一个许可,如果没有许可可以获取,则阻塞。
  2. void acquire(int permits) 获取permits个许可,如果没有许可可以获取,则阻塞。
  3. void acquireUninterruptibly() 从此信号量中获取许可,在有可用的许可前将其阻塞。
  4. int availablePermits() 获得可用的许可数
  5. int drainPermits() 获取并返回立即可用的许可数
  6. int getQueueLength() 返回等待获取许可的线程队列长度
  7. boolean hasQueuedThreads() 线程等待队列中是否有线程等待获取许可
  8. void release() 释放一个许可
  9. void release(int permits) 释放指定数量的许可

使用场景:
银行设置了4个窗口可以同时办理业务,现在有20个顾客需要办理业务,使用信号量机制可以实现这种场景。

四、同步队列

同步队列是一个没有数据缓冲的阻塞队列,在同步队列上插入操作必须等待相应的删除执行完成后才能执行,反之亦然。
不能调用peek() 方法查看队列中是否有数据,也不允许对整个队列遍历。
类 SynchronousQueue 是 Java 集合框架中的一员。该类的定义形式如下:
public class SynchronousQueue
extends AbstraceQueue
implements BlockingQueue,Serializable

其中 E 指明同步队列中元素的类型。

4.1、该类定义了两个构造方法:

public SynchronousQueue()
public SynchronousQueue(boolean fair) //使用公平性策略创建同步队列

4.2、SynchronousQueue 类常用的方法:

  1. drainTo() 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
  2. offer(E e) 如果另一个线程正在等待以便接收指定元素,则将指定元素插入到此队列。
  3. offer(E o, long timeout, TimeUnit unit) 将指定元素插入到此队列,如有必要则等待指定时间,以便另一个线程接受它。
  4. poll() 如果另一个线程当前正要使用某个元素,则获取并移除此队列的头
  5. poll(long timeout, TimeUnit unit) 获取并移除此队列的头,如有必要则等待指定的时间,以便另一个线程插入它。
  6. put(E o) 将指定元素 o 添加到此队列,如有必要则等待另一个线程接收它。
  7. take() 获取并移除此队列的头,如有必要则等待另一个线程插入它。

使用场景:使用同步队列实现生产者和消费者的问题。
由于使用同步队列作为公共缓冲区,故生产者和消费者对公共缓冲区的操作不再需要添加额外的同步控制操作。
生产者对缓冲区进行插入操作,消费者对缓冲区进行删除操作。两者协同操作,不允许一个人连续执行两次。

五、交换器

从 JDK1.5 版本开始提供了线程间数据交换的功能,该功能可以通过 Exchanger 类完成。

5.1、Exchanger 类的定义形式如下:

public class Exchanger(V) extends Object
参数 V 表示要交换数据的类型。

该类从 Object 类继承,它提供了一个同步点,在该同步点上线程间可以交换数据。一个线程通过 exchange() 方法将其数据提供给另一个线程,并接受另一个线程的数据。

5.2、方法 exchange() 定义如下:

V exchange( V x)
该方法将等待另一个线程到达交换点,然后交换指定的数据 x,该方法将返回的交换后的数据。

六、阶段化处理

在日常生活中,我们在做一件事情的时候,习惯把一件事情划分为若干个阶段,然后规定每个阶段的任务和完成的时间,从而实现阶段化的控制和管理,阶段化处理往往在完成某一项工作时很高效。
从 JDK1.7 版本开始引入类 Phaser,它是 Java 并发库中功能强大并且较复杂的一个功能,可以用来完成阶段式的并发执行任务的功能。
Phaser 类直接从 Object 类继承,它是一个可复用的同步障栅,与 CyclicBarrier 和 CountDownLatch 功能类似,但使用上更加灵活。