Java多线程基础(一)---线程通信(wait、notify和notifyAll、单线程通信生产者消费者)
  TEZNKK3IfmPf 2023年11月12日 28 0

1 学习内容

  1. 同步阻塞与异步非阻塞实例分析
  2. wait和notify关键字
  3. wait和sleep区别
  4. wait和notify图解分析
  5. 总结

2 具体内容

2.1 同步阻塞与异步非阻塞
2.1.1 同步阻塞消息处理

有这样一个系统功能,客户端提交Event至服务器,服务器接收到客户请求之后创建线程处理客户请求,经过复杂的业务计算后将结果返回给客户端。
Java多线程基础(一)---线程通信(wait、notify和notifyAll、单线程通信生产者消费者)

缺陷:

  • 同步Event提交,客户端等待时间太长(提交Event时长 + 接收Event创建Thread时间 + 业务处理时间 + 返回结果时长)会陷入阻塞,导致二次提交Event耗时过长
  • 由于客户端提交的Event数量不多,导致系统同时受理业务数量有限,也就是系统整体吞吐量不高
  • 这种一个线程处理一个Event的方式,会导致频繁的创建和开启,增加系统开销
  • 在业务达到峰值的时候,大量的业务处理线程会导致频繁的CPU上下文切换,进而降低系统性能

2.1.2 异步非阻塞消息处理

  • 在分析了同步阻塞消息处理缺陷后,我们采用异步非阻塞的方式,可以提高系统的吞吐量,且业务处理线程也可以控制在一个固定的范围,以增加系统的稳定性,如图
    Java多线程基础(一)---线程通信(wait、notify和notifyAll、单线程通信生产者消费者)

客户端提交Event后会得到一个相应的工单号并且立即返回,Event则会被放置在Event队列中。服务器有若干个工作线程,不断从Event队列中获取任务并且进行异步处理,最后将处理结果保存在另外一个结果集中,如果客户端想要获得结果处理,则可凭借工单号再次获取。

  • 我们发现异步非阻塞方式优势明显:
  • 客户端不用等到结果处理结束之后才返回,从而提高了系统的吞吐量和并发量
  • 服务器端的线程数量维持在一个可控的范围之内不会因太多的线程上下文切换而导致的系统额外开销
  • 服务端线程可以重复使用,减少了线程创建带来的资源浪费

2.2 单线程间通信
2.2.1 初识wait和notify
实现一个EventQueue,该Queue有如下三种状态:

  • 队列满----最多可容纳多少个Event,好比一个系统最多同时可受理多少业务;
  • 队列空----当所有Event被处理并且没有新的Event提交的时候,此时队列为空的状态;
  • 有Event但是没有满—有新的Event提交,但是没有达到队列的上限。

示例:EventQueue

   package com.kangna.concurrent.chapter05;
    import java.util.LinkedList
    public class EventQueue {
        //事件队列的最大值
    	private final int max;  
    	static class Event {
    
    	}
    
    	private final static int DEFAULT_MAX_EVENT = 10;
    	final LinkedList<Event> eventQueue = new LinkedList<>();
    
    	public EventQueue() {
    		this(DEFAULT_MAX_EVENT);
    	}
    
    	public EventQueue(int max) {
    		this.max = max;
    	}
    
    	/**
    	 * 如果事件队列没有满则添加到队尾,否则等待
    	 * @param event
    	 */
    	public void offer(Event event) {
    		synchronized (eventQueue) {
    			if (eventQueue.size() >= max) { //事件队列 > 队列定义的最大值
    				try {
    					console("the Queue is full.");
    					eventQueue.wait();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			console("the new event is committed.");
    			eventQueue.addLast(event); 
    			eventQueue.notify();  //唤醒那些曾经执行monitor的wait方法而陷入阻塞的线程
    		}
    	}
    
    	/**
    	 * 从队头获取数据,如果队列中无可用的数据那么工作线程就会调用wait阻塞
    	 */
    	public Event take() {
    		synchronized (eventQueue) {
    			if (eventQueue.isEmpty()) {
    				try {
    					console("the queue is empty. 没有可以拿的要我怎么办!");
    					eventQueue.wait(); 
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			Event event = eventQueue.removeLast();
    			this.eventQueue.notify(); 
    			console("the event" + event + "is handled.");
    			return event;
    		}
    	}
    
    	public void console(String message) {
    		System.out.printf("%s:%s\n", Thread.currentThread().getName(), message);
    	}
  }

定义测试类

public class EventClient {
	
	public static void main(String args[]){
		final EventQueue eventQueue = new EventQueue();
		new Thread(() -> {
			for( ; ; ){
				eventQueue.offer(new EventQueue.Event()); //内部类实例
			}
		}, "producer").start();
		
		new Thread(() -> {
			for( ; ; ){
				eventQueue.take();
				try {
					TimeUnit.MILLISECONDS.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "consumer").start();;
	}
}

从上述的日志可以看出,producer线程生产了10个线程,此时队列已经满了,那么它将会执行eventQueue的wait方法进入阻塞状态,consumer线程要处理数据,所以要花费10毫秒处理其中的一条数据,然后通知producer线程可以继续提交数据了,如此循环往复。

2.2.2 wait和notify方法详解

wait和notify方法是Object中的方法, 也就是说JDK中的每一个类都拥有这两个方法,先来看看wait方法:

方法 描述
public final void wait() throws InterruptedException 致当前线程等待,直到另一个线程调用该对象的notify()方法或notifyAll()方法。 换句话说,这个方法的行为就好像简单地执行呼叫wait(0) 。 当前的线程必须拥有该对象的显示器。 该线程释放此监视器(monitor)的所有权,并等待另一个线程通知等待该对象监视器的线程通过调用notify方法或notifyAll方法notifyAll 。 然后线程等待,直到它可以重新获得监视器的所有权并恢复执行。
public final void wait(long timeout)throws InterruptedException 导致当前线程等待,直到另一个线程调用此对象的notify()方法或notifyAll()方法,或指定的时间已过
public final void wait(long timeout, int nanos)throws InterruptedException 导致当前线程等待,直到另一个线程调用此对象的notify()方法或notifyAll()方法,或其他一些线程中断当前线程,或一定量的实时时间
  • wait的这三个方法都将调用wait(long timeout)这个方法,前文使用的wait方法等价于wait(0),0代表永不超时
  • Object中的wait(long timeout)方法会导致当前线程陷入阻塞,直到其他线程调用notify或者notifyAll方法才能将其唤醒,或者阻塞时间到达了timeout时间而自动唤醒
  • 当前线程执行了该对象的wait方法之后,将会放弃对该monitor的所有权并且进入与该对象关联的wait set中,也就是说一旦线程执行了某个object的wait方法之后,它就会释放对该对象monitor的所有权,其它线程也就有机会继续争夺该monitor的所有权

再来分析一下notify方法

public final native void notify();
  • 唤醒单个正在执行该对象wait方法的线程
  • 如果有某个线程由于执行该对象的wait方法而进入阻塞则会被唤醒,如果没有则会忽略
  • 被唤醒的线程需要重新获取该对象的monitor锁才能继续执行

2.2.3 关于wait和notify注意事项

  • wait方法是可中断的方法,这也意味着,当前线程一旦调用了wait方法进入阻塞状态,其它线程可以使用interrupt方法将其打断,可中断方法被打断后收到中断异常InterruptException,同时interrupt标识也会被擦除

  • 线程执行了某个对象的wait方法之后,会加入对应的wait set中,每一个线程的monitor都有一个与之有关联的wait set

  • 当线程加入wait set之后,notify可以将其唤醒,也就是从wait set中弹出,同时中断wait中的线程也会将其唤醒

  • 必须在同步方法中使用wait和notify方法,因为执行wait和notify的前提条件是必须持有同步方法的monitor的所有权

  • 同步代码的monitor必须与执行wait、notify方法的对象一致,简单地说就是用那个monitor对象进行同步,就用哪个对象进行wait和notify操作。运行下面代码中的会抛出IllegalMonitorException异常信息:

    public class Test {
     	private final Object MUTEX = new Object();
     	private synchronized void testWait(){
     		try {
     			MUTEX.wait();
     		} catch (InterruptedException e) {
     			e.printStackTrace();
     		}
     	}
     	private synchronized void testNotify(){
     			MUTEX.notify();
     	}
     	public static void main(String args[]){
     		Test test = new Test();
     		test.testWait();
     		test.testNotify();
     	}
     }
    

上述同步方法中monitor使用的是this,而wait和notify方法使用的是MUTEX的方法。虽然是在同步方法中执行和wait和notify,但是wait和notify方法的执行并未以获取MUTEX的monitor为前提。

2.3 单线程生产者与消费者(notify)

Java多线程基础(一)---线程通信(wait、notify和notifyAll、单线程通信生产者消费者)

  • 这个就类似于工厂里传送带运行东西,张三负责 将货物放到传送带 ,而李四就负责将货物取走,没有传送带上的货物没有被取走的时候,张三就不会再去放一个货物,相同的,你张三没有放货物,我李四也是取不到的。

notify方法示例(单线程生产者与消费者)

package com.thread.basicmethod.chapter05;
/******************************** 
 * @Author: kangna
 * @Date: 2019/8/23 22:39
 * @Version: 2.0
 * @Desc: 生产者 消费者
 ********************************/
public class ProduceConsumerVersion_2 {
    private int i = 0;

    final private Object LOCK = new Object();

    private volatile boolean isProduced = false;

    public void produce() {
        synchronized (LOCK) {
            //如果已经生产过,消费者还没有消费,那就等一下
            if (isProduced) {
                try {
                    LOCK.wait(); // 加入wait set中,可以被打断
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                i++;
                System.out.println("produce--->" + i);
                // 可能上一段时间没有生产出来,我现在生产出来了。通知消费者消费
                LOCK.notify(); // 唤醒消费者,快到碗里来
                isProduced = true; // 生产者已经生产
            }
        }
    }

    public void consume() {
        synchronized (LOCK) {
            if (isProduced) { // 如果生产者生产
                System.out.println("consume--->" + i);
                // 消费者消费了之后,通知生产者,在再生产
                LOCK.notify(); // 唤醒wait set 中的 生产者,赶紧给我 生产,我要
                isProduced = false; // 消费之后,将状态置为 false
            } else {
                try {
                    LOCK.wait(); // 如果没有生产
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ProduceConsumerVersion_2 pc = new ProduceConsumerVersion_2();
        new Thread("P") {
            @Override
            public void run() {
               while (true) {
                   pc.produce();
               }
            }
        }.start();

        new Thread("S") {
            @Override
            public void run() {
               while (true) {
                   pc.consume();
               }
            }
        }.start();
    }
}
  • 目前这个程序个多线程中还存在问题,就是在多个生产者多个消费者。

2.4 wait和sleep
从上面可以看出,wait和sleep都可以使线程进入阻塞状态,但是两者有着本质的区别:

  • wait和sleep方法都可以使线程进入阻塞状态
  • wait和sleep方法均是可中断的方法,被中断后回抛出中断异常
  • wait是Object级别的方法,而sleep是Thread特有的方法
  • wait方法的执行必须在同步方法中,而sleep则不然
  • 线程在同步方法中执行sleep方法时,并不会释放monitor锁,而wait方法则会
  • sleep方法进行短暂的休眠后会自动退出阻塞,而wait方法则需要被其它线程中断后才能退出阻塞

wait 方法依靠monitor
package com.thread.basicmethod.chapter05;

import java.util.concurrent.TimeUnit;

/********************************
 * @Author: kangna
 * @Date: 2019/8/24 14:43
 * @Version:
 * @Desc: wait方法的使用需要依靠monitor
 ********************************/
public class DifferenceOfSleepAndWait {
    private static Object LOCK = new Object();

    public static void m1() {
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void m2() {
//        try {
//            LOCK.wait();  同步加锁,否则抛异常 java.lang.IllegalMonitorStateException
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        synchronized (LOCK) { // 加锁
            try {
                LOCK.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        m1();
        m2();
    }
}

sleep不释放monitor,wait释放monitor代码验证

public class DifferenceOfSleepAndWait {
    private final static Object LOCK = new Object();

    public static void m1() {
        synchronized (LOCK) {
            System.out.println("The thread " + Thread.currentThread().getName() + " enter.");
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void m2() {
        synchronized (LOCK) {
            try {
                System.out.println("The thread " + Thread.currentThread().getName() + " enter.");
                LOCK.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Stream.of("T1", "T2").forEach(name ->
                new Thread(name) {
                    @Override
                    public void run() {
                        DifferenceOfSleepAndWait.m1(); // 而两个线程同时执行m1方法会有 先后顺序
                        // m2(); // 线程 T1,T2同时进入 wait Queue,放弃monitor
                    }
                }.start()
        );
    }   
}

The difference of sleep and wait
1.sleep is the method of Thread,but the wait is the method of Object.
2.Sleep will not release the object monitor(Lock),but the wait will release the monitor and add to the Object monitor waiting queue.
3.Use sleep not depend on the monitor,but wait need.
4.The sleep method not need be wake up ,but wait need.

2.5 图解wait和notify

等待/通知机制依托于同步机制,其目的就是确保等待线程从wait方法返回的时候能够感知到线程对变量做出的修改。

Java多线程基础(一)---线程通信(wait、notify和notifyAll、单线程通信生产者消费者)

如图,WaitThread首先获取了对象的monitor锁,然后调用对象的wait方法,从而放弃了monitor锁进入了WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread之后获取了对象monitor锁,并调用对象的notify方法,将WaitThread从WaitQueue转移到SynchronizedQueue中,此时WaitThread状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait方法返回继续执行。

3 总结

  1. 使用wait、notify和notifyAll方法时需要先对调用对象加锁。
  2. notify或者二notifyAll方法调用后,等待线程依旧不会从wait方法返回,而是调用notify、notifyAll的线程释放锁后,等待线程才有机会从wait方法返回。
  3. wait方法返回的前提是获得了调用对象的monitor锁。
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   21天前   48   0   0 java
  TEZNKK3IfmPf   2024年05月31日   55   0   0 java
TEZNKK3IfmPf