Tomcat源码分析【十二】请求处理过程分析之Poller

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


上一篇分析完Acceptor,将获取的Socket封装成NioChannel注册到Poller,在注册的过程中NioChannel会封装成PollerEvent。
本篇主要看PollerEvent是怎么处理的。

Poller类实现了Runnable接口,所以主要看其run方法:

      /**
             * The background thread that adds sockets to the Poller, checks the
             * poller for triggered events and hands the associated socket off to an
             * appropriate processor as events occur.
             * 后台线程将套接字添加到轮询器,检查轮询器是否触发事件,并在事件发生时将关联的套接字交给适当的处理器。
             */
            @Override
            public void run() {
                // Loop until destroy() is called
                while (true) {
                    boolean hasEvents = false;
                    try {
                        //没有关闭
                        if (!close) {
                            //处理轮询器事件队列中的事件。
                            hasEvents = events();
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                //非阻塞,只要有通道就绪就立刻返回。
                                keyCount = selector.selectNow();
                            } else {
                                //阻塞到至少有一个通道在你注册的事件上就绪了,最长阻塞时间为selectorTimeout毫秒。
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        //关闭
                        if (close) {
                            //处理轮询器事件队列中的事件。
                            events();
                            //超时的处理
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("", x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    //要么超时,要么醒来,先处理事件
                    if (keyCount == 0) hasEvents = (hasEvents | events());

                    //获取每一个SelectionKey,并处理
                    Iterator<SelectionKey> iterator =
                            keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        //附件,里面有数据
                        NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            iterator.remove();
                            //处理数据
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount, hasEvents);
                }//while
                // Poller销毁,计数器减一
                getStopLatch().countDown();
            }

主要就是获取队列中的事件,处理事件,这里看两个方法:

events

处理轮询器事件队列中的事件。

      /**
             * Processes events in the event queue of the Poller.
             * 处理轮询器事件队列中的事件。
             *
             * @return <code>true</code> if some events were processed,
             * <code>false</code> if queue was empty
             */
            public boolean events() {
                boolean result = false;

                PollerEvent pe = null;
                for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                    result = true;
                    try {
                        //PollerEvent也实现了Runnable接口
                        pe.run();
                        //重置
                        pe.reset();
                        if (running && !paused) {
                            eventCache.push(pe);
                        }
                    } catch (Throwable x) {
                        log.error("", x);
                    }
                }

                return result;
            }

继续看PollerEvent的run方法:

     @Override
            public void run() {
                //注册操作
                if (interestOps == OP_REGISTER) {
                    try {
                        //注册Selector,读操作
                        socket.getIOChannel().register(
                                socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                        log.error(sm.getString("endpoint.nio.registerFail"), x);
                    }
                } else {
                    final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                    try {
                        if (key == null) {
                            // The key was cancelled (e.g. due to socket closure)
                            // and removed from the selector while it was being
                            // processed. Count down the connections at this point
                            // since it won't have been counted down when the socket
                            // closed.
                            socket.socketWrapper.getEndpoint().countDownConnection();
                            ((NioSocketWrapper) socket.socketWrapper).closed = true;
                        } else {
                            final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                            if (socketWrapper != null) {
                                //we are registering the key to start with, reset the fairness counter.
                                int ops = key.interestOps() | interestOps;
                                socketWrapper.interestOps(ops);
                                //设置interest值
                                key.interestOps(ops);
                            } else {
                                socket.getPoller().cancelledKey(key);
                            }
                        }
                    } catch (CancelledKeyException ckx) {
                        try {
                            socket.getPoller().cancelledKey(key);
                        } catch (Exception ignore) {
                        }
                    }
                }
            }

这一步可以说是为了后面处理数据做铺垫。

processKey(sk, attachment)

    protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
                try {
                    if (close) {
                        //取消键,里面各种关闭
                        cancelledKey(sk);
                    } else if (sk.isValid() && attachment != null) {
                        //有效且有数据
                        if (sk.isReadable() || sk.isWritable()) {
                            //有文件时
                            if (attachment.getSendfileData() != null) {
                                processSendfile(sk, attachment, false);
                            } else {
                                //取消注册
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    //处理可读
                                    if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                //可写
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk);
                                }
                            }
                        }
                    } else {
                        //invalid key
                        cancelledKey(sk);
                    }
                } catch (CancelledKeyException ckx) {
                    cancelledKey(sk);
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error("", t);
                }
            }

可以看出主要是调用processSocket方法:

     /**
         * Process the given SocketWrapper with the given status. Used to trigger
         * processing as if the Poller (for those endpoints that have one)
         * selected the socket.
         *
         * @param socketWrapper The socket wrapper to process
         * @param event         The socket event to be processed
         * @param dispatch      Should the processing be performed on a new
         *                          container thread
         *
         * @return if processing was triggered successfully
         */
        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                SocketEvent event, boolean dispatch) {
            try {
                if (socketWrapper == null) {
                    return false;
                }
                SocketProcessorBase<S> sc = processorCache.pop();
                if (sc == null) {
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    sc.reset(socketWrapper, event);
                }
                Executor executor = getExecutor();
                if (dispatch && executor != null) {
                    executor.execute(sc);
                } else {
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                return false;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                // This means we got an OOM or similar creating a thread, or that
                // the pool and its queue are full
                getLog().error(sm.getString("endpoint.process.fail"), t);
                return false;
            }
            return true;
        }

这里将attachment和SocketEvent封装成SocketProcessor,交给Executor(即工作线程)处理。
下一篇继续分析工作线程的操作流程。


来源:https://www.jianshu.com/u/9632919f32c3

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Tomcat源码分析【十二】请求处理过程分析之Poller

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏