AQS 是一个抽象类,它维护了一个共享变量 state 和一个线程等待队列,为 ReentrantLock 等类提供底层支持。
AQS 的思想是,如果被请求的共享资源处于空闲状态,则当前线程成功获取锁;否则,将当前线程加入到等待队列中,当其他线程释放锁时,从等待队列中挑选一个线程,把锁分配给它。
一、AQS原理
1、状态 state 由 volatile 变量修饰,用于保证多线程之间的可见性;
private volatile int state;
2、同步队列由内部定义的 Node 类实现,每个 Node 包含了等待状态、前后节点、线程的引用等,是一个先进先出的双向链表。
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
3、AQS 支持两种同步方式:
独占模式下:每次只能有一个线程持有锁,例如 ReentrantLock。
共享模式下:多个线程可以同时获取锁,例如 Semaphore 和 CountDownLatch。
4、核心方法包括:
acquire
:获取锁,失败进入等待队列;release
:释放锁,唤醒等待队列中的线程;acquireShared
:共享模式获取锁;releaseShared
:共享模式释放锁。
5、AQS 使用一个 CLH 队列来维护等待线程,CLH 是三个作者 Craig、Landin 和 Hagersten 的首字母缩写,是一种基于链表的自旋锁。
在 CLH 中,当一个线程尝试获取锁失败后,会被添加到队列的尾部并自旋,等待前一个节点的线程释放锁。
6、CLH 的优点是,假设有 100 个线程在等待锁,锁释放之后,只会通知队列中的第一个线程去竞争锁。避免同时唤醒大量线程,浪费 CPU 资源。
7、AQS 的设计是基于模板方法模式的,它有一些方法必须要子类去实现的,主要有:
isHeldExclusively()
:该线程是否正在独占资源。只有用到 condition 才需要去实现它。tryAcquire(int)
:独占方式。尝试获取资源,成功则返回 true,失败则返回 false。tryRelease(int)
:独占方式。尝试释放资源,成功则返回 true,失败则返回 false。tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
这些方法虽然都是protected
的,但是它们并没有在 AQS 具体实现,而是直接抛出异常:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 信号 Semaphore
只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法
二、实现类
1、独占模式ReentrantLock
ReentrantLock 是基于 AQS 实现的 可重入排他锁,使用 CAS 尝试获取锁,失败的话,会进入 CLH 阻塞队列,支持公平锁、非公平锁,可以中断、超时等待。
内部通过一个计数器 state 来跟踪锁的状态和持有次数。当线程调用 lock()
方法获取锁时,ReentrantLock 会检查 state 的值,如果为 0,通过 CAS 修改为 1,表示成功加锁。否则根据当前线程的公平性策略,加入到等待队列中。
线程首次获取锁时,state 值设为 1;如果同一个线程再次获取锁时,state 加 1;每释放一次锁,state 减 1。
当线程调用 unlock()
方法时,ReentrantLock 会将持有锁的 state 减 1,如果 state = 0
,则释放锁,并唤醒等待队列中的线程来竞争锁。
使用方式:
class CounterWithLock {
private int count = 0;
private final Lock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
public int getCount() {
return count;
}
}
new ReentrantLock()
默认创建的是非公平锁 NonfairSync。在非公平锁模式下,锁可能会授予刚刚请求它的线程,而不考虑等待时间。当切换到公平锁模式下,锁会授予等待时间最长的线程。
创建 ReentrantLock 的时候,传递参数 true 是公平锁。不传递参数或者传false是非公平锁
ReentrantLock lock = new ReentrantLock(true);
// true 代表公平锁,false 代表非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁和公平锁区别
公平锁意味着在多个线程竞争锁时,获取锁的顺序与线程请求锁的顺序相同,即先来先服务。
非公平锁不保证线程获取锁的顺序,当锁被释放时,任何请求锁的线程都有机会获取锁,而不是按照请求的顺序。
公平锁实现逻辑
公平锁的核心逻辑在 AQS 的 hasQueuedPredecessors()
方法中,该方法用于判断当前线程前面是否有等待的线程。
如果队列前面有等待线程,当前线程就不能抢占锁,必须按照队列顺序排队。如果队列前面没有线程,或者当前线程是队列头部的线程,就可以获取锁。
2、共享模式CountDownLatch
CountDownLatch 内部是一个继承了 AQS 的实现类 Sync,实现了acquireShared
:共享模式获取锁,releaseShared
:共享模式释放锁两个方法
需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且 CountDownLatch没有提供任何机制去重新设置这个计数值。
核心方法
// 构造方法:
public CountDownLatch(int count)
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count
案例
玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。模拟一下这个 demo:
public class CountDownLatchDemo {
// 定义前置任务线程
static class PreTaskThread implements Runnable {
private String task;
private CountDownLatch countDownLatch;
public PreTaskThread(String task, CountDownLatch countDownLatch) {
this.task = task;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(task + " - 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 假设有三个模块需要加载
CountDownLatch countDownLatch = new CountDownLatch(3);
// 主任务
new Thread(() -> {
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
countDownLatch.await();
System.out.println("数据加载完成,正式开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 前置任务
new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
}
}
输出:
等待数据加载...
还有 3 个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!
3、共享模式Semaphore
Semaphore 内部有一个继承了 AQS 的同步器 Sync,重写了tryAcquireShared
方法。在这个方法里,会去尝试获取资源。如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入 AQS 的等待队列。
可以在构造方法中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。
// 默认情况下使用非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
最主要的方法是 acquire 方法和 release 方法。acquire()
方法会申请一个 permit,而 release 方法会释放一个 permit。当然,也可以申请多个 acquire(int permits)
或者释放多个 release(int permits)
。
每次 acquire,permits 就会减少一个或者多个。如果减少到了 0,再有其他线程来 acquire,那就要阻塞这个线程直到有其它线程 release permit 为止。
案例
Semaphore 通常用于那些资源有明确访问数量限制的场景,常用于限流 ,去限制线程的数量 。
比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。
举个例子,我想限制同时只能有 3 个线程在工作:
public class SemaphoreDemo {
static class MyThread implements Runnable {
private int value;
private Semaphore semaphore;
public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获取permit
System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
value, semaphore.availablePermits(), semaphore.getQueueLength()));
// 睡眠随机时间,打乱释放顺序
Random random =new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("线程%d释放了资源", value));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release(); // 释放permit
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}
输出:
当前线程是1, 还剩1个资源,还有0个线程在等待
当前线程是2, 还剩0个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
线程0释放了资源
当前线程是5, 还剩0个资源,还有6个线程在等待
线程1释放了资源
当前线程是6, 还剩0个资源,还有5个线程在等待
线程6释放了资源
线程5释放了资源
当前线程是7, 还剩1个资源,还有4个线程在等待
当前线程是9, 还剩0个资源,还有3个线程在等待
线程9释放了资源
当前线程是8, 还剩0个资源,还有2个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是3, 还剩0个资源,还有0个线程在等待
线程7释放了资源
线程3释放了资源
线程4释放了资源
Semaphore 默认的 acquire 方法是会让线程进入等待队列,且抛出异常中断。但它还有一些方法可以忽略中断或不进入阻塞队列:
// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)
评论区