countdownlatch的await方法的源码阅读之doAcquireSharedInterruptibly(int arg)

来源:6-2 J.U.C之AQS-CountDownLatch

mongo_m

2018-08-28

 老师,能否稍微讲解下countDownLatch类的await方法的实现当中调用的doAcquireSharedInterruptibly(int arg)方法的执行流程?我看了之后完全不明白每个语句的作用。谢谢老师!辛苦了。


写回答

1回答

Jimin

2018-08-28

你好,我加了些注解,也算是CountDownLatch的核心,你可以结合代码细看一下:


public void await() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

}

//AQS里面定义的方法,CountDownLatch使用CLH队列的时候,它放进去的Node类型是Shared类型

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {   

    if (Thread.interrupted())

        throw new InterruptedException();

    // tryAcquireShared 返回-1表示失败,如果失败,就要将线程“阻塞”,一种是通过死循环自旋,另一种是调用LockSupport的park方法将线程阻塞

    if (tryAcquireShared(arg) < 0)

        doAcquireSharedInterruptibly(arg);

}

protected int tryAcquireShared(int acquires) {

    //state=0,说明已经有线程把这个CountDownLatch的计数器操作为0

    return (getState() == 0) ? 1 : -1;

}

// countDown也会用到

protected boolean tryReleaseShared(int releases) {

    // 把拿到的state属性减1(如果大于0),如果state为0,返回true

    for (;;) {

        int c = getState();

        if (c == 0)

            return false;

        int nextc = c-1;

        // 死循环的意义就在这里,如果CAS设置新的state失败,说明线程之间的竞争很激烈,它会不断去尝试直至成功

        if (compareAndSetState(c, nextc))

            return nextc == 0;

    }

}   

private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        for (;;) {

            final Node p = node.predecessor();

            if (p == head) {

                // 只有CLH队头的线程才会跳出这个方法

                int r = tryAcquireShared(arg);

                if (r >= 0) {

                    // 计数器的值变为0了

                    setHeadAndPropagate(node, r);

                    p.next = null; // help GC

                    failed = false;

                    return;

                }

            }

            // state还不是0,就会走到这里,第一次的时候,waitStatus是0,那么node的waitStatus就会被置为SIGNAL

            // 所以第二次走到这里的时候,parkAndCheckInterrupt方法就会被执行

            // LockSupport的park方法就会把当前线程阻塞住。

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                throw new InterruptedException();

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

0
5
Jimin
回复
mongo_m
其实本质上没有什么特别,只是在实际操作中附加了一些线程的操作,显得有些不一样。AQS里面的CLH队列是CLH同步锁的一种变形。其主要从两方面进行了改造:节点的结构与节点等待机制。在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,并且每个节点都引入前驱节点和后后续节点的引用;在等待机制上由原来的自旋改成阻塞唤醒. 关于CLH锁,现在提的少了一些,可以参考一下:https://www.cnblogs.com/duanxz/p/6244045.html
2018-09-01
共5条回复

Java高并发编程,构建并发知识体系,提升面试成功率

构建完整并发与高并发知识体系,倍增高薪面试成功率!

3923 学习 · 832 问题

查看课程