ReentrantLock 特征
特点:
1.可重入
2.公平/非公平
3.可中断
4.支持条件等待
5.可设置锁超时
常用 API
使用例子:
public class ReentrantLockTest {
static ReentrantLock lock = new ReentrantLock(true);
static class ClientThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread() + \"开始尝试获取锁\");
lock.lock();
try {
System.out.println(Thread.currentThread() + \"成功获取锁\");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread() + \"完成释放锁\");
}
}
}
public static void main(String[] args) throws InterruptedException {
ClientThread t1 = new ClientThread();
ClientThread t2 = new ClientThread();
ClientThread t3 = new ClientThread();
t1.start();
t2.start();
t3.start();
TimeUnit.SECONDS.sleep(10);
}
}
源码分析
获取锁
如果我使用下面的代码进行获取就行:
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
ReentrantLock 默认调用的就是非公平锁 调用栈: java.util.concurrent.locks.ReentrantLock#lock
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() { // 直接尝试加锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 如果获取锁失败进入 AQS acquire 逻辑 acquire(1); }
如果 compareAndSetState(0, 1)能够直接执行成功,那么将直接结束方法的执行。 如果失败,那么就会调用acquire 方法如下:
public final void acquire(int arg) { // tryAcquire(arg) 尝试获取锁 // acquireQueued 获取锁失败进行等待队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们先看 tryAcquire方法: java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
- java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑 里面有两个逻辑:
- 如果当前状态无锁,直接尝试加锁,加锁成功返回 true
- 如果当前时锁重入,那么直接修改 AQS 状态共享变量值 state 等于 c + acquires, 加锁成功返回 ture
如果都不满足,那么返回加锁失败返回 false
// 非公平锁的逻辑 // 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行 // 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 无锁状态, 尝试竞争 if (c == 0) { if (compareAndSetState(0, acquires)) { //是否获取到锁 setExclusiveOwnerThread(current); return true; } } // 当前线程持有锁, state 计数 +1 else if (current == getExclusiveOwnerThread()) { //判断是否是重入 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error(\"Maximum lock count exceeded\"); setState(nextc); return true; } return false; }
如果 tryAcquire 调用完成后是获取锁成功 acquire方法执行结束,最后代表 lock 方法执行结束。
获取锁失败进入同步队列
如果获取锁失败,那么就会执行 acquire代码后面段 if 逻辑的执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这里其实可以分为两个方法来看
- addWaiter(Node.EXCLUSIVE)
- acquireQueued(xxx, arg)
按照执行顺序,我们先看 addWaiter(Node.EXCLUSIVE) 这里主要是入队的逻辑。 addWaiter: java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
// 将当前线程转换为 AQS Node 节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试直接加入到尾节点
Node pred = tail;
if (pred != null) {
// 当前节点的前驱节点指向尾节点
node.prev = pred;
// cas 修改 tail 节点,如果成功返回 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果失败,调用 enq
enq(node);
return node;
}
enq是将当前节点插入队列,必要的时候会进行初始化
//将节点插入队列,必要时进行初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果没有尾节点,那么需要进行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
// 如果有尾节点/其实就是有头节点/已经被初始化,通过 CAS 入队
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
前面我们看看完了,当前获取锁的线程当获取锁失败的时候,成功进入 AQS 队列,接下来我们继续看 acquireQueued又做了什么呢?
- 如果是队列头节点,会再次尝试获取锁
如果修改 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node状态位
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 是否中断 boolean interrupted = false; for (;;) { // 获取 node 的前驱节点 final Node p = node.predecessor(); // 如果是头节点,再次尝试获取锁 if (p == head && tryAcquire(arg)) { // 将 node 设置为 头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否需要进行阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && // 阻塞线程 parkAndCheckInterrupt()) interrupted = true; } } finally { // 是否失败 if (failed) // 如果失败,取消获取锁 cancelAcquire(node); } }
上面我们可以看到,for (;;)中有两个判断
- 如果是头节点,就调用tryAcquire尝试获取锁 (之前我们已经分析过 tryAcquire 了,我们主要看后面个 if )
- 如果不是就进入 shouldParkAfterFailedAcquire 方法
在调用 acquireQueued这个过程中可能调用多次 shouldParkAfterFailedAcquire 方法。shouldParkAfterFailedAcquire 会执行一下几个操作。
- 可以用来修改当前节点的状态,
和对链表上无效的节点出队
/** * 当获取锁失败后, 检查更新新节点状态如果是需要阻塞返回, true *
* 一个前继节点 waitStatus = 0, 第一次将继续设置为 SIGNAL, 告诉当前线程准备进入阻塞, 此时依旧获取不到, 当前线程进入阻塞 * * @param pred 前继节点 * @param node 当前节点 * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 一定是 0 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { // 出队, 剔除无效的节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 第一次进来, pred.waitStatus = 0 执行这个分支 // 将前继节点的状态修改为 SIGNAL, 表示 pred.next 节点需要被唤醒(此时准备进入阻塞, 但是还未被阻塞, 再次获取锁失败之后才会被阻塞) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
当 Node 被修改 Node.SIGNAL状态后,第一个 if 返回 true , 我们再次回到 acquireQueued 方法,就会执行 parkAndCheckInterrupt 方法,就是将当前的线程 park 然后返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() { // 阻塞线程 LockSupport.park(this); // 返回线程中断状态 return Thread.interrupted(); }
注意:这里线程 park 过后,其实获取锁就结束了前半段的操作,完成同步队列的入队,并且进入等待。我们就需要等待解锁唤醒。
释放锁
释放锁的代码如下:
lock.unlock();
释放锁做了什么呢?
- 释放当前锁的状态
- 在 AQS 队列中去唤醒排队的头节点
调用栈如下: java.util.concurrent.locks.ReentrantLock#unlock
- java.util.concurrent.locks.AbstractQueuedSynchronizer#release
我们可以从 release方法开始
// 解锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判断是否有需要唤醒的线程
if (h != null && h.waitStatus != 0) //waitStatus 的值为 0, 只有当后继存在节点才会被设置为该值不为 0, 此时需要唤醒后继线程
unparkSuccessor(h);
return true;
}
return false;
}
释放锁,主要是调用 tryRelease, 首先就是考虑之前的重入问题,直接对 state 进行 -1 ,然后如果 c == 0表示当前线程不再持有锁,我们就可以修改 ownerThread == null . 这个时候,最后修改 state 为新值。
// tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判断是否是当前线程持有锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果 state == 0 表示当前线程不在占有该锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁成功后,再次回到 release方法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。 主要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
// 唤醒队列中的线程
private void unparkSuccessor(Node node) {
// 将当前节点状态修改为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 反向查找
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒队列中的节点
LockSupport.unpark(s.thread);
}
其实这里最关键的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,依然在 acquireQueued里面。
释放锁后唤醒等待节点
当前节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 方法中出队,然后尝试加锁如果加锁成功就返回 true.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 出队的逻辑
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don\'t park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
再次竞争锁,主要是在acquireQueued方法中调用 tryAcquire方法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁成功返回。
测试和实践
支持锁中断
如果通过 lock_.lockInterruptibly(); 方式加锁,如果当前线程出现中断过后,会抛出 _java.lang.InterruptedException线程中断异常,所以 ReentrantLock支持可中断。 相关源码:
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// LockSupport.park 会清除中断信号
LockSupport.park(this);
return Thread.interrupted();
}
//
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
实验代码:
public class ReentrantLockTest {
static ReentrantLock lock = new ReentrantLock(true);
static class ClientThread implements Runnable {
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread() + \"开始尝试获取锁\");
lock.lockInterruptibly();
try {
System.out.println(Thread.currentThread() + \"成功获取锁\");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread() + \"完成释放锁\");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new ClientThread(), \"t1\");
Thread t2 = new Thread(new ClientThread(), \"t2\");
Thread t3 = new Thread(new ClientThread(), \"t3\");
t1.start();
t2.start();
// 锁中断
//lock.lockInterruptibly();
TimeUnit.SECONDS.sleep(1);
t3.start();
TimeUnit.SECONDS.sleep(1);
t3.interrupt();
TimeUnit.SECONDS.sleep(10);
}
}
输出结果:
Thread[t1,5,main]开始尝试获取锁
Thread[t2,5,main]开始尝试获取锁
Thread[t1,5,main]成功获取锁
Thread[t3,5,main]开始尝试获取锁
Exception in thread \"t3\" java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at io.zhengsh.juc._1lock.reentrantlock.ReentrantLockTest$ClientThread.run(ReentrantLockTest.java:18)
at java.lang.Thread.run(Thread.java:748)
Thread[t1,5,main]完成释放锁
Thread[t2,5,main]成功获取锁
Thread[t2,5,main]完成释放锁
获取锁设置超时
lock.tryLock(2, TimeUnit.SECONDS)可以支持设置获取锁的超时时间,可以有效的避免线程饥饿问题 测试代码:
public class ReentrantLockTryTest {
static ReentrantLock lock = new ReentrantLock(true);
static class ClientThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread() + \"\\t\" + (System.currentTimeMillis() / 1000) + \"\\t开始尝试获取锁\");
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread() + \"\\t\" + (System.currentTimeMillis() / 1000) + \"\\t获取锁成功\");
TimeUnit.SECONDS.sleep(5);
} else {
System.out.println(Thread.currentThread() + \"\\t\" + (System.currentTimeMillis() / 1000) + \"\\t获取锁失败\");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread() && lock.isLocked()) {
lock.unlock();
System.out.println(Thread.currentThread() + \"\\t\" + (System.currentTimeMillis() / 1000) + \"\\t完成释放锁\");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new ClientThread(), \"t1\");
Thread t2 = new Thread(new ClientThread(), \"t2\");
Thread t3 = new Thread(new ClientThread(), \"t3\");
t1.start();
t2.start();
t3.start();
//t1.interrupt();
TimeUnit.SECONDS.sleep(20);
}
}
输出结果
Thread[t1,5,main] 1653540581 开始尝试获取锁
Thread[t3,5,main] 1653540581 开始尝试获取锁
Thread[t2,5,main] 1653540581 开始尝试获取锁
Thread[t1,5,main] 1653540581 获取锁成功
Thread[t3,5,main] 1653540583 获取锁失败
Thread[t2,5,main] 1653540583 获取锁失败
Thread[t1,5,main] 1653540586 完成释放锁
条件等待队列使用
Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。 Condition 是个接口,基本的方法就是 await() 和 signal() 方法; Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition() 调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才可以使用:
- Conditon中的await()对应Object的wait();
- Condition中的signal()对应Object的notify();
- Condition中的signalAll()对应Object的notifyAll()。
测试场景: 下面一个场景,需要ABC3个线程,A线程打印1次,然后是B线程打印2次,再是C线程打印3次,线程交替打印。 ABC线程需要交替执行,我们需要控制,线程的执行先后顺序 我们可以使用多条件Condition来控制,每一个线程拥有一个condition对象,调用各种的await方法,可以使线程等待,然后让别的线程调用这个condition对象的signal方法,唤醒线程。 代码如下:
public class ReentrantLockConditionTest {
private int data = 1;
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (data != 1) {
condition1.await();
}
// 打印5次
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + \" ->\" + data);
}
data = 2;
// 通知B线程
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (data != 2) {
condition2.await();
}
// 打印10次
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + \" ->\" + data);
}
data = 3;
// 通知C
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (data != 3) {
condition3.await();
}
// 打印15次
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + \" ->\" + data);
}
data = 1;
// 通知A
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest();
// A,B,C 交替执行
new Thread(conditionTest::printA, \"A\").start();
new Thread(conditionTest::printB, \"B\").start();
new Thread(conditionTest::printC, \"C\").start();
}
}
输出结果如下:
A ->1
B ->2
B ->2
C ->3
C ->3
C ->3