【锁】Condition接口分析

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


Condition是用来代替传统Object中的wait()和notify()实现线程间的协作,Condition的await()和signal()用于处理线程间协作更加安全与高效,JAVA中的阻塞队列就是用Condision实现。Condition的使用必须在锁定与解锁直接使用,且只能通过lock.newCondition()获取。

使用例子:

假设有一个有界的缓冲区,它支持put和take方法。如果在空缓冲区上尝试take,那么线程将阻塞,直到缓冲区中有数据可用为止;如果在一个数据已满的缓冲区上尝试put,那么线程将阻塞,直到空间可用为止。我们希望在单独的等待集中继续等待put线程和take线程,这样我们就可以在缓冲区中的数据或空间可用时只通知单个线程。这可以使用两个Condition实例来实现。

    //有界缓冲
     class BoundedBuffer {
        //重入锁
       final Lock lock = new ReentrantLock();
       //未满
       final Condition notFull  = lock.newCondition(); 
       //未空
       final Condition notEmpty = lock.newCondition(); 

       //缓冲数组
       final Object[] items = new Object[100];
       int putptr, takeptr, count;

       //插入
       public void put(Object x) throws InterruptedException {
         //锁定
         lock.lock();
         try {
           //当数组已满,其他插入线程等待,阻塞
           while (count == items.length)
             notFull.await();
           //插入
           items[putptr] = x;
           if (++putptr == items.length) putptr = 0;
           ++count;
           唤醒获取线程
           notEmpty.signal();
         } finally {
           //解锁
           lock.unlock();
         }
       }
       //获取
       public Object take() throws InterruptedException {
         //锁定
         lock.lock();
         try {
           //缓冲区没有值,等待
           while (count == 0)
             notEmpty.await();
           Object x = items[takeptr];
           if (++takeptr == items.length) takeptr = 0;
           --count;
           //唤醒插入线程
           notFull.signal();
           return x;
         } finally {
           //解锁
           lock.unlock();
         }
       }
     }

AbstractQueuedSynchronizer中的ConditionObject实现了Condition接口,而AQS是实现Lock的基石,
一般锁的内部都有同步器,即锁的内部都会有Condision的实现,所以才会用lock.newCondition()获取Condision。

await()源码翻译:

            /**
             * Implements interruptible condition wait.
             * 实现可中断条件wait。
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * 如果当前线程被中断,抛出InterruptedException。
             * <li> Save lock state returned by {@link #getState}.
             * 保存{@link #getState}返回的锁状态。
             * <li> Invoke {@link #release} with saved state as argument,
             * throwing IllegalMonitorStateException if it fails.
             * 使用保存的状态作为参数调用{@link #release},如果失败则抛出IllegalMonitorStateException。
             * <li> Block until signalled or interrupted.
             * 阻塞,直到发出信号或中断。
             * <li> Reacquire by invoking specialized version of
             * {@link #acquire} with saved state as argument.
             * 通过调用{@link # acquisition}的专门化版本并将保存的状态作为参数重新获取。
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * 如果在步骤4中阻塞时中断,则抛出InterruptedException。
             * </ol>
             */
            public final void await() throws InterruptedException {
                //线程中断,抛出中断异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //在等待队列中添加一个新的waiter。
                Node node = addConditionWaiter();
                //释放节点(释放锁)
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                //节点不在等待队列中
                while (!isOnSyncQueue(node)) {
                    //阻塞
                    LockSupport.park(this);
                    //检查中断
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //重新获取锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //如果下一个节点不为null
                if (node.nextWaiter != null) // clean up if cancelled
                    //从条件队列中取消已取消的服务员节点的链接
                    unlinkCancelledWaiters();
                //发生中断
                if (interruptMode != 0)
                    //抛出InterruptedException,重新中断当前线程,或者什么都不做,这取决于模式。
                    reportInterruptAfterWait(interruptMode);
            }

signal()源码翻译:

             /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             * 将等待时间最长的线程(如果存在的话)从该条件的等待队列移动到拥有锁的等待队列。
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *                                      returns {@code false}
             */
            public final void signal() {
                //如果仅针对当前(调用)线程保持同步,则返回true。
                //没有拿到锁的线程,不能调用此方法
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                //等待时间最长的线程,队首
                Node first = firstWaiter;
                if (first != null)
                    // 将节点从条件队列传输到同步队列,唤醒
                    doSignal(first);
            }
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * 删除和传输节点,直到到达不可取消的或null。
             * 从signal中分离出来,部分原因是为了鼓励编译器在没有等待器的情况下内联。
             *
             * @param first (non-null) the first node on condition queue 条件队列上的第一个节点
             */
            private void doSignal(Node first) {
                //循环,直到将节点从条件队列传输到同步队列返回false 和 条件队列上的第一个节点不为空。
                do {
                    //向后移动一次首哨兵,如果当前首哨兵所在节点为空,则将尾哨兵设为空
                    if ((firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    //断开第一个节点与后继节点之间的关系
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                        (first = firstWaiter) != null);
            }
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * 将节点从条件队列传输到同步队列。如果成功返回true。
         *
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             * 如果无法更改等待状态,则节点已被取消。
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             * 将Splice连接到队列,并尝试设置前辈的等待状态,以指示线程(可能)正在等待。、
             * 如果取消或尝试设置等待状态失败,则唤醒并重新同步
             * (在这种情况下,等待状态可能是暂时错误的,并且不会造成任何危害)。
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            //节点的CAS等待状态字段。
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //解阻塞
                LockSupport.unpark(node.thread);
            return true;
        }

从上面段源码可知,最底层是用LockSupport.park(this)与LockSupport.unpark(node.thread)来阻塞与解阻塞线程。
一个ReenTrantLock对应一个AQS阻塞队列(同步队列),然后对应多个condition。每个condition对应一个条件队列(等待队列)。
这篇就学习到这,后期学习AQS。


来源:https://www.jianshu.com/p/347ea7f881f8

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 【锁】Condition接口分析

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏