【AQS】Condition接口的实现

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


同步队列与条件队列

sync queue

2019102910024\_1.png

同步队列.png

同步队列是双向链表,使用prev和next来连接节点,nextWaiter属性只是一个标志作用,共享锁模式下使用。
入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

condition queue

2019102910024\_2.png

条件队列.png

条件队列是用nextWaiter连接节点的单链表。其waitStatus属性中只关注CONDITION,表示线程处于正常的等待状态。
入队时持有锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到sync queue

同步队列与条件队列的联系

2019102910024\_3.png

同步队列与条件队列的联系.png

当调用某个条件队列的signal方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,
如果没有抢到,则要被加到等待锁的sync queue中去,此时节点就从condition queue中被转移到sync queue中。

CondtionObject源码分析

核心属性

    /**
             * First node of condition queue.
             * 首哨兵
             */
            private transient Node firstWaiter;
            /**
             * Last node of condition queue.
             * 尾哨兵
             */
            private transient Node lastWaiter;

条件等待await()

    /**
             * Implements interruptible condition wait.
             * 实现可中断条件wait。
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * 如果当前线程被中断,抛出InterruptedException。
             * <li> Save lock state returned by {@link #getState}.
             * 保存{@link #getState}返回的锁状态。
             * <li> Invoke {@link #release} with saved state as argument,
             * throwing IllegalMonitorStateException if it fails.
             * 使用保存的状态作为参数调用{@link #release},如果失败则抛出IllegalMonitorStateException。
             * <li> Block until signalled or interrupted.
             * 阻塞,直到发出信号或中断。
             * <li> Reacquire by invoking specialized version of
             * {@link #acquire} with saved state as argument.
             * 通过调用{@link # acquisition}的专门化版本并将保存的状态作为参数重新获取。
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * 如果在步骤4中阻塞时中断,则抛出InterruptedException。
             * </ol>
             */
            public final void await() throws InterruptedException {
                //线程中断,抛出中断异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //在等待队列中添加一个新的waiter。
                Node node = addConditionWaiter();
                //释放锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                //节点不在等待队列中
                while (!isOnSyncQueue(node)) {
                    //阻塞
                    LockSupport.park(this);
                    //检查中断
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //重新获取锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //如果下一个节点不为null
                if (node.nextWaiter != null) // clean up if cancelled
                    //从条件队列中取消已取消的服务员节点的链接
                    unlinkCancelledWaiters();
                //发生中断
                if (interruptMode != 0)
                    //抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
                    reportInterruptAfterWait(interruptMode);
            }

await方法中有几个重要的方法:addConditionWaiter(),fullyRelease(node),isOnSyncQueue(node),checkInterruptWhileWaiting(node),acquireQueued(node, savedState),reportInterruptAfterWait(interruptMode)

addConditionWaiter()

主要就是一些链表操作,将取消等待的节点去掉,新增等待节点在队尾。

      /**
             * Adds a new waiter to wait queue.
             * 在等待队列中添加一个新的waiter。
             *
             * @return its new wait node 它的新等待节点
             */
            private Node addConditionWaiter() {
                //保存条件队列的最后一个节点。
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                //如果lastWaiter被取消,请清理。
                if (t != null && t.waitStatus != Node.CONDITION) {
                    //从条件队列中取消已取消的Waiter节点的链接
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //新节点
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                //如果当前等待队列中没有节点,则node为新的首哨兵
                if (t == null)
                    firstWaiter = node;
                else
                    //指针连接
                    t.nextWaiter = node;
                //尾哨兵移动到node
                lastWaiter = node;
                return node;
            }
     /**
             * Unlinks cancelled waiter nodes from condition queue.
             * Called only while holding lock. This is called when
             * cancellation occurred during condition wait, and upon
             * insertion of a new waiter when lastWaiter is seen to have
             * been cancelled. This method is needed to avoid garbage
             * retention in the absence of signals. So even though it may
             * require a full traversal, it comes into play only when
             * timeouts or cancellations occur in the absence of
             * signals. It traverses all nodes rather than stopping at a
             * particular target to unlink all pointers to garbage nodes
             * without requiring many re-traversals during cancellation
             * storms.
             * 从条件队列中取消已取消的服务员节点的链接。仅在持锁时调用。
             * 当在条件等待期间发生取消时调用此函数,当看到lastWaiter已被取消时调用新waiter。
             * 在没有信号的情况下,需要使用这种方法来避免垃圾保留。
             * 因此,尽管它可能需要一个完整的遍历,但只有在没有信号的情况下超时或取消才会起作用。
             * 它遍历所有节点,而不是停在一个特定的目标上,以断开到垃圾节点的所有指针的链接
             * ,而不需要在取消风暴期间进行多次重新遍历。
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                //迭代等待队列,只要ws不为CONDITION,就移除节点
                while (t != null) {
                    //保存当前节点的下一个节点
                    Node next = t.nextWaiter;
                    //ws不为CONDITION,准备移除此节点
                    if (t.waitStatus != Node.CONDITION) {
                        //当前节点的后继置空,GC
                        t.nextWaiter = null;
                        //最开始,trail为null,首哨兵指向当前节点的下一个节点
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        //如果已经没有后继节点
                        if (next == null)
                            //尾哨兵指向最后一个等待条件节点
                            lastWaiter = trail;
                    } else
                        //trail指向当前等待条件节点
                        trail = t;
                    //后移
                    t = next;
                }
            }
fullyRelease(node)

释放锁,只有获取了锁的线程,才能释放锁,即await()方法必须在获取锁之后才能使用。
由源码可知,不管重入了多少次锁,一次性全部释放。

      /**
         * Invokes release with current state value; returns saved state.
         * Cancels node and throws exception on failure.
         * 使用当前状态值调用release;返回保存的状态。取消节点并在失败时抛出异常。
         *
         * @param node the condition node for this wait
         * @return previous sync state
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                //返回同步状态的当前值
                int savedState = getState();
                //释放锁
                if (release(savedState)) {
                    //锁释放成功
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    //锁释放失败的时候,将节点的ws设置为取消状态
                    node.waitStatus = Node.CANCELLED;
            }
        }
     /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         * 以独占模式释放,如果{@link #tryRelease}返回true,则通过解阻塞一个或多个线程来实现。
         * 此方法可用于实现方法{@link Lock#unlock}。
         *
         * @param arg the release argument.  This value is conveyed to
         *            {@link #tryRelease} but is otherwise uninterpreted and
         *            can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            //由子类实现
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //唤醒node的后继节点(如果存在的话)。
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
      /**
         * Wakes up node's successor, if one exists.
         * 唤醒node的后继节点(如果存在的话)。
         * 释放锁和取消获取锁时被调用
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             * 如果状态为负,清除预期信号,如果此操作失败或状态被等待线程更改,则没有问题。
             * CANCELLED = 1
             * 0
             * SIGNAL = -1
             * CONDITION = -2
             * PROPAGATE = -3
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);

            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             * Thread to unpark被保存在后续节点中,它通常只是下一个节点,
             * 但是,如果取消或明显为空,则从tail向后遍历以找到实际的非取消后继。
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从后向前遍历节点,最后s为离当前节点最近的需要被唤醒的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //唤醒操作
            if (s != null)
                LockSupport.unpark(s.thread);
        }
isOnSyncQueue(node)

判断当前节点是否在同步队列,如果不在同步队列就挂起当前线程。
如果isOnSyncQueue检测到当前节点不在sync queue中,则说明既没有发生中断,也没有发生过signal,
则当前线程是被“假唤醒”的,那么我们将再次进入循环体,将线程挂起。

     /**
         * Returns true if a node, always one that was initially placed on
         * a condition queue, is now waiting to reacquire on sync queue.
         * 如果一个节点(始终是最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。
         *
         * @param node the node
         * @return true if is reacquiring
         */
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             * 从后向前查找节点是否存在此同步队列
             */
            return findNodeFromTail(node);
        }
        /**
         * Returns true if node is on sync queue by searching backwards from tail.
         * Called only when needed by isOnSyncQueue.
         *
         * @return true if present
         */
        private boolean findNodeFromTail(Node node) {
            //从后向前遍历
            Node t = tail;
            for (; ; ) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
acquireQueued(node, savedState)

正常的获取锁,以前分析过。

     /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         * 获取队列中已存在线程的独占不可中断模式。
         * 用于条件等待方法以及获取。
         * 能走到这一步,那么这个等待锁的线程所封装的节点一定在等待队列中
         *
         * @param node the node
         * @param arg  the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                //循环,最终节点会获取到锁
                for (; ; ) {
                    //获取节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是头节点,那么就尝试一次获取锁
                    if (p == head && tryAcquire(arg)) {
                        //获取锁成功,当前节点变成了头节点,节点中的线程属性也清空。
                        setHead(node);
                        // help GC
                        p.next = null;
                        failed = false;
                        return interrupted;
                    }
                    //走到这,要么节点的前驱不是头节点,要么是获取锁失败了。
                    //如果前驱节点waitStatus为SIGNAL,挂起当前线程,并且检查中断
                    //如果前驱节点waitStatus不为SIGNAL,最终将其设置为SIGNAL
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        //在这之前,线程已经被挂起了,坐等解阻塞
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
checkInterruptWhileWaiting(node)和reportInterruptAfterWait(interruptMode)

这两个方法就是处理线程中断的逻辑了,interruptMode有三个值:

  • 0
    代表整个过程中一直没有中断发生。
  • THROW_IE
    表示退出await()方法时需要抛出InterruptedException,这种模式对应于中断发生在signal之前
  • REINTERRUPT
    表示退出await()方法时只需要再自我中断以下,这种模式对应于中断发生在signal之后,即中断来的太晚了。
    /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             * 检查是否有中断,如果在信号之前中断,返回THROW_IE;如果在信号之后中断,
             * 返回REINTERRUPT;如果没有中断,返回0。
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                        0;
            }
     /**
         * Transfers node, if necessary, to sync queue after a cancelled wait.
         * Returns true if thread was cancelled before being signalled.
         * 如果需要,在取消等待后传输节点来同步队列。如果线程在发出信号之前被取消,则返回true。
         *
         * @param node the node
         * @return true if cancelled before the node was signalled
         */
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             * 如果我们被signal()取代,那么我们就不能继续,直到它完成它的enq()。
             * 在一个不完整的转移过程中取消是罕见的,也是短暂的,所以只要旋转。
             */
            while (!isOnSyncQueue(node))
                //释放CUP
                Thread.yield();
            return false;
        }
      /**
             * Throws InterruptedException, reinterrupts current thread, or
             * does nothing, depending on mode.
             * 抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
             */
            private void reportInterruptAfterWait(int interruptMode)
                    throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }

唤醒signal()

只有获取锁的线程,才能执行这个方法。

     /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             * 将等待时间最长的线程(如果存在的话)从该条件的等待队列移动到拥有锁的等待队列。
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *                                      returns {@code false}
             */
            public final void signal() {
                //如果仅针对当前(调用)线程保持同步,则返回true。
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                //等待时间最长的线程,队首
                Node first = firstWaiter;
                if (first != null)
                    // 将节点从条件队列传输到同步队列,唤醒
                    doSignal(first);
            }

最重要的就是doSignal(first)方法。

      /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * 删除和传输节点,直到到达不可取消的或null。
             * 从signal中分离出来,部分原因是为了鼓励编译器在没有等待器的情况下内联。
             *
             * @param first (non-null) the first node on condition queue 条件队列上的第一个节点
             */
            private void doSignal(Node first) {
                //循环,直到将节点从条件队列传输到同步队列返回false 和 条件队列上的第一个节点不为空。
                do {
                    //向后移动一次首哨兵,如果当前首哨兵所在节点为空,则将尾哨兵设为空
                    if ((firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    //断开第一个节点与后继节点之间的关系
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                        (first = firstWaiter) != null);
            }

ransferForSignal(Node node) 方法,就是将节点从调节队列转移到同步队列。


        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * 将节点从条件队列传输到同步队列。如果成功返回true。
         *
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             * 如果无法更改等待状态,则节点已被取消。
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             * 将Splice连接到队列,并尝试设置前辈的等待状态,以指示线程(可能)正在等待。、
             * 如果取消或尝试设置等待状态失败,则唤醒并重新同步
             * (在这种情况下,等待状态可能是暂时错误的,并且不会造成任何危害)。
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            //节点的CAS等待状态字段。
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //解阻塞
                LockSupport.unpark(node.thread);
            return true;
        }

来源:https://www.jianshu.com/p/347ea7f881f8

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 【AQS】Condition接口的实现

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏