Tomcat源码分析【十三】请求处理过程分析之SocketProcessor

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


Acceptor监听指定端口,获取到SocketChannel,将其封装成NioChannel注册到Poller;
Poller接收到Acceptor发过来的NioChannel,将其封装成PollerEvent放入本地队列;
Poller消费并处理本地队列PollerEvent,组装SocketProcessor对象,交由工作线程池Executor处理。

本篇主要分析工作线程池处理SocketProcessor对象。
先贴上Poller与工作线程池交接的方法processSocket:

     /**
         * Process the given SocketWrapper with the given status. Used to trigger
         * processing as if the Poller (for those endpoints that have one)
         * selected the socket.
         *
         * @param socketWrapper The socket wrapper to process
         * @param event         The socket event to be processed
         * @param dispatch      Should the processing be performed on a new
         *                          container thread
         *
         * @return if processing was triggered successfully
         */
        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                SocketEvent event, boolean dispatch) {
            try {
                if (socketWrapper == null) {
                    return false;
                }
                //尝试使用缓存中的SocketProcessor
                SocketProcessorBase<S> sc = processorCache.pop();
                if (sc == null) {
                    //创建一个新的SocketProcessor
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    //重置
                    sc.reset(socketWrapper, event);
                }
                //获取工作线程池
                Executor executor = getExecutor();
                if (dispatch && executor != null) {
                    //在线程池中执行SocketProcessor任务
                    executor.execute(sc);
                } else {
                    //在当前线程执行SocketProcessor的run方法
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                return false;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                // This means we got an OOM or similar creating a thread, or that
                // the pool and its queue are full
                getLog().error(sm.getString("endpoint.process.fail"), t);
                return false;
            }
            return true;
        }

这个方法位于AbstractEndpoint抽象类中,这里的executor最初由NioEndpoint的生命周期钩子startInternal方法中创建:

     @Override
        public void startInternal() throws Exception {
            if (!running) {
                //略.......
                // Create worker collection
                if (getExecutor() == null) {
                    createExecutor();
                }
                //初始化连接限制
                initializeConnectionLatch();
                // Start poller threads
                //轮询器启动线程,都为守护线程
                pollers = new Poller[getPollerThreadCount()];
                for (int i = 0; i < pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
                //开始所有的Acceptor线程,用于监听套接字的
                startAcceptorThreads();
            }
        }
    public void createExecutor() {
            internalExecutor = true;
            TaskQueue taskqueue = new TaskQueue();
            TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
            executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
            taskqueue.setParent( (ThreadPoolExecutor) executor);
        }

SocketProcessor实现了Runnable接口,所以本篇入口在此:

     @Override
        public final void run() {
            synchronized (socketWrapper) {
                // It is possible that processing may be triggered for read and
                // write at the same time. The sync above makes sure that processing
                // does not occur in parallel. The test below ensures that if the
                // first event to be processed results in the socket being closed,
                // the subsequent events are not processed.
                if (socketWrapper.isClosed()) {
                    return;
                }
                doRun();
            }
        }
     @Override
            protected void doRun() {
                //获取用户连接
                NioChannel socket = socketWrapper.getSocket();
                SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

                try {
                    //https的握手行为
                    int handshake = -1;

                    try {
                        if (key != null) {
                            if (socket.isHandshakeComplete()) {
                                // No TLS handshaking required. Let the handler
                                // process this socket / event combination.
                                handshake = 0;
                            } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                    event == SocketEvent.ERROR) {
                                // Unable to complete the TLS handshake. Treat it as
                                // if the handshake failed.
                                handshake = -1;
                            } else {
                                handshake = socket.handshake(key.isReadable(), key.isWritable());
                                // The handshake process reads/writes from/to the
                                // socket. status may therefore be OPEN_WRITE once
                                // the handshake completes. However, the handshake
                                // happens when the socket is opened so the status
                                // must always be OPEN_READ after it completes. It
                                // is OK to always set this as it is only used if
                                // the handshake completes.
                                event = SocketEvent.OPEN_READ;
                            }
                        }
                    } catch (IOException x) {
                        handshake = -1;
                        if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x);
                    } catch (CancelledKeyException ckx) {
                        handshake = -1;
                    }
                    //握手完成,或者不需要握手时 handshake == 0
                    if (handshake == 0) {
                        SocketState state = SocketState.OPEN;
                        // Process the request from this socket
                        if (event == null) {
                            //默认是读事件处理
                            state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                        } else {
                            //相应指定事件
                            state = getHandler().process(socketWrapper, event);
                        }
                        if (state == SocketState.CLOSED) {
                            close(socket, key);
                        }
                    } else if (handshake == -1) {
                        close(socket, key);
                    } else if (handshake == SelectionKey.OP_READ) {
                        socketWrapper.registerReadInterest();
                    } else if (handshake == SelectionKey.OP_WRITE) {
                        socketWrapper.registerWriteInterest();
                    }
                } catch (CancelledKeyException cx) {
                    socket.getPoller().cancelledKey(key);
                } catch (VirtualMachineError vme) {
                    ExceptionUtils.handleThrowable(vme);
                } catch (Throwable t) {
                    log.error("", t);
                    socket.getPoller().cancelledKey(key);
                } finally {
                    socketWrapper = null;
                    event = null;
                    //return to cache
                    if (running && !paused) {
                        //缓存
                        processorCache.push(this);
                    }
                }
            }
        }

看下面这句代码:

     //相应指定事件
    state = getHandler().process(socketWrapper, event);

getHandler()返回的是ConnectionHandler,实现了AbstractEndpoint.Handler接口,位于AbstractProtocol抽象类中。
调用其process方法,这个方法实在是太长了,我只简单的截取一些:

      @Override
            public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
                //略......

                S socket = wrapper.getSocket();

                //略......
                try {
                    //略......
                    //创建一个新的Http11Processor
                    if (processor == null) {
                        processor = getProtocol().createProcessor();
                        register(processor);
                    }

                    //略......
                    SocketState state = SocketState.CLOSED;
                    //只要状态为 SocketState.UPGRADING,一直循环
                    do {
                        //处理核心
                        state = processor.process(wrapper, status);
                      //略......

                    } while (state == SocketState.UPGRADING);

                     //略......
            }

获取到Http11Processor,并调用其process方法,在其父类AbstractProcessorLight中

     @Override
        public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
                throws IOException {

            SocketState state = SocketState.CLOSED;
            Iterator<DispatchType> dispatches = null;
            do {
                if (dispatches != null) {
                    DispatchType nextDispatch = dispatches.next();
                    //处理不在标准HTTP模式下的正在处理中的请求。
                    //目前使用的包括Servlet 3.0异步和HTTP升级连接
                    //将来可能会增加更多的用途。这些通常以HTTP请求开始。
                    state = dispatch(nextDispatch.getSocketStatus());
                } else if (status == SocketEvent.DISCONNECT) {
                    // Do nothing here, just wait for it to get recycled
                } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                    state = dispatch(status);
                    if (state == SocketState.OPEN) {
                        // There may be pipe-lined data to read. If the data isn't
                        // processed now, execution will exit this loop and call
                        // release() which will recycle the processor (and input
                        // buffer) deleting any pipe-lined data. To avoid this,
                        // process it now.
                        state = service(socketWrapper);
                    }
                } else if (status == SocketEvent.OPEN_WRITE) {
                    // Extra write event likely after async, ignore
                    state = SocketState.LONG;
                } else if (status == SocketEvent.OPEN_READ){
                    state = service(socketWrapper);
                } else {
                    // Default to closing the socket if the SocketEvent passed in
                    // is not consistent with the current state of the Processor
                    state = SocketState.CLOSED;
                }

                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], Status in: [" + status +
                            "], State out: [" + state + "]");
                }

                if (state != SocketState.CLOSED && isAsync()) {
                    state = asyncPostProcess();
                    if (getLog().isDebugEnabled()) {
                        getLog().debug("Socket: [" + socketWrapper +
                                "], State after async post processing: [" + state + "]");
                    }
                }

                if (dispatches == null || !dispatches.hasNext()) {
                    // Only returns non-null iterator if there are
                    // dispatches to process.
                    dispatches = getIteratorAndClearDispatches();
                }
            } while (state == SocketState.ASYNC_END ||
                    dispatches != null && state != SocketState.CLOSED);

            return state;
        }

调用子类Http11Processor的service方法:

     @Override
        public SocketState service(SocketWrapperBase<?> socketWrapper)
                throws IOException {
            //request:org.apache.coyote.Request
            //结构,其中包含请求和响应对象。
            RequestInfo rp = request.getRequestProcessor();
            //解析状态
            rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

            // Setting up the I/O
            //初始化输入输出流
            setSocketWrapper(socketWrapper);
            inputBuffer.init(socketWrapper);
            outputBuffer.init(socketWrapper);

            // Flags
            keepAlive = true;
            openSocket = false;
            readComplete = true;
            boolean keptAlive = false;
            SendfileState sendfileState = SendfileState.DONE;

            //没有错误状态; keepAlive = true;同步的;非更新HTTP连接后使用的新协议的实例;文件处理完毕;没有被暂停
            while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                    sendfileState == SendfileState.DONE && !protocol.isPaused()) {

                // Parsing the request header
                //解析请求头
                try {
                    if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                            protocol.getKeepAliveTimeout())) {
                        if (inputBuffer.getParsingRequestLinePhase() == -1) {
                            return SocketState.UPGRADING;
                        } else if (handleIncompleteRequestLineRead()) {
                            break;
                        }
                    }

                    if (protocol.isPaused()) {
                        //response:org.apache.coyote.Response
                        // 503 - Service unavailable
                        //服务无效;无法提供服务;找不到服务器
                        response.setStatus(503);
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    } else {
                        keptAlive = true;
                        // Set this every time in case limit has been changed via JMX
                        request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                        if (!inputBuffer.parseHeaders()) {
                            // We've read part of the request, don't recycle it
                            // instead associate it with the socket
                            openSocket = true;
                            readComplete = false;
                            break;
                        }
                        if (!protocol.getDisableUploadTimeout()) {
                            socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                        }
                    }
                } catch (IOException e) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.header.parse"), e);
                    }
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                    break;
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                    if (logMode != null) {
                        String message = sm.getString("http11processor.header.parse");
                        switch (logMode) {
                            case INFO_THEN_DEBUG:
                                message += sm.getString("http11processor.fallToDebug");
                                //$FALL-THROUGH$
                            case INFO:
                                log.info(message, t);
                                break;
                            case DEBUG:
                                log.debug(message, t);
                        }
                    }
                    // 400 - Bad Request
                    response.setStatus(400);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }

                // Has an upgrade been requested?
                //升级协议
                Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
                boolean foundUpgrade = false;
                while (connectionValues.hasMoreElements() && !foundUpgrade) {
                    foundUpgrade = connectionValues.nextElement().toLowerCase(
                            Locale.ENGLISH).contains("upgrade");
                }

                if (foundUpgrade) {
                    // Check the protocol
                    String requestedProtocol = request.getHeader("Upgrade");

                    UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
                    if (upgradeProtocol != null) {
                        if (upgradeProtocol.accept(request)) {
                            //101
                            response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                            response.setHeader("Connection", "Upgrade");
                            response.setHeader("Upgrade", requestedProtocol);
                            action(ActionCode.CLOSE, null);
                            getAdapter().log(request, response, 0);

                            InternalHttpUpgradeHandler upgradeHandler =
                                    upgradeProtocol.getInternalUpgradeHandler(
                                            socketWrapper, getAdapter(), cloneRequest(request));
                            UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                            action(ActionCode.UPGRADE, upgradeToken);
                            return SocketState.UPGRADING;
                        }
                    }
                }

                if (getErrorState().isIoAllowed()) {
                    // Setting up filters, and parse some request headers
                    //准备阶段
                    rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                    try {
                        //很长的方法,各种解析请求,组装request
                        prepareRequest();
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("http11processor.request.prepare"), t);
                        }
                        // 500 - Internal Server Error
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, t);
                    }
                }

                int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
                if (maxKeepAliveRequests == 1) {
                    keepAlive = false;
                } else if (maxKeepAliveRequests > 0 &&
                        socketWrapper.decrementKeepAlive() <= 0) {
                    keepAlive = false;
                }

                // Process the request in the adapter
                if (getErrorState().isIoAllowed()) {
                    try {
                        //服务处理阶段
                        rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                        //获取CoyoteAdapter,调用其service方法
                        getAdapter().service(request, response);
                        // Handle when the response was committed before a serious
                        // error occurred.  Throwing a ServletException should both
                        // set the status to 500 and set the errorException.
                        // If we fail here, then the response is likely already
                        // committed, so we can't try and set headers.
                        if (keepAlive && !getErrorState().isError() && !isAsync() &&
                                statusDropsConnection(response.getStatus())) {
                            setErrorState(ErrorState.CLOSE_CLEAN, null);
                        }
                    } catch (InterruptedIOException e) {
                        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                    } catch (HeadersTooLargeException e) {
                        log.error(sm.getString("http11processor.request.process"), e);
                        // The response should not have been committed but check it
                        // anyway to be safe
                        if (response.isCommitted()) {
                            setErrorState(ErrorState.CLOSE_NOW, e);
                        } else {
                            response.reset();
                            response.setStatus(500);
                            setErrorState(ErrorState.CLOSE_CLEAN, e);
                            response.setHeader("Connection", "close"); // TODO: Remove
                        }
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        log.error(sm.getString("http11processor.request.process"), t);
                        // 500 - Internal Server Error
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, t);
                        getAdapter().log(request, response, 0);
                    }
                }

                // Finish the handling of the request
                //完成请求的处理
                rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
                if (!isAsync()) {
                    // If this is an async request then the request ends when it has
                    // been completed. The AsyncContext is responsible for calling
                    // endRequest() in that case.
                    endRequest();
                }
                rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

                // If there was an error, make sure the request is counted as
                // and error, and update the statistics counter
                if (getErrorState().isError()) {
                    response.setStatus(500);
                }

                if (!isAsync() || getErrorState().isError()) {
                    request.updateCounters();
                    if (getErrorState().isIoAllowed()) {
                        inputBuffer.nextRequest();
                        outputBuffer.nextRequest();
                    }
                }

                if (!protocol.getDisableUploadTimeout()) {
                    int connectionTimeout = protocol.getConnectionTimeout();
                    if (connectionTimeout > 0) {
                        socketWrapper.setReadTimeout(connectionTimeout);
                    } else {
                        socketWrapper.setReadTimeout(0);
                    }
                }

                //keepalive阶段
                rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

                //如果需要,触发sendfile处理。
                sendfileState = processSendfile(socketWrapper);
            }

            //结束阶段
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

            if (getErrorState().isError() || protocol.isPaused()) {
                return SocketState.CLOSED;
            } else if (isAsync()) {
                return SocketState.LONG;
            } else if (isUpgrade()) {
                return SocketState.UPGRADING;
            } else {
                if (sendfileState == SendfileState.PENDING) {
                    return SocketState.SENDFILE;
                } else {
                    if (openSocket) {
                        if (readComplete) {
                            return SocketState.OPEN;
                        } else {
                            return SocketState.LONG;
                        }
                    } else {
                        return SocketState.CLOSED;
                    }
                }
            }
        }

巨无霸方法,主要是组装org.apache.coyote.Request与org.apache.coyote.Response,
这里着重关心业务处理阶段,即获取CoyoteAdapter,并调用其service方。这里的CoyoteAdapter是在初始化Connector的时候创建的。

     @Override
        public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
                throws Exception {

            //org.apache.catalina.connector.Request,实现了javax.servlet.http.HttpServletRequest接口
            Request request = (Request) req.getNote(ADAPTER_NOTES);
            //org.apache.catalina.connector.Response,实现了javax.servlet.http.HttpServletResponse接口
            Response response = (Response) res.getNote(ADAPTER_NOTES);

            //准备阶段
            if (request == null) {
                //创建请求与响应
                // Create objects
                request = connector.createRequest();
                request.setCoyoteRequest(req);
                response = connector.createResponse();
                response.setCoyoteResponse(res);

                // Link objects,关联关系确定
                request.setResponse(response);
                response.setRequest(request);

                // Set as notes
                req.setNote(ADAPTER_NOTES, request);
                res.setNote(ADAPTER_NOTES, response);

                // Set query string encoding
                //编码方式
                req.getParameters().setQueryStringCharset(connector.getURICharset());
            }

            if (connector.getXpoweredBy()) {
                response.addHeader("X-Powered-By", POWERED_BY);
            }

            boolean async = false;
            boolean postParseSuccess = false;

            req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

            try {
                // Parse and set Catalina and configuration specific
                // request parameters
                //解析并设置Catalina和配置特定的请求参数
                postParseSuccess = postParseRequest(req, request, res, response);
                if (postParseSuccess) {
                    //check valves if we support async
                    //如果我们支持异步止回阀
                    request.setAsyncSupported(
                            connector.getService().getContainer().getPipeline().isAsyncSupported());
                    // Calling the container
                    //调用容器
                    connector.getService().getContainer().getPipeline().getFirst().invoke(
                            request, response);
                }
                //如果是异步请求
                if (request.isAsync()) {
                    async = true;
                    ReadListener readListener = req.getReadListener();
                    if (readListener != null && request.isFinished()) {
                        // Possible the all data may have been read during service()
                        // method so this needs to be checked here
                        ClassLoader oldCL = null;
                        try {
                            oldCL = request.getContext().bind(false, null);
                            if (req.sendAllDataReadEvent()) {
                                req.getReadListener().onAllDataRead();
                            }
                        } finally {
                            request.getContext().unbind(false, oldCL);
                        }
                    }

                    Throwable throwable =
                            (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

                    // If an async request was started, is not going to end once
                    // this container thread finishes and an error occurred, trigger
                    // the async error process
                    if (!request.isAsyncCompleting() && throwable != null) {
                        request.getAsyncContextInternal().setErrorState(throwable, true);
                    }
                } else {
                    //结束请求和响应
                    request.finishRequest();
                    response.finishResponse();
                }

            } catch (IOException e) {
                // Ignore
            } finally {
                AtomicBoolean error = new AtomicBoolean(false);
                res.action(ActionCode.IS_ERROR, error);

                if (request.isAsyncCompleting() && error.get()) {
                    // Connection will be forcibly closed which will prevent
                    // completion happening at the usual point. Need to trigger
                    // call to onComplete() here.
                    res.action(ActionCode.ASYNC_POST_PROCESS,  null);
                    async = false;
                }

                // Access log
                if (!async && postParseSuccess) {
                    // Log only if processing was invoked.
                    // If postParseRequest() failed, it has already logged it.
                    Context context = request.getContext();
                    Host host = request.getHost();
                    // If the context is null, it is likely that the endpoint was
                    // shutdown, this connection closed and the request recycled in
                    // a different thread. That thread will have updated the access
                    // log so it is OK not to update the access log here in that
                    // case.
                    // The other possibility is that an error occurred early in
                    // processing and the request could not be mapped to a Context.
                    // Log via the host or engine in that case.
                    long time = System.currentTimeMillis() - req.getStartTime();
                    if (context != null) {
                        context.logAccess(request, response, time, false);
                    } else if (response.isError()) {
                        if (host != null) {
                            host.logAccess(request, response, time, false);
                        } else {
                            connector.getService().getContainer().logAccess(
                                    request, response, time, false);
                        }
                    }
                }

                req.getRequestProcessor().setWorkerThreadName(null);

                // Recycle the wrapper request and response
                if (!async) {
                    updateWrapperErrorCount(request, response);
                    //重新利用
                    request.recycle();
                    response.recycle();
                }
            }
        }

这里有两个重要方法,postParseRequest的作用是解析并设置Catalina和配置特定的请求参数。
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response)的作用是调用容器,真正执行业务逻辑。
即工作线程终于组装好了request和response,可以调用到Servlet引擎中的Servlet方法了。

虽然本篇文章中设计的源码都特别的长,但是目的只有一个,组装request和response,调用容器。


来源:https://www.jianshu.com/u/9632919f32c3

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Tomcat源码分析【十三】请求处理过程分析之SocketProcessor

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏