【AQS】共享锁的获取

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


共享锁的获取与独占锁很像,他们的区别就是当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功。
而共享锁再获取锁之后,其他线程就可以去争锁,所以会有一个锁被多个线程持有的状态。

即:独占锁只有释放锁之后唤醒同步队列后面的节点,共享锁在获取到锁和释放锁之后都会唤醒同步队列后面的节点。

AQS中共享锁的获取逻辑代码:

    /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         * 以共享模式获取,忽略中断。
         * 首先调用至少一次{@link # tryAcquireShared},成功后返回。
         * 否则,线程将排队,可能会反复阻塞和解除阻塞,调用{@link
         * # tryAcquireShared},直到成功。
         *
         * @param arg the acquire argument.  This value is conveyed to
         *            {@link #tryAcquireShared} but is otherwise uninterpreted
         *            and can represent anything you like.
         */
        public final void acquireShared(int arg) {
            //尝试获取共享锁,由子类实现
            if (tryAcquireShared(arg) < 0)
                //通过循环去获取共享锁
                doAcquireShared(arg);
        }

可知只有两个方法:

tryAcquireShared(arg)

这个方法由子类实现,CountDownLatch的子类Sync与Semaphore中的子类Sync均实现了这个方法:
以CountDownLatch中的为例:

       protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

很简单,就是状态判断。

doAcquireShared(arg)

由AQS实现,Node节点种的nextWaiter属性不为空。这个方法很复杂:

     /**
         * Acquires in shared uninterruptible mode.
         * 以共享不可中断模式获取。
         *
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            //共享模式节点加入到同步队列尾部
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (; ; ) {
                    //获取前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是头节点
                    if (p == head) {
                        //获取锁
                        int r = tryAcquireShared(arg);
                        //大于等于0为获取锁成功
                        if (r >= 0) {
                            //设置头节点,并释放锁
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                //标志线程中断
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //如果获取锁失败,阻塞当前线程,并检查中断
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                //取消节点,为了响应式中断设计
                if (failed)
                    cancelAcquire(node);
            }
        }
      /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node      the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            //设置头节点
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            //节点没有取消,也不是初始化状态,同步队列中还有等待的线程
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                //如果节点是共享模式,执行释放锁
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
     /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         * 共享模式释放锁
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (; ; ) {
                //获取头节点
                Node h = head;
                //队列初始化了,且队列中有等待线程节点,即最起码有两个节点
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    //如果头节点的状态时SIGNAL,则需要唤醒其后继节点
                    if (ws == Node.SIGNAL) {
                        //设置节点状态为0,失败则从新开始循环
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        //设置成功后,唤醒后继节点
                        unparkSuccessor(h);
                    } else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        //将头节点状态设置为PROPAGATE
                        continue;                // loop on failed CAS
                }
                //如果头节点改变了,继续循环
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获得前驱节点的ws
            int ws = pred.waitStatus;
            //如果前驱节点是SIGNAL,返回true,就会执行挂起当前线程操作
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            //CANCELLED=1,所以如果ws>0,表示前驱节点已经释放锁了
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    //1,前驱节点指向前驱节点的前驱
                    //2,当前节点的前驱指向前驱节点
                    // 即跳过取消了等待锁的前驱节点
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                //新前驱节点的后继指向当前节点
                pred.next = node;
            } else {
                //这种情况就直接将前驱节点的ws设置为SIGNAL
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
      /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            //线程被挂起了,不会向下执行了,等待被唤醒
            LockSupport.park(this);
            return Thread.interrupted();
        }

从分析源码中可以看出,共享锁的释放依旧是状态的修改+循环CAS+队列。


来源:https://www.jianshu.com/p/347ea7f881f8

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 【AQS】共享锁的获取

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏