【J.U.C】Thread Pool: How It Is Implemented


First published at: clawhub.club


Thread pool state

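The pool's internal state is held in ctl, an AtomicInteger: its high 3 bits encode the pool's run state and its low 29 bits encode the number of worker threads. See the code comments for details: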

        /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         * In order to pack them into one int, we limit workerCount to
         * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. If this is ever an issue in
         * the future, the variable can be changed to be an AtomicLong,
         * and the shift/mask constants below adjusted. But until the need
         * arises, this code is a bit faster and simpler using an int.
         * <p>
         * The workerCount is the number of workers that have been
         * permitted to start and not permitted to stop.  The value may be
         * transiently different from the actual number of live threads,
         * for example when a ThreadFactory fails to create a thread when
         * asked, and when exiting threads are still performing
         * bookkeeping before terminating. The user-visible pool size is
         * reported as the current size of the workers set.
         * <p>
         * The runState provides the main lifecycle control, taking on values:
         * <p>
         * RUNNING:  Accept new tasks and process queued tasks
         * SHUTDOWN: Don't accept new tasks, but process queued tasks
         * STOP:     Don't accept new tasks, don't process queued tasks,
         *           and interrupt in-progress tasks
         * TIDYING:  All tasks have terminated, workerCount is zero,
         *           the thread transitioning to state TIDYING
         *           will run the terminated() hook method
         * TERMINATED: terminated() has completed
         * <p>
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         * <p>
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         * <p>
         * Threads waiting in awaitTermination() will return when the
         * state reaches TERMINATED.
         * <p>
         * Detecting the transition from SHUTDOWN to TIDYING is less
         * straightforward than you'd like because the queue may become
         * empty after non-empty and vice versa during SHUTDOWN state, but
         * we can only terminate if, after seeing that it is empty, we see
         * that workerCount is 0 (which sometimes entails a recheck -- see
         * below).
         */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        /**
         * Integer.SIZE is 32, so COUNT_BITS is 29
         */
        private static final int COUNT_BITS = Integer.SIZE - 3;
        /**
         * 1 shifted left by 29 bits, minus 1: 2^29 - 1, the maximum worker count
         */
        private static final int CAPACITY = (1 << COUNT_BITS) - 1;

        // runState is stored in the high-order bits
        /**
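         * High 3 bits are 111: the pool accepts new tasks and processes tasks in the blocking queue.
         * 111 0 0000 0000 0000 0000 0000 0000 0000
         * -1 in sign-magnitude: 1000 ... 0001; ones' complement: 1111 ... 1110; two's complement: 1111 ... 1111
         * A left shift fills the vacated low bits with 0, so -1 << 29 is:
         * 111 0 0000 0000 0000 0000 0000 0000 0000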
         */
        private static final int RUNNING = -1 << COUNT_BITS;
        /**
         * High 3 bits are 000: the pool accepts no new tasks but still processes tasks in the blocking queue.
         * 000 0 0000 0000 0000 0000 0000 0000 0000
         */
        private static final int SHUTDOWN = 0 << COUNT_BITS;
        /**
         * High 3 bits are 001: the pool accepts no new tasks, does not process queued tasks, and interrupts running tasks.
         * 001 0 0000 0000 0000 0000 0000 0000 0000
         */
        private static final int STOP = 1 << COUNT_BITS;
        /**
         * High 3 bits are 010: all tasks have terminated and workerCount is zero; the thread that moves the pool to TIDYING runs the terminated() hook.
         * 010 0 0000 0000 0000 0000 0000 0000 0000
         */
        private static final int TIDYING = 2 << COUNT_BITS;
        /**
         * High 3 bits are 011: terminated() has finished.
         * 011 0 0000 0000 0000 0000 0000 0000 0000
         */
        private static final int TERMINATED = 3 << COUNT_BITS;

        // Packing and unpacking ctl

        /**
         * Extracts runState from ctl
         *
         * @param c ctl
         * @return runState
         */
        private static int runStateOf(int c) {
            //2^29   =  001 0 0000 0000 0000 0000 0000 0000 0000
            //2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
            //~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
            //c = STOP  001 0 0000 0000 0000 0000 0000 0000 0000
            //result =  001 0 0000 0000 0000 0000 0000 0000 0000
            return c & ~CAPACITY;
        }

        /**
         * Extracts workerCount from ctl
         *
         * @param c ctl
         * @return workerCount
         */
        private static int workerCountOf(int c) {
            //2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
            //c =       000 0 0000 0000 0000 0000 0000 0000 0001  (1 thread)
            //result =  000 0 0000 0000 0000 0000 0000 0000 0001  = 1
            return c & CAPACITY;
        }

        /**
         * Combines runState and workerCount into ctl
         *
         * @param rs runState
         * @param wc workerCount
         * @return ctl
         */
        private static int ctlOf(int rs, int wc) {
            //rs = STOP 001 0 0000 0000 0000 0000 0000 0000 0000
            //wc = 1    000 0 0000 0000 0000 0000 0000 0000 0001  (1 thread)
            //result =  001 0 0000 0000 0000 0000 0000 0000 0001
            return rs | wc;
        }

        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }

        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }

        /**
         * RUNNING is negative, so a RUNNING ctl is always less than SHUTDOWN (0)
         * @param c ctl
         * @return whether the pool is RUNNING
         */
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }

        /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }

        /**
         * Attempts to CAS-decrement the workerCount field of ctl.
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }

        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        private void decrementWorkerCount() {
            do {
            } while (!compareAndDecrementWorkerCount(ctl.get()));
        }

What a design from the great Doug Lea. It really drives home that the foundation of computing is mathematics.
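To see the packing arithmetic in isolation, here is a minimal standalone sketch that copies the constants and the three helpers above into a hypothetical `CtlDemo` class (the class name and the `bits` formatter are illustrative, not JDK code). It assumes the JDK 8 layout described above: 3 state bits on top, 29 count bits below.

```java
// Hypothetical demo class: constants and helpers copied from the JDK 8 ThreadPoolExecutor layout.
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29 - 1
    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;      // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;      // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep only the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // the two fields never overlap

    // Formats x as 32 binary digits: the 3 state bits, a space, then the 29 count bits.
    static String bits(int x) {
        String s = String.format("%32s", Integer.toBinaryString(x)).replace(' ', '0');
        return s.substring(0, 3) + " " + s.substring(3);
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(bits(c));                  // state bits 111, count bits ending in 101
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 5
        // Ordering survives packing: even a full worker count keeps RUNNING below SHUTDOWN.
        System.out.println(ctlOf(RUNNING, CAPACITY) < SHUTDOWN); // true
    }
}
```

Because the state lives in the sign bit and the two bits below it, whole ctl values can be compared directly, which is why runStateLessThan and runStateAtLeast need no unpacking.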

(Figure: thread pool internal state transition diagram)
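The transition list in the Javadoc can also be read as a small state machine. The sketch below is a hypothetical model (the `PoolState` enum is not JDK code; the real pool stores these states as ints inside ctl) that encodes exactly the transitions listed in the comment and checks that every allowed move goes to a numerically larger state, which is what makes ordered comparisons such as runStateAtLeast(c, STOP) safe.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the runState lifecycle, declared in the same order as
// RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
enum PoolState {
    RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED;

    // The transitions listed in the ThreadPoolExecutor comment.
    Set<PoolState> next() {
        switch (this) {
            case RUNNING:  return EnumSet.of(SHUTDOWN, STOP);  // shutdown() / shutdownNow()
            case SHUTDOWN: return EnumSet.of(STOP, TIDYING);   // shutdownNow() / queue and pool empty
            case STOP:     return EnumSet.of(TIDYING);         // pool empty
            case TIDYING:  return EnumSet.of(TERMINATED);      // terminated() hook completed
            default:       return EnumSet.noneOf(PoolState.class); // TERMINATED is final
        }
    }

    boolean canMoveTo(PoolState target) {
        return next().contains(target);
    }

    // True if every allowed transition goes to a later (numerically larger) state.
    static boolean allTransitionsIncrease() {
        for (PoolState from : values())
            for (PoolState to : from.next())
                if (to.ordinal() <= from.ordinal())
                    return false;
        return true;
    }
}
```

Skipping states is allowed (RUNNING can jump straight to STOP), but going backwards never is.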

Inner class Worker

Worker extends AbstractQueuedSynchronizer and implements the Runnable interface.
It maintains the following three fields, of which completedTasks is volatile:

            /**
             * Thread this worker is running in.  Null if factory fails.
             */
            final Thread thread;
            /**
             * Initial task to run.  Possibly null.
             */
            Runnable firstTask;
            /**
             * Per-thread task counter
             */
            volatile long completedTasks;

Constructor:

            /**
             * Creates with given first task and thread from ThreadFactory.
             *
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                // inhibit interrupts until runWorker
                setState(-1);
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }

Since Worker implements Runnable, it must implement run():

            /**
             * Delegates main run loop to outer runWorker
             */
            public void run() {
                runWorker(this);
            }
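
Worker uses AQS as a simple non-reentrant exclusive lock. The JDK's own comment on Worker explains why: a ReentrantLock is avoided so that tasks cannot reacquire the lock when they call pool control methods such as setCorePoolSize, and the state starts negative to suppress interrupts until runWorker begins. The following is a simplified sketch of that idea with a hypothetical class name, `MiniWorker`; it mirrors the JDK 8 tryAcquire/tryRelease pattern but omits the Runnable part and the thread field.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical simplified version of ThreadPoolExecutor.Worker's lock (JDK 8 style).
// State: -1 = created but not started, 0 = unlocked, 1 = locked.
class MiniWorker extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    MiniWorker() {
        setState(-1); // inhibit interrupts (and locking) until the run loop calls unlock()
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds: no reentrancy, and no acquisition while state is -1.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // also turns the initial -1 into 0
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    // Mirrors the state check in the JDK's interruptIfStarted(): only a started worker may be interrupted.
    boolean interruptible() { return getState() >= 0; }
    int state()             { return getState(); }
}
```

The first unlock() in runWorker is what flips the state from -1 to 0, so a freshly constructed worker can be neither locked nor interrupted.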

runWorker(Worker w): running tasks

(Figure: runWorker flow)

        /**
         * Main worker run loop.  Repeatedly gets tasks from queue and
         * executes them, while coping with a number of issues:
         * <p>
         * 1. We may start out with an initial task, in which case we
         * don't need to get the first one. Otherwise, as long as pool is
         * running, we get tasks from getTask. If it returns null then the
         * worker exits due to changed pool state or configuration
         * parameters.  Other exits result from exception throws in
         * external code, in which case completedAbruptly holds, which
         * usually leads processWorkerExit to replace this thread.
         * <p>
         * 2. Before running any task, the lock is acquired to prevent
         * other pool interrupts while the task is executing, and then we
         * ensure that unless pool is stopping, this thread does not have
         * its interrupt set.
         * <p>
         * 3. Each task run is preceded by a call to beforeExecute, which
         * might throw an exception, in which case we cause thread to die
         * (breaking loop with completedAbruptly true) without processing
         * the task.
         * <p>
         * 4. Assuming beforeExecute completes normally, we run the task,
         * gathering any of its thrown exceptions to send to afterExecute.
         * We separately handle RuntimeException, Error (both of which the
         * specs guarantee that we trap) and arbitrary Throwables.
         * Because we cannot rethrow Throwables within Runnable.run, we
         * wrap them within Errors on the way out (to the thread's
         * UncaughtExceptionHandler).  Any thrown exception also
         * conservatively causes thread to die.
         * <p>
         * 5. After task.run completes, we call afterExecute, which may
         * also throw an exception, which will also cause thread to
         * die. According to JLS Sec 14.20, this exception is the one that
         * will be in effect even if task.run throws.
         * <p>
         * The net effect of the exception mechanics is that afterExecute
         * and the thread's UncaughtExceptionHandler have as accurate
         * information as we can provide about any problems encountered by
         * user code.
         *
         * @param w the worker
         */
        final void runWorker(Worker w) {
            // the current (worker) thread
            Thread wt = Thread.currentThread();
            // take the first task
            Runnable task = w.firstTask;
            // clear the firstTask field
            w.firstTask = null;
            // Worker is an AQS lock; unlock() calls Worker's release(1), which moves
            // the state from the -1 set in new Worker() to 0.
            // interruptIfStarted() only interrupts a worker whose state is >= 0.
            // allow interrupts
            w.unlock();
            // stays true if we reach finally because of an exception, i.e. the worker completed abruptly
            boolean completedAbruptly = true;
            try {
                // run firstTask first, then keep taking tasks from the queue
                while (task != null || (task = getTask()) != null) {
                    // acquire the worker lock
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // hook for subclasses to override (no-op by default)
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // run the task
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x;
                            throw x;
                        } catch (Error x) {
                            thrown = x;
                            throw x;
                        } catch (Throwable x) {
                            thrown = x;
                            throw new Error(x);
                        } finally {
                            // hook for subclasses to override (no-op by default)
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        // count this task as completed
                        w.completedTasks++;
                        // release the worker lock
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // cleanup once the worker exits
                processWorkerExit(w, completedAbruptly);
            }
        }
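
runWorker calls the protected hooks beforeExecute and afterExecute around every task and passes whatever task.run() threw to afterExecute. A small usage sketch, assuming a single-thread pool built with the standard ThreadPoolExecutor constructor; the subclass name `HookedPool`, the counters, and the silent UncaughtExceptionHandler are illustrative choices, not part of the original article.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass: counts hook calls and records exceptions passed to afterExecute.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo quiet when a worker dies
            return t;
        });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        before.incrementAndGet();       // runs on the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        after.incrementAndGet();        // runs in runWorker's finally block
        if (t != null) failures.add(t); // the exception task.run() threw, if any
    }
}

class HookDemo {
    static HookedPool run() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();                // queued tasks still run in SHUTDOWN
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }
}
```

Both hooks should run three times, and afterExecute should see the IllegalStateException once. The failing task also ends its worker (completedAbruptly stays true), and processWorkerExit starts a replacement, which is why the third task still runs.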

getTask()

        /**
         * Performs blocking or timed wait for a task, depending on
         * current configuration settings, or returns null if this worker
         * must exit because of any of:
         * 1. There are more than maximumPoolSize workers (due to
         * a call to setMaximumPoolSize).
         * 2. The pool is stopped.
         * 3. The pool is shutdown and the queue is empty.
         * 4. This worker timed out waiting for a task, and timed-out
         * workers are subject to termination (that is,
         * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
         * both before and after the timed wait, and if the queue is
         * non-empty, this worker is not the last thread in the pool.
         *
         * @return task, or null if the worker must exit, in which case
         * workerCount is decremented
         */
        private Runnable getTask() {
            // Did the last poll() time out?
            boolean timedOut = false;

            for (; ; ) {
                // read the pool state
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // decrement the workerCount field of ctl
                    decrementWorkerCount();
                    return null;
                }

                // current workerCount
                int wc = workerCountOf(c);

                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

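                // timeout control: exit if over maximumPoolSize, or timed out while subject to culling,
                // unless this is the last worker and the queue still holds tasks
                if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                    // try to CAS-decrement the workerCount field of ctl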
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }

                try {
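                    // timed: poll with keepAliveTime; otherwise take(),
                    // which blocks until a task arrives.
                    // Both methods may throw InterruptedException.
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
                    // got a task: return it
                    if (r != null)
                        return r;
                    // no task: this must have been a timed poll that timed out
                    timedOut = true;
                } catch (InterruptedException retry) {
                    // interrupted: not a timeout, loop and re-check the state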
                    timedOut = false;
                }
            }
        }
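
The exit decision in getTask() is easier to reason about in isolation. The helper below is hypothetical: it copies only the worker-count and timeout checks for a RUNNING pool (the SHUTDOWN/STOP check and the CAS retry are left out) and reports whether the worker would exit, block in take(), or wait in a timed poll().

```java
// Hypothetical helper: the worker-count and timeout checks of JDK 8 getTask(), RUNNING pool only.
final class GetTaskDecision {
    enum Action { EXIT, TAKE, POLL }

    static Action decide(int wc, int corePoolSize, int maximumPoolSize,
                         boolean allowCoreThreadTimeOut, boolean timedOut, boolean queueEmpty) {
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Too many workers, or a timed worker whose last poll timed out;
        // but the last worker never leaves while tasks are still queued.
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || queueEmpty))
            return Action.EXIT;          // getTask() would CAS-decrement and return null
        return timed ? Action.POLL : Action.TAKE;
    }
}
```

For example, with corePoolSize 0 and allowCoreThreadTimeOut set, a lone worker whose poll timed out still returns POLL rather than EXIT while the queue is non-empty, matching rule 4 of the Javadoc.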

processWorkerExit(Worker w, boolean completedAbruptly)

        /**
         * Performs cleanup and bookkeeping for a dying worker. Called
         * only from worker threads. Unless completedAbruptly is set,
         * assumes that workerCount has already been adjusted to account
         * for exit.  This method removes thread from worker set, and
         * possibly terminates the pool or replaces the worker if either
         * it exited due to user task exception or if fewer than
         * corePoolSize workers are running or queue is non-empty but
         * there are no workers.
         *
         * @param w                 the worker
         * @param completedAbruptly if the worker died due to user exception
         */
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // If abrupt, then workerCount wasn't adjusted
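            // true: the user task threw, so the count has not been decremented yet
            // false: getTask() already decremented the worker count
            if (completedAbruptly)
                // decrement the workerCount field of ctl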
                decrementWorkerCount();

            // acquire the main lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // add this worker's completed tasks to the pool-wide counter
                completedTaskCount += w.completedTasks;
                // remove the worker from the set
                workers.remove(w);
            } finally {
                // unlock
                mainLock.unlock();
            }

            // a worker was removed; if it was the last one, the pool may now be able to terminate
            tryTerminate();

            int c = ctl.get();
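            // the pool is RUNNING or SHUTDOWN (tryTerminate() did not terminate it): decide whether a replacement worker is needed
            if (runStateLessThan(c, STOP)) {
                // normal exit: compute min, the minimum number of workers to keep
                if (!completedAbruptly) {
                    // allowCoreThreadTimeOut (false by default): if false, keep corePoolSize workers alive
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // if min is 0 but the queue still holds tasks, keep at least one worker
                    if (min == 0 && !workQueue.isEmpty())
                        min = 1;
                    // at least min workers remain: no new worker is needed
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                // abrupt exit, or too few workers: add a worker with no firstTask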
                addWorker(null, false);
            }
        }
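
When a worker dies from a task exception (completedAbruptly is true) and the pool is still RUNNING or SHUTDOWN, processWorkerExit() skips the min check and tries to add a new worker. One way to observe this, sketched under the assumption of a single-thread pool and a counting ThreadFactory (`ReplacementDemo` and `threadsCreated` are illustrative names), is to count how many threads the factory creates:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: count the threads a single-thread pool creates when one task throws.
class ReplacementDemo {
    static final AtomicInteger threadsCreated = new AtomicInteger();

    static String run() {
        ThreadFactory factory = r -> {
            threadsCreated.incrementAndGet();
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((th, ex) -> { }); // swallow the task's exception quietly
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        try {
            // This task throws: runWorker exits with completedAbruptly == true.
            pool.execute(() -> { throw new RuntimeException("task failed"); });
            // The first worker is gone; exactly one new worker (usually the replacement) runs this task.
            return pool.submit(() -> "ok").get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The factory should be called exactly twice: once for the original worker and once for the worker that takes its place. Both paths that could add that second worker are bounded by the same pool size of 1, so the CAS on ctl lets only one of them succeed.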

Task submission

Executor.execute(Runnable command)

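The method declared by the Executor interface: it executes the given command at some time in the future. The command may run in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.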

        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
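
The interface leaves the threading policy entirely to the implementation. Two minimal implementations, with hypothetical class names in the spirit of the examples in the Executor Javadoc, make the contrast concrete: one runs the command in the calling thread, the other starts a new thread per command.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Runs each command in the calling thread (hypothetical name).
class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Objects.requireNonNull(command).run(); // NPE for null, as the contract requires
    }
}

// Starts a new thread for each command (hypothetical name).
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(Objects.requireNonNull(command)).start();
    }
}

class ExecutorDemo {
    static boolean directRunsOnCaller() {
        AtomicReference<Thread> ran = new AtomicReference<>();
        new DirectExecutor().execute(() -> ran.set(Thread.currentThread()));
        return ran.get() == Thread.currentThread();
    }

    static boolean threadPerTaskRunsElsewhere() {
        AtomicReference<Thread> ran = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        new ThreadPerTaskExecutor().execute(() -> {
            ran.set(Thread.currentThread());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ran.get() != null && ran.get() != Thread.currentThread();
    }
}
```

ThreadPoolExecutor sits between these two extremes: it reuses a bounded set of pooled threads.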

ExecutorService.submit(Callable task)

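Submits a value-returning task for execution and returns a Future representing the pending result of the task. The Future's get method returns the task's result once the task completes successfully.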

        /**
         * Submits a value-returning task for execution and returns a
         * Future representing the pending results of the task. The
         * Future's {@code get} method will return the task's result upon
         * successful completion.
         * <p>
         * If you would like to immediately block waiting
         * for a task, you can use constructions of the form
         * {@code result = exec.submit(aCallable).get();}
         *
         * <p>Note: The {@link Executors} class includes a set of methods
         * that can convert some other common closure-like objects,
         * for example, {@link java.security.PrivilegedAction} to
         * {@link Callable} form so they can be submitted.
         *
         * @param task the task to submit
         * @param <T> the type of the task's result
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Callable<T> task);
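
A short usage sketch of submit(Callable): get() on the returned Future blocks until the result is ready, and an exception thrown by the task is not lost but rethrown from get() wrapped in an ExecutionException. `SubmitDemo` and its methods are illustrative names; Executors.newSingleThreadExecutor() is used only to keep the example small.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // submit(Callable) returns a Future; get() blocks until the value is available.
    static int answer() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> f = exec.submit(() -> 6 * 7);
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    // An exception thrown by the Callable comes back from get() as the cause of an ExecutionException.
    static Throwable failureCause() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> { throw new ArithmeticException("bad input"); };
            exec.submit(failing).get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            exec.shutdown();
        }
    }
}
```

Compare this with execute(): there the exception escapes task.run() and ends the worker thread. With submit(), AbstractExecutorService wraps the task in a FutureTask, which captures the exception, so the worker survives.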

Task execution

execute(Runnable command): executing a task

(Figure: execute flow)

        /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         * <p>
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *                                    {@code RejectedExecutionHandler}, if the task
         *                                    cannot be accepted for execution
         * @throws NullPointerException       if {@code command} is null
         */
        public void execute(Runnable command) {
            // null task: throw NullPointerException
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
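            // read the control word (run state + worker count)
            int c = ctl.get();
            // fewer than corePoolSize (core pool size) workers are running
            if (workerCountOf(c) < corePoolSize) {
                // start a new worker with this command as its first task; return on success
                if (addWorker(command, true))
                    return;
                // addWorker failed; re-read ctl
                c = ctl.get();
            }
            // the pool is running and the task was queued successfully
            if (isRunning(c) && workQueue.offer(command)) {
                // re-read ctl
                int recheck = ctl.get();
                // the pool stopped running and the task could be removed from the queue
                if (!isRunning(recheck) && remove(command))
                    // reject the task
                    reject(command);
                // no workers are left: start one, since as long as one worker is alive it drains workQueue
                else if (workerCountOf(recheck) == 0)
                    // null firstTask: only start a worker; it takes the task from the queue
                    addWorker(null, false);

                // the pool is not RUNNING, or the task could not be queued:
                // try a non-core worker (bounded by maximumPoolSize); if addWorker(command, false) fails, reject the command
            } else if (!addWorker(command, false))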
                reject(command);
        }
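
The three steps can be observed directly with a small pool: corePoolSize 1, maximumPoolSize 2, and a bounded queue of capacity 1. The sketch below (`ExecuteStepsDemo` is a hypothetical name; the latch only keeps every task busy so nothing leaves the pool during the snapshot) submits four blocking tasks and records the pool size, queue size, and rejection count.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecuteStepsDemo {
    // Returns {pool size, queue size, rejected count} after four submissions.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                (r, executor) -> rejected.incrementAndGet()); // counting RejectedExecutionHandler
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // step 1: workerCount < corePoolSize -> addWorker(command, true)
        pool.execute(blocker); // step 2: core is full -> workQueue.offer succeeds
        pool.execute(blocker); // step 3: queue is full -> addWorker(command, false)
        pool.execute(blocker); // queue full and maximumPoolSize reached -> reject(command)
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected.get() };
        release.countDown();
        pool.shutdown();
        return snapshot;
    }
}
```

The snapshot should be {2, 1, 1}. Note the order: the queue fills before any thread beyond corePoolSize is created, so with an unbounded queue (for example a LinkedBlockingQueue without a capacity) maximumPoolSize is never reached.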

addWorker(Runnable firstTask, boolean core): creating a worker thread to run tasks

(Figure: addWorker flow)

        /**
         * Checks if a new worker can be added with respect to current
         * pool state and the given bound (either core or maximum). If so,
         * the worker count is adjusted accordingly, and, if possible, a
         * new worker is created and started, running firstTask as its
         * first task. This method returns false if the pool is stopped or
         * eligible to shut down. It also returns false if the thread
         * factory fails to create a thread when asked.  If the thread
         * creation fails, either due to the thread factory returning
         * null, or due to an exception (typically OutOfMemoryError in
         * Thread.start()), we roll back cleanly.
         *
         * @param firstTask the task the new thread should run first (or
         *                  null if none). Workers are created with an initial first task
         *                  (in method execute()) to bypass queuing when there are fewer
         *                  than corePoolSize threads (in which case we always start one),
         *                  or when the queue is full (in which case we must bypass queue).
         *                  Initially idle threads are usually created via
         *                  prestartCoreThread or to replace other dying workers.
         * @param core      if true use corePoolSize as bound, else
         *                  maximumPoolSize. (A boolean indicator is used here rather than a
         *                  value to ensure reads of fresh values after checking other pool
         *                  state).
         * @return true if successful
         */
        private boolean addWorker(Runnable firstTask, boolean core) {
            // a labeled loop, a rarely seen construct: lets the inner loop break/continue the outer one
            retry:
            // handle run state and worker count:
            // increment the worker count
            for (; ; ) {
                // read run state and worker count
                int c = ctl.get();
                // extract the run state
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                // In SHUTDOWN, STOP, TIDYING or TERMINATED no worker is added, with one exception:
                // in SHUTDOWN a worker with no firstTask may still be added while the queue is
                // non-empty, so that queued tasks get drained.
                if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                    return false;
                // the pool may add a worker
                for (; ; ) {
                    // current worker count
                    int wc = workerCountOf(c);

                    // refuse if the count reached CAPACITY or the bound (corePoolSize if core, else maximumPoolSize)
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;

                    // try to CAS-increment the workerCount field of ctl
                    if (compareAndIncrementWorkerCount(c))
                        // success: break out of the outer loop
                        break retry;
                    // Re-read ctl
                    c = ctl.get();
                    // the run state changed in the meantime
                    if (runStateOf(c) != rs)
                        // restart the outer loop
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }

            // add the worker to the workers set and start its thread
            // progress flags
            boolean workerStarted = false;
            boolean workerAdded = false;
            // Worker extends AQS and implements Runnable
            Worker w = null;
            try {
                // wrap the task in a new Worker
                w = new Worker(firstTask);
                // the thread created by the ThreadFactory (null if the factory failed)
                final Thread t = w.thread;
                if (t != null) {
                    // the pool-wide main lock
                    final ReentrantLock mainLock = this.mainLock;
                    // lock it
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        // the pool is RUNNING, or SHUTDOWN and this worker has no firstTask
                        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                            // precheck that t is startable
                            // (a thread that is already alive was started elsewhere)
                            if (t.isAlive())
                                throw new IllegalThreadStateException();
                            // add to the workers set
                            workers.add(w);
                            int s = workers.size();
                            // track the largest pool size seen
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            // mark as added
                            workerAdded = true;
                        }
                    } finally {
                        // release the lock
                        mainLock.unlock();
                    }
                    // if the worker was added, start its thread and record that
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // did the worker start?
                if (!workerStarted)
                    // roll back the worker creation
                    addWorkerFailed(w);
            }
            // report whether the worker started
            return workerStarted;
        }
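Phase 1 of addWorker can be reproduced in isolation. The following is a minimal sketch, not JDK code: the class `CtlRetryDemo` and its methods are hypothetical, the constants follow the JDK 8 `ctl` layout (high 3 bits for runState, low 29 bits for workerCount), and, as a simplification, it refuses every new worker once the pool leaves RUNNING. It shows only the bookkeeping: the labeled `retry` loop, the CAS on workerCount, and the jump back to the outer loop when runState changes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of phase 1 of addWorker(): ctl bookkeeping only,
// no Worker objects, threads or work queue.
class CtlRetryDemo {
    // Same layout as JDK 8: high 3 bits = runState, low 29 bits = workerCount
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;

    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private final int corePoolSize;
    private final int maximumPoolSize;

    CtlRetryDemo(int corePoolSize, int maximumPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
    }

    // Returns true if workerCount was incremented (a worker slot was reserved)
    boolean tryReserveWorker(boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Simplification: refuse every new worker once the pool is not RUNNING
            if (rs >= SHUTDOWN)
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (ctl.compareAndSet(c, c + 1))
                    break retry;        // CAS succeeded: leave both loops
                c = ctl.get();          // CAS lost a race: re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;     // runState changed: redo the state check
                // only workerCount changed: retry the inner loop
            }
        }
        return true;
    }

    // Moves the state to SHUTDOWN while keeping workerCount
    void shutdown() {
        for (;;) {
            int c = ctl.get();
            if (ctl.compareAndSet(c, ctlOf(SHUTDOWN, workerCountOf(c))))
                return;
        }
    }

    int workerCount() { return workerCountOf(ctl.get()); }

    public static void main(String[] args) {
        CtlRetryDemo pool = new CtlRetryDemo(2, 3);
        System.out.println(pool.tryReserveWorker(true));  // true  (0 -> 1)
        System.out.println(pool.tryReserveWorker(true));  // true  (1 -> 2)
        System.out.println(pool.tryReserveWorker(true));  // false (corePoolSize 2 reached)
        System.out.println(pool.tryReserveWorker(false)); // true  (2 -> 3, below maximumPoolSize)
        pool.shutdown();
        System.out.println(pool.tryReserveWorker(false)); // false (no longer RUNNING)
        System.out.println(pool.workerCount());           // 3
    }
}
```

Keeping the CAS in an inner loop means a CAS that fails only because another thread changed workerCount does not repeat the state check; only a runState change sends control back to `retry`.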
addWorkerFailed(Worker w): rolling back worker creation
     /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         * worker was holding up termination
         */
        private void addWorkerFailed(Worker w) {
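            // Acquire the main lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Remove the worker from the workers set, if it was added
                if (w != null)
                    workers.remove(w);
                // Decrement the workerCount field of ctl (undoing the CAS increment in addWorker)
                decrementWorkerCount();
                // Recheck for termination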
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
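The rollback can be observed from outside through the public API. A `ThreadFactory` that returns `null` from `newThread` leaves `w.thread` null, so addWorker falls through to addWorkerFailed. The sketch below assumes `execute()` behaves as in JDK 8 (when addWorker fails, the task is queued instead); `RollbackDemo` and `newBrokenPool` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RollbackDemo {
    // Hypothetical helper: a pool whose ThreadFactory always fails by returning null,
    // so every Worker gets a null thread and addWorker() rolls back via addWorkerFailed()
    static ThreadPoolExecutor newBrokenPool() {
        ThreadFactory failing = r -> null;
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), failing);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBrokenPool();
        // addWorker() fails and is rolled back; execute() then leaves the task in the queue
        pool.execute(() -> System.out.println("never runs"));
        System.out.println(pool.getPoolSize());     // 0
        System.out.println(pool.getQueue().size()); // 1

        // shutdownNow() drains the queue and calls tryTerminate(); since the rollback
        // restored workerCount to 0, the pool reaches TERMINATED immediately
        List<Runnable> pending = pool.shutdownNow();
        System.out.println(pending.size());         // 1
        System.out.println(pool.isTerminated());    // true
    }
}
```

Had workerCount not been decremented, tryTerminate() would keep seeing a non-zero count with no worker left to exit, and the pool would never reach TERMINATED.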
tryTerminate(): rechecking for termination
     /**
         * Transitions to TERMINATED state if either (SHUTDOWN and pool
         * and queue empty) or (STOP and pool empty).  If otherwise
         * eligible to terminate but workerCount is nonzero, interrupts an
         * idle worker to ensure that shutdown signals propagate. This
         * method must be called following any action that might make
         * termination possible -- reducing worker count or removing tasks
         * from the queue during shutdown. The method is non-private to
         * allow access from ScheduledThreadPoolExecutor.
         */
        final void tryTerminate() {
            for (; ; ) {
                int c = ctl.get();
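                // Return if the pool is still RUNNING, is already TIDYING or TERMINATED,
                // or is SHUTDOWN with tasks still in the queue
                if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                    return;

                // Eligible to terminate, but worker threads remain
                if (workerCountOf(c) != 0) {
                    // Interrupt one idle worker so the shutdown signal propagates
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }

                // Acquire the main lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // CAS ctl to TIDYING with a worker count of 0
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // Hook invoked when the executor terminates; does nothing by default
                            terminated();
                        } finally {
                            // terminated() has finished: the state becomes TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            // Wake every thread waiting on the termination condition (awaitTermination)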
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
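The early return for SHUTDOWN with a non-empty queue can be seen with a one-thread pool: one task occupies the worker, a second waits in the queue, and shutdown() alone does not reach TERMINATED. A sketch using only the public `ThreadPoolExecutor` API; `TerminateDemo` and its `Snapshot` holder are hypothetical names, and a latch keeps the first task busy so the observations are deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TerminateDemo {
    // Observations taken around shutdown() (hypothetical holder type)
    static final class Snapshot {
        boolean terminatingAfterShutdown;
        boolean terminatedAfterShutdown;
        boolean terminatedInTime;
        int tasksRun;
    }

    static Snapshot run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger tasksRun = new AtomicInteger();
        Snapshot s = new Snapshot();

        // Task 1 becomes the only worker's first task and blocks on the latch
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            tasksRun.incrementAndGet();
        });
        // Task 2 waits in the queue behind it
        pool.execute(tasksRun::incrementAndGet);

        pool.shutdown();
        // SHUTDOWN, but a worker and a queued task remain, so tryTerminate() returns early
        s.terminatingAfterShutdown = pool.isTerminating();
        s.terminatedAfterShutdown = pool.isTerminated();

        release.countDown();
        // Once the queue is drained and the last worker exits, the pool can terminate
        s.terminatedInTime = pool.awaitTermination(5, TimeUnit.SECONDS);
        s.tasksRun = tasksRun.get();
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        Snapshot s = run();
        System.out.println(s.terminatingAfterShutdown); // true
        System.out.println(s.terminatedAfterShutdown);  // false
        System.out.println(s.terminatedInTime);         // true
        System.out.println(s.tasksRun);                 // 2
    }
}
```

Note that shutdown() still lets the queued task run; only STOP (shutdownNow) discards queued tasks.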
     /**
         * Method invoked when the Executor has terminated.  Default
         * implementation does nothing. Note: To properly nest multiple
         * overridings, subclasses should generally invoke
         * {@code super.terminated} within this method.
         */
        protected void terminated() {
        }
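Because terminated() runs after the CAS to TIDYING and before ctl is set to TERMINATED, a subclass can use it for cleanup. A minimal sketch; `HookedPool`, `hookRan` and `sawTidying` are hypothetical names, and the override calls `super.terminated()` as the Javadoc above advises.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    // Hypothetical subclass that records that terminated() ran and in which state
    static class HookedPool extends ThreadPoolExecutor {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        final AtomicBoolean sawTidying = new AtomicBoolean(false);

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            super.terminated(); // nest overrides properly, as the Javadoc advises
            // ctl is TIDYING here: no longer running, but not yet TERMINATED
            sawTidying.set(isTerminating() && !isTerminated());
            hookRan.set(true);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HookedPool pool = new HookedPool();
        pool.execute(() -> System.out.println("task ran"));
        pool.shutdown();
        System.out.println(pool.awaitTermination(5, TimeUnit.SECONDS)); // true
        System.out.println(pool.hookRan.get());                         // true
        System.out.println(pool.sawTidying.get());                      // true
    }
}
```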
interruptIdleWorkers(boolean onlyOne)

        /**
         * Interrupts threads that might be waiting for tasks (as
         * indicated by not being locked) so they can check for
         * termination or configuration changes. Ignores
         * SecurityExceptions (in which case some threads may remain
         * uninterrupted).
         *
         * @param onlyOne If true, interrupt at most one worker. This is
         *                called only from tryTerminate when termination is otherwise
         *                enabled but there are still other workers.  In this case, at
         *                most one waiting worker is interrupted to propagate shutdown
         *                signals in case all threads are currently waiting.
         *                Interrupting any arbitrary thread ensures that newly arriving
         *                workers since shutdown began will also eventually exit.
         *                To guarantee eventual termination, it suffices to always
         *                interrupt only one idle worker, but shutdown() interrupts all
         *                idle workers so that redundant workers exit promptly, not
         *                waiting for a straggler task to finish.
         */
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
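                for (Worker w : workers) {
                    Thread t = w.thread;
                    // A worker running a task holds its own lock, so tryLock()
                    // succeeds only for idle workers (e.g. blocked waiting for a task)
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    // With onlyOne, stop after the first worker, whether or not it was interrupted
                    if (onlyOne)
                        break;
                }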
            } finally {
                mainLock.unlock();
            }
        }
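The tryLock() check is what separates shutdown() from shutdownNow(): per the Javadoc above, shutdown() interrupts only idle workers, so a worker busy with a task is skipped, whereas shutdownNow() interrupts every started worker. The sketch compares the two through the public API; `InterruptIdleDemo` and `busyTaskInterrupted` are hypothetical names, and it assumes JDK 8 semantics for both shutdown methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptIdleDemo {
    // Hypothetical helper: one busy task on a 2-thread pool, then shutdown() or
    // shutdownNow(). Returns true if the busy task was interrupted.
    static boolean busyTaskInterrupted(boolean useShutdownNow) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.prestartAllCoreThreads(); // two idle workers waiting for tasks
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown(); // the worker running this task holds its own lock
            try {
                release.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        started.await();

        if (useShutdownNow) {
            // Interrupts every started worker, busy or idle; the latch is never
            // released, so only the interrupt can wake the task
            pool.shutdownNow();
        } else {
            // tryLock() fails on the busy worker, so only the idle one is
            // interrupted; the busy task is then released normally
            pool.shutdown();
            release.countDown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(busyTaskInterrupted(false)); // false
        System.out.println(busyTaskInterrupted(true));  // true
    }
}
```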

reject(Runnable command): invoking the rejection policy

       /**
         * Invokes the rejected execution handler for the given command.
         * Package-protected for use by ScheduledThreadPoolExecutor.
         */
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
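reject() simply delegates to the configured RejectedExecutionHandler. A sketch with a saturated pool (one thread, queue capacity 1) and a hypothetical counting handler in place of the default AbortPolicy; `RejectDemo`, `CountingHandler` and `saturate` are illustrative names, not JDK API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectDemo {
    // Hypothetical handler: counts rejections instead of throwing like AbortPolicy
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    // Returns {rejected task count, executed task count}
    static int[] saturate() throws InterruptedException {
        CountingHandler handler = new CountingHandler();
        // One thread and a queue of capacity 1: at most two tasks fit at a time
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();

        // A: becomes the worker's first task and blocks on the latch
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.incrementAndGet();
        });
        pool.execute(ran::incrementAndGet); // B: fills the queue
        pool.execute(ran::incrementAndGet); // C: queue full, maximumPoolSize reached -> reject(C)

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {handler.rejected.get(), ran.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = saturate();
        System.out.println(counts[0]); // 1
        System.out.println(counts[1]); // 2
    }
}
```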

Source: https://www.jianshu.com/p/347ea7f881f8

Copyright belongs to the original author. For reproduction in any form, please contact the blogger: daming_90 (Java 技术驿站).
