CountDownLatch源码

这次来分析下CountDownLatch,CountDownLatch用于线程通信的一种方式,实现让一个或多个线程等待某些步骤完成之后再运行。也是比较典型的AQS的实现。

总览CountDownLatch方法

直接把源码全部拉下来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*/
public class CountDownLatch {
//静态内部类实现AQS模板
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//CAS实现的释放锁过程
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;//私有类,也就是具体的锁对象

//创建锁对象,传入参数表示需要等待的线程数。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

//关键方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//同样的await方法,多提供了
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}


public void countDown() {
sync.releaseShared(1);
}


public long getCount() {
return sync.getCount();
}


public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}

加锁的实现

看下关键方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//这里先判断当前线程是否被interrupted,底层是native方法
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//这里是核心,调用的是CountDownLatch实现的判断方法,Shared方式,也就是允许节点的nextWriter可写节点,允许state大于1,只有当state为0时返回1。
doAcquireSharedInterruptibly(arg);
}
//带时间的等待,不超过设置的最大等待时间,超过了就被唤起。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

下面看看doAcquireSharedInterruptibly的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//从末尾节点找到头节点,如果尝试获取锁是大于0(这里shared返回的只能是1或-1)的表明,如果是头节点再去尝试获取锁并释放共享相关的节点。
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//获取锁失败,将对应节点挂起,park在parkAndCheckInterrupt方法里,后续被唤起也是从这里开始。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

解锁的实现

解锁的过程,看看countdown的实现:

1
2
3
4
5
6
7
8
9
10
11
public void countDown() {
sync.releaseShared(1);
}
//这里的tryReleaseShared也是一样,共享锁的state--,只有state等于0的时候返回true
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

下面具体释放锁的流程:逻辑就是从队列头开始,找标志位位-1(表示后面还有节点需要释放,置为0)、标志位为0的节点(等待唤起的节点,置为-3)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//unpark下一个节点(ws<=0),这里进来的是head节点,shared情况下头节点一般是空节点。
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果发现没有符合的释放了,就直接退出循环
if (h == head) // loop if head changed
break;
}
}

总结释放过程:

首先将count–,然后判断count是否是0,如果是0就去尝试释放(对于被countdownLatch给awit的节点,会被入队,然后-1表示后续还有共享节点(需要注意有头节点,ws就是Node.SIGNAL也就是-1,也就是整个阻塞队列的ws都在头节点来标识),0表示处理可以释放的节点,也就是队列中会是-1、-1、0来表示3个被阻塞的线程),释放从头开始,找-1、0。

至于为什么需要置为-3,可以看看注释的解释:

1
2
3
4
5
6
7
8
9
10
11
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/

tips:park的底层

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);//被当前线程阻碍
UNSAFE.park(false, 0L);//这里被park住,unpark后继续
setBlocker(t, null);//释放障碍
}