AQS(AbstractQueuedSynchronizer)
概念
AQS 就是一个抽象类,主要用来构建锁和同步器。
AQS 为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock
,Semaphore
,其他的诸如 ReentrantReadWriteLock
,SynchronousQueue
等等皆是基于 AQS 的。
以下通过ReentrantLock举例,通过锁的使用、源码解读的方式带大家入门AQS。
前置知识
线程的三种创建方式
- 继承Thread
//自定义线程类, 因为java是单继承,很少用
@Slf4j
public class MyThread extends Thread {
@Override
public void run() {
log.info("MyThread");
}
}
public static void main(String[] args) {
new MyThread().start();
log.info("mainThread");
}
/*output
09:52:18.219 [main] INFO cn.stopyc.thread_test.Test - mainThread
09:52:18.219 [Thread-0] INFO cn.stopyc.thread_test.MyThread - MyThread
*/
- 实现Runnable接口
@Slf4j
public class RunnableThread implements Runnable{
@Override
public void run() {
log.info("RunnableThread");
}
}
public static void main(String[] args) {
new Thread(new RunnableThread()).start();
log.info("mainThread");
}
/*output
09:53:59.723 [main] INFO cn.stopyc.thread_test.Test - mainThread
09:53:59.723 [Thread-0] INFO cn.stopyc.thread_test.RunnableThread - RunnableThread
*/
- 实现Callable接口
@Slf4j
public class CallableThread implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
log.info("CallableThread");
return true;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Boolean> futureTask = new FutureTask<>(new CallableThread());
new Thread(futureTask).start();
//阻塞等待结果
Boolean flag = futureTask.get();
log.info("flag:{}",flag);
log.info("main thread");
}
/*output
09:59:49.732 [Thread-0] INFO cn.stopyc.thread_test.CallableThread - CallableThread
09:59:49.735 [main] INFO cn.stopyc.thread_test.Test - flag:true
09:59:49.737 [main] INFO cn.stopyc.thread_test.Test - main thread
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Boolean> futureTask = new FutureTask<>(new CallableThread());
new Thread(futureTask).start();
log.info("main thread");
//阻塞等待结果
Boolean flag = futureTask.get();
log.info("flag:{}",flag);
}
/*output
09:58:36.378 [main] INFO cn.stopyc.thread_test.Test - main thread
09:58:36.378 [Thread-0] INFO cn.stopyc.thread_test.CallableThread - CallableThread
09:58:36.381 [main] INFO cn.stopyc.thread_test.Test - flag:true
*/
LockSupport的使用
LockSupport是一个工具类,提供了基本的线程阻塞和唤醒功能,它是创建锁和其他同步组件的基础工具,内部是使用sun.misc.Unsafe类实现的。 LockSupporta和使用它的线程都会关联一个许可,park方法表示消耗一个许可,调用park方法时,如果许可可用则park方法返回,如果没有许可则一直阻塞直到许可可用。unpark方法表示增加一个许可,多次调用并不会积累许可,因为许可数最大值为1。
他和wait和notify、notifyAll(这些只能结合synchronized使用)最大的区别是,他可以指定唤醒哪个线程。
这里简单使用一下LockSupport,来完成一个需求,要求是三个线程打印ABC。
@Slf4j
public class Test2 {
private void printA(Thread tb) {
try {
Thread.sleep(1000);
log.info("A");
LockSupport.unpark(tb);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void printB(Thread tc) {
LockSupport.park();
log.info("B");
LockSupport.unpark(tc);
}
private void printC() {
LockSupport.park();
log.info("C");
}
public static void main(String[] args) {
Test2 t4 = new Test2();
Thread tc = new Thread(t4::printC);
Thread tb = new Thread(()-> t4.printB(tc));
Thread ta = new Thread(()-> t4.printA(tb));
tc.start();
tb.start();
ta.start();
}
}
/*
10:34:21.115 [Thread-2] INFO cn.stopyc.thread_test.Test2 - A
10:34:21.118 [Thread-1] INFO cn.stopyc.thread_test.Test2 - B
10:34:21.118 [Thread-0] INFO cn.stopyc.thread_test.Test2 - C
*/
可以看到,ThreadB使用park阻塞了当前的线程,等待ThreadA利用unpark把ThreadB唤醒。所以保证了ABC打印的顺序。
ReentrantLock的简单使用
@Slf4j
public class Test3 {
static int i = 0;
ReentrantLock lock = new ReentrantLock();
private void add() {
try {
for (int i1 = 0; i1 < 10000; i1++) {
i += 1;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws InterruptedException {
Test3 test3 = new Test3();
for (int i = 0; i < 100; i++) {
new Thread(test3::add).start();
}
Thread.sleep(3000L);
log.info("i 为: {}", i);
}
}
/*
10:57:56.291 [main] INFO cn.stopyc.thread_test.Test3 - i 为: 589193
*/
使用可重入锁
@Slf4j
public class Test3 {
static int i = 0;
ReentrantLock lock = new ReentrantLock();
private void add() {
lock.lock();
try {
for (int i1 = 0; i1 < 10000; i1++) {
i += 1;
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Test3 test3 = new Test3();
for (int i = 0; i < 100; i++) {
new Thread(test3::add).start();
}
Thread.sleep(3000L);
log.info("i 为: {}", i);
}
}
/*
10:59:30.274 [main] INFO cn.stopyc.thread_test.Test3 - i 为: 1000000
*/
模板方法设计模式
如果有学过设计模式的同学应该对模板方法设计不陌生吧,核心思想就是,完成一个功能,步骤都是一样,只有中间核心的某个或者某几个步骤不一样,所以我们可以父类定义通用方法步骤,子类延迟实现关键核心代码,这个就是模板方法设计模式,以下举例说明。
炒菜的步骤是固定的,分为倒油、热油、倒蔬菜、倒调料品、翻炒等步骤。现通过模板方法模式来用代码模拟。
抽象炒菜类
public abstract class AbstractClass {
public final void cookProcess() {
//第一步:倒油
this.pourOil();
//第二步:热油
this.heatOil();
//第三步:倒蔬菜
this.pourVegetable();
//第四步:倒调味料
this.pourSauce();
//第五步:翻炒
this.fry();
}
public void pourOil() {
System.out.println("倒油");
}
//第二步:热油是一样的,所以直接实现
public void heatOil() {
System.out.println("热油");
}
//第三步:倒蔬菜是不一样的(一个下包菜,一个是下菜心)
public abstract void pourVegetable();
//第四步:倒调味料是不一样
public abstract void pourSauce();
//第五步:翻炒是一样的,所以直接实现
public void fry(){
System.out.println("炒啊炒啊炒到熟啊");
}
}
注意:为防止恶意操作,一般模板方法都加上 final 关键词。
炒包菜
public class ConcreteClass_BaoCai extends AbstractClass{
@Override
public void pourVegetable() {
System.out.println("下锅的蔬菜是包菜");
}
@Override
public void pourSauce() {
System.out.println("下锅的酱料是辣椒");
}
}
炒菜心
public class ConcreteClass_CaiXin extends AbstractClass {
@Override
public void pourVegetable() {
System.out.println("下锅的蔬菜是菜心");
}
@Override
public void pourSauce() {
System.out.println("下锅的酱料是蒜蓉");
}
}
测试方法
public class Test {
public static void main(String[] args) {
//炒手撕包菜
AbstractClass baoCai = new ConcreteClass_BaoCai();
baoCai.cookProcess();
//炒蒜蓉菜心
AbstractClass caiXin = new ConcreteClass_CaiXin();
caiXin.cookProcess();
}
}
可重入锁的lock方法
点进可重入锁的lock方法,
抽象静态的内部类,继承了AQS
点击sync实现的lock方法,这里进入的是非公平锁的实现(默认就是非公平锁)
非公平锁和公平锁的区别:AQS会维护一个同步队列,让竞争锁的线程进行等待,公平锁遵循一个FIFO的原则,而非公平锁是让队头的线程和刚来的线程(即还没有进入队列的线程)竞争。
里面使用了三个方法,进去看,发现都是AQS中的方法
最后是调用的是UNSAFE的本地方法,所以AQS是实现可重入锁的关键。
AQS核心原理
如果请求的资源也就是锁是空闲的,那么线程就通过CAS把锁设置为占用状态,后续线程过来之后,一样通过CAS的方式,尝试获取锁,如果获取失败,会添加到一个阻塞队列中去,过程会有多次获取锁的动作,如果一直获取不到,就会阻塞线程,最后等待被唤醒或者发生线程中断现象,然后继续尝试获取锁。
AQS有两个关键的属性,一个state(状态量),用来记录锁的状态的;另一个是Node,用来存放获取不到锁的线程的,Node还维护了一个前指针和一个后指针,还有一个等待状态的属性。
state的值对应锁的什么状态,在不同的实现子类的定义是不一样的,拿可重入锁举例:
- 当
state
的状态量为0时,表示当前锁没有被线程持有,可以被其他线程获取。 - 当
state
的状态量为1时,表示当前锁已被某个线程持有。这种情况下,如果同一个线程再次获取该锁,则属于可重入操作,即可重入计数会递增,代表该线程对锁的占有次数。 - 当
state
的状态量大于1时,表示当前锁已被某个线程持有,并且该线程多次重入了该锁。state
的具体值代表当前线程对锁的占有次数。
可重入锁允许同一个线程多次获取锁,每次获取锁后都会增加锁的重入计数,直到锁的重入计数为0时,其他线程才能获得该锁。这种机制确保了线程在释放锁之前必须重复地获取它,从而保证了线程对锁的正确使用和释放。state
的值用于记录锁被重入的次数。
线程获取锁的时候,其实是用CAS对state变量进行修改,如果预期为0,就修改为1,表示已经获取成功了,本线程继续获取锁的时候就++,所以叫做可重入。其他线程过来获取锁的时候,CAS就会获取失败。当线程释放锁后,会唤醒最早到的阻塞的线程,并弹出队头。
对于Node,这种前后指针其实就是一个双向链表,但是使用的时候把他封装为队列的形式,他保存的是获取不到锁的线程。
Node
node中刚才说了,有一个等待的状态、两个指针,和一个等待的线程。对于等待的状态是如何定义的呢?
- Cancelled=1:表示当前任务需要取消,表示线程可能不等了,或者中断了,直接结束。
- Signal=-1:表示当前的线程,等会执行完,需要去唤醒下一个线程。
- init:0:初始状态,不需要干什么
- Condition=-2:条件队列
- Propagate=-3:共享锁
tryAcquire
解释:尝试获取锁的方法
当锁已经被抢了,那么当前线程就会走到下面的acquire逻辑。
tryAcquire的作用就是我们一开始所说的,state的值代表的锁的状态对于不同的子类实现的含义是不一样的,所以需要延迟到子类去实现。
点进去,作为Abstract抽象类,为什么AQS不把他设置为抽象方法呢,而是要直接去抛出不支持的异常呢?其实我们继续往下看,里面AQS还有一些别的方法
这些是共享锁的实现,而对于可重入锁这种独占锁,他不需要实现这两个方法,所以AQS不把他设置为abstract的目的就是让子类去自定义实现方法,而不需要硬性规定,如果调用了不支持的操作,直接抛出异常,快速失败。
这里我们直接去找到子类重写的tryAcquire,因为是当前线程,所以无并发,直接++即可。
enq
接下来我们再看一个重要的方法,enq,也就是线程获取不到锁后入队的一个实现。
主要也是通过CAS的操作,对node结点进行一个队尾插入的操作。
acquireQueued
当结点入队之后,如果是队头的结点,就先去尝试获取一下锁(而不是直接变为阻塞),如果获取不到,就需要去改变前继结点的状态,比如说改为signal,表示前面的结点释放锁之后需要通知一下自己,设置完成之后,就把线程进行阻塞。
其中shouldParkAfterFailedAcquire方法是用来设置等待状态,因为如果你线程阻塞了,而你前面的一个结点的等待状态不是signal,也就是表示前面一个线程把锁释放后,不会通知当前阻塞的线程。所以这个方法是用来设置前驱结点的等待状态为signal,然后重新获取一次锁,如果获取不到,最后才调用parkAndCheckInterrupt进行阻塞。
unlock
一样,都是先尝试释放锁。
首先判断释放锁的是不是当前获取锁的线程,如果不是,那怎么可能可以释放锁,free的标志是完全释放了锁,因为可重入锁,他的state标志可以是大于1的,表示当前线程获取不止一次锁,所以只有当线程释放了同等数量的锁后,才会标记为真正完全地释放了锁。
释放锁成功后,判断同步队列里面是否还有别的线程,然后线程是否是阻塞的状态(从上面的入队的代码可以得知,入队不代表就是阻塞,入队后还会尝试获取锁,如果获取不到,会设置状态,最后才会阻塞),如果都是,才会去unpark唤醒阻塞的线程。
总体流程
调用可重入锁的lock的方法的时候,会先CAS尝试获取一下锁,如果获取成功,直接运行后继代码;如果获取失败了,就进入到准备入队的方法中,进入入队方法后,还是一样,尝试先获取锁,如果还是获取获取不到,就通过尾插法添加进同步的队列中,然后继续尝试获取一下锁,如果还是获取不到,那么就需要去阻塞线程,阻塞线程前需要设置前驱结点为signal,然后再次循环阻塞线程,阻塞线程直到被唤醒或者发生了中断,如果发生中断,线程就醒过来,然后尝试获取一下锁,如果获取不到,继续阻塞。
当调用unlock的时候,先判断是否是获取到锁的线程,然后CAS尝试释放一下锁,释放成功的话,就会去判断是否需要去唤醒线程,需要唤醒的条件是,队列里面有结点,并且结点还要是阻塞的状态。