上篇的文章中我们介绍了AQS源码中lock方法和unlock方法,这两个方法主要是用来解决并发中互斥的问题,这篇文章我们主要介绍AQS中用来解决线程同步问题的await方法、signal方法和signalAll方法,这几个方法主要对应的是synchronized中的wait方法、notify方法和notifAll方法。在介绍着这几个方法之前,我们先来看看这个几个方法是怎么使用的。
-
我们实现一个阻塞的队列。
import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class MyBlockedQueue<T> {final Lock lock = new ReentrantLock();// 条件变量:队列不满final Condition notFull = lock.newCondition();// 条件变量:队列不空final Condition notEmpty = lock.newCondition();private volatile List<T> list = new ArrayList<>();// 入队void enq(T x) {lock.lock();try {while (list.size() == 10) {// 等待队列不满try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}// 省略入队操作list.add(x);// 入队后, 通知可出队notEmpty.signal();} finally {lock.unlock();}}// 出队void deq() {lock.lock();try {while (list.isEmpty()) {// 等待队列不空try {notEmpty.await();} catch (InterruptedException e) {e.printStackTrace();}}list.remove(0);// 出队后,通知可入队notFull.signal();} finally {lock.unlock();}}public List<T> getList() {return list;}public static void main(String[] args) throws InterruptedException {MyBlockedQueue<Integer> myBlockedQueue = new MyBlockedQueue<>();Thread thread1 = new Thread(() -> {for (int i = 0; i < 20; i++) {myBlockedQueue.enq(i);}});Thread thread2 = new Thread(() -> {for (int i = 0; i < 10; i++) {myBlockedQueue.deq();}});thread1.start();thread2.start();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Arrays.toString(myBlockedQueue.getList().toArray()));}}
-
运行的结果如下:
我们可以看到condition在多线程的中使用,类似于实现了线程之前的通信,当一个条件满足的时候,执行某个线程中操作,当某个条件不满足的时候,将当前的线程挂起,等待这个条件满足的时候,其他的线程唤醒当前线程。
-
我们再来看看await的源码,具体如下图:
大致的流程就是:如果某个线程的调用了await的方法,走来会将这个线程通过CAS和尾插法的方式将这个等待的线程添加到AQS的等待的队列中去。然后将当前的线程进行解锁,避免这个线程没有释放的锁的时候,然后就被挂起。导致其他的线程获取不到锁,亦或者导致死锁的情况。然后将当前的线程进行park,最后等待其他的线程调用signal方法将当前的线程unpark。
- 我们再来看看signal方法和signalAll方法的源码
大致的流程就是:某个线程调用signal方法或者signalAll方法,signal方法会将当前的等待队列中第一个等待的线程的节点加入到原来的AQS队列中去,而signalAll方法是将等待队列中的所有的等待线程的节点全部加入到原来的AQS的队列中去,然后让他们重新获取锁,进行unpark。然后线程被唤醒,执行对应的线程中代码。