ReentrantLock源码分析

首先可以先看看整体的流程,图片来自[Java3y]博主的图

008eGmZEly1gmput7lwi0j31600u07wh.jpg

加解锁过程如上,其中几个关键节点:

  1. 判断state是否为0表示当前的锁没有被占用
  2. 公平锁不会先去尝试CAS获取锁,非公平锁在入队之前就会直接尝试CAS获取
  3. 解锁过程找的是离头节点最近的小于0的节点(这里考虑的是某些节点在队列中就被cancel的情况)

下面直接从源码开始分析:

锁实现

首先看看ReentrantLock的类:一些关键点已经进行注释了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;//锁本质是单例的静态内部类

/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {//是父类,子类实现包含非公平和公平两种的静态内部类。
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {//非公平的获取锁,一次尝试
final Thread current = Thread.currentThread();
int c = getState();//首先拿到锁的标识位volite修饰的。
if (c == 0) {//如果是0表示没有被占用
if (compareAndSetState(0, acquires)) {//通过CAS方式set为1,
setExclusiveOwnerThread(current);//将占用的标志位标识为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果状态位不为0,就表示已经被占用了,这时去判断是否是当前的线程,如果是的话就可重入。
int nextc = c + acquires;//标识位++;
if (nextc < 0) // overflow(防止int型溢出为-)
throw new Error("Maximum lock count exceeded");
setState(nextc);//已经被锁住了,只有当前线程能够拿到,所以可以不用cas,直接set
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {//尝试释放锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果标志位(其实就是信号量为0),就说明当前的锁已经被释放了。
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

加锁

接下来看看加锁的过程,以非公平锁为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))//CAS修改锁的标识位
setExclusiveOwnerThread(Thread.currentThread());//给当前的线程分配锁
else
acquire(1);//底层是AQS的acquire,但实现类其实就是下面的tryAcquire
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

非公平锁和公平锁的区别点:非公平锁在lock的时候直接进行CAS尝试获取锁,公平锁这里不会进行,而是尝试入队,然后找队列头节点去尝试获取锁。

在看看入队操作是怎么进入的,首先就是尝试是否能获取到锁(可重入的判断),然后再进行入队操作addWaiter(Node.EXCLUSIVE:就是null).

  1. 这里其实主要分为两步,tryAcquire调用的是父类Sync里的nonfairTryAcquire,做两件事情,使用CAS尝试获取锁,如果获取不到就再看看是否是当前线程占用的锁,如果是就++,这里就是可重入的实现了。
  2. 如果第一步没有获取到锁,就acquireQueued(addWaiter(Node.EXCLUSIVE), arg)尝试入队。
1
2
3
4
5
6
//调用的方法,其实就是两个部分,首先tryAcquire,如果没有获取到就acquireQueued
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//尝试获取失败和入队成功后,将当前线程挂起。
}

入队操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//如果尾节点为空,就去初始化,不为空就加入
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//会进行队列的初始化,并将节点入队(注意这个是带头节点的双向链表)。
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//这里是返回当前节点的前一个节点,如果为空就抛出空指针异常退出循环
if (p == head && tryAcquire(arg)) {//如果是头节点就尝试获取锁(CAS方式)
setHead(node);//如果获取到锁就将头节点移除,下一个节点设置为head
p.next = null; // help GC
failed = false;//标志位,表示头节点已经被释放了,不需要进行cancelAcquire操作
return interrupted;
}
//shouldParkAfterFailedAcquire这个方法是将前置节点和当前节点传入,本质思想是将前置节点的waitStatus置为-1表示后面还有节点再等待,同时会向前循环移除掉取消的node(也就是waitStatus>0的节点)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//标识位表示前置节点已做了修改
}
} finally {
if (failed)
cancelAcquire(node);
}
}

自此,获取锁的操作就完成了。总结步骤:

  1. 直接尝试获取锁,成功了就直接返回,(CAS改变锁的状态为1,同时修改锁的当前排他线程为当前线程)(这里是非公平与公平的区别,公平锁不会直接尝试,会直接acquire,然后非公平锁的acquire当前线程不会在判断state为0的时候去尝试获取锁,而是去找队列的头节点,除非队列中节点为空才会直接获取)

  2. 获取不到就acquire去尝试(主要是为了检查是否是当前线程拥有的锁,可重入),拿不到就入队,入队的时候会排除取消的节点(wa

    itState大于0)然后再将入队节点的前置节点置为-1,表示后续有节点在等待,同时会循环到头节点,头节点acquire去尝试获取锁。

  3. 入队完成后调用selfInterpt将当前线程挂起。

解锁

ReentrantLock解锁的入口是继承自Lock接口,实现unlock方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

分为两步

  1. 尝试释放锁
  2. 修改队列中节点信息(就是将当前的节点的waitStatus置为0,然后从尾到头寻找第一个等待标志位小于0的节点,然后置unpark)
1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//修改标识位
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//将排他锁释放
}
setState(c);
return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//循环找
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

AQS

最后我们来看看AQS,ReentrantLock就是典型的AQS的机制。

AQS:AbstractQuenedSynchronizer抽象的队列式同步器,AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定。配套的是一套线程阻塞、等待以及被唤醒时锁分配的机制。AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。可以结合ReentrantLock来理解下。