线程间通信介绍和常用通信方法说明
线程作为程序内部的多个执行流,相互之间是可以进行通信的。线程间的通信可以从两方面理解:
- 线程间数据交换(通过共享数据的方式实现,简单并高效);
- 线程间状态的改变(让自己线程阻塞,唤醒其他线程;实现协调工作)。
注意点:【线程间通过共享数据通信时,一般需要引入线程同步控制】。
线程间也可以通过 wait()、notify()和 notifyAll() 等方法进行通信进而改变线程状态。
一、等待集合的介绍
等待集合(Wait-sets)就像候车室一样,当线程调用 wait() 方法发生等待后,都要进入到该等待集合中,除非发生以下情况,否则线程会一直在该等待集合中。
- 其他线程调用了方法 notify() 或 notifyAll();
- 其他线程调用了方法 interrupt() 中断了该线程;
- 方法 wait() 的等待时间结束。
二、线程状态改变的方法(阻塞|非阻塞)
wait()、notify()或 notifyAll() 方法是类 Object 中定义的方法,由于 Java 中的类都是 Object 类中的子类,因此在 Java 语言中任何类的对象都可以调用这些方法,但这些方法更多的是在多线程环境中使用。
- wait(),notify(),notifyAll()三个方法必须使用在同步代码块或同步方法中。
- 2.wait(),notify(),notifyAll()三个方法的调用者必须是同步代码块或同步方法中的同步监视器。否则,会出现IllegalMonitorStateException异常
- wait(),notify(),notifyAll()三个方法是定义在java.lang.Object类中。
2.1、wait() 方法 – 阻塞线程
wait() 方法调用的一般形式:对象名.wait()
称作线程在对象上的等待,作用是把当前线程放入对象的【等待集合】中。
- 很多情况下,都是在当前对象上调用 wait() 方法,即 this.wait() ,而 this 又可以省去,所以只有一个 wait() 来表示等待,表示在当前对象上等待。
- wait() 方法通常需要放入以
synchronized()
方法修饰的语句块或方法中,如果在 synchronized 外调用 wait() 方法,运行时刻 Java 虚拟机会抛出 IllegalMonitorException 异常。 - 当线程调用 wait() 方法后,Java 虚拟机会让【当前线程进入到休眠状态,并释放对象的同步锁的控制权】,允许其他线程执行同步代码块,要唤醒该线程,需要在一个对象上调用 notify() 或 notifyAll() 方法。
2.2、notify() 方法 – 唤醒一个
线程不能一直待在等待集合中,必须有方法对其进行唤醒 notify() 方法可以对线程进行唤醒。
当调用某个对象的 notify() 方法时,将从该对象等待集合中选择一个等待的线程唤醒,唤醒的线程将从等待集合中删除。
如果有多个线程被wait,就唤醒优先级高的那个。
2.3、notifyAll() 方法 – 唤醒所有
notifyAll() 方法会将所有在等待集合中的线程唤醒,但由于所有被唤醒的线程仍然要去争用 synchronized 锁,而 synchronized 锁具有排他性,最终只有一个线程获得该锁,进入执行状态,其他的线程仍然要继续等待。
三、sleep()和wait()的异同
相同点:一旦执行方法,都可以使得当前的线程进入阻塞状态。
3.1、不同点
- 两个方法声明的位置不同:Thread类中声明sleep() , Object类中声明wait()。
- 调用的要求不同:sleep()可以在任何需要的场景下调用。 wait()必须使用在同步代码块或同步方法中。
- 关于是否释放同步监视器:如果两个方法都使用在同步代码块或同步方法中,sleep()不会释放锁,wait()会释放锁。
四、经典例题:生产者/消费者问题
生产者(Productor)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品。
店员一次只能持有固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放产品了再通知生产者继续生产;
如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消费者来取走产品。
4.1、分析
- 是否是多线程的问题?是,生产者的线程,消费者的线程
- 是否有共享数据的问题?是,店员、产品、产品数
- 如何解决线程的安全问题?同步机制,有三种方法
- 是否涉及线程的通信?是
class Clerk{
private int productCount = 0;
//生产产品
public synchronized void produceProduct() {
if(productCount < 20){
productCount++;
System.out.println(Thread.currentThread().getName() + ": 开始生产第" + productCount + "个产品");
notify();
}else{
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消费产品
public synchronized void consumeProduct() {
if(productCount > 0){
System.out.println(Thread.currentThread().getName() + ":开始消费第" + productCount + "个产品");
productCount--;
notify();
}else{
//等待
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{//生产者
private Clerk clerk;
public Producer(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
System.out.println(getName() + ": 开始生产产品......");
while(true){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.produceProduct();
}
}
}
class Consumer extends Thread{ //消费者
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
System.out.println(getName() + ": 开始消费产品......");
while(true){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.consumeProduct();
}
}
}
public class ProductTest {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Producer p1 = new Producer(clerk);
p1.setName("生产者1");
Consumer c1 = new Consumer(clerk);
c1.setName("消费者1");
Consumer c2 = new Consumer(clerk);
c2.setName("消费者2");
p1.start();
c1.start();
c2.start();
}
}