Tomcat源码分析【九】Connector连接器

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


定义

Connector官方文档

HTTP连接器元素表示支持HTTP/1.1协议的连接器组件。除了能够执行servlet和JSP页面之外,它还使Catalina能够作为独立的web服务器运行。
此组件的特定实例侦听服务器上特定TCP端口号上的连接。可以将一个或多个这样的连接器配置为单个服务的一部分,
每个连接器都转发到关联的引擎来执行请求处理并创建响应。
如果希望使用AJP协议(如mod_jk 1.2)配置用于连接到web服务器的连接器。 (such as the mod_jk 1.2.x connector for Apache 1.3),请参阅AJP连接器文档。
每个传入请求在请求期间都需要一个线程。如果接收到的并发请求多于当前可用的请求处理线程所能处理的,那么将创建更多的线程,
直到配置的最大值(maxThreads属性的值)。如果还同时接收到更多的请求,则将它们堆积在连接器创建的服务器套接字中,
直到配置的最大值(acceptCount属性的值)。任何进一步的同步请求都会收到“连接拒绝”错误,直到有可用的资源来处理它们。

我们使用Tomcat,一般只用到Web服务器、Servlet容器这两个特性,现在用JSP的太少了,使用AJP协议的地方也没接触过。
所以可以简单的将Connector理解为,监听特定TCP端口,接收用户请求,封装成Request和Response,传递到Servlet处理业务,最后返回结果给客户端。

Connector容器,涉及到Http11NioProtocol(ProtocolHandler)、Http11Processor(Processor)、CoyoteAdapter(Adapter)、
NioEndpoint、ConnectionHandler(AbstractEndpoint.Handler)、Poller、Acceptor等,这些类相互协作,监听、处理、封装、传递请求,响应客户。

下面根据生命周期来看Connector类:

Connector的构造

     public Connector(String protocol) {
            boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                    AprLifecycleListener.getUseAprConnector();

            if ("HTTP/1.1".equals(protocol) || protocol == null) {
                if (aprConnector) {
                    protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
                } else {
                    protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
                }
            } else if ("AJP/1.3".equals(protocol)) {
                if (aprConnector) {
                    protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
                } else {
                    protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
                }
            } else {
                protocolHandlerClassName = protocol;
            }

            // Instantiate protocol handler
            ProtocolHandler p = null;
            try {
                Class<?> clazz = Class.forName(protocolHandlerClassName);
                p = (ProtocolHandler) clazz.getConstructor().newInstance();
            } catch (Exception e) {
                log.error(sm.getString(
                        "coyoteConnector.protocolHandlerInstantiationFailed"), e);
            } finally {
                this.protocolHandler = p;
            }

            // Default for Connector depends on this system property
            setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
        }

首先通过协议名称创建ProtocolHandler,这里假设为Http11NioProtocol,这个类将引领全局,再来看Http11NioProtocol的构造:

     public Http11NioProtocol() {
            super(new NioEndpoint());
        }
     public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
            super(endpoint);
        }
     public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
            super(endpoint);
            setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
            ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
            setHandler(cHandler);
            getEndpoint().setHandler(cHandler);
        }
     public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
            this.endpoint = endpoint;
            setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
            setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
        }

创建了NioEndpoint,和ConnectionHandler。

initInternal初始化

     @Override
        protected void initInternal() throws LifecycleException {
            //父类初始化
            super.initInternal();
            if (protocolHandler == null) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
            }
            // Initialize adapter
            //初始化Adapter
            adapter = new CoyoteAdapter(this);
            //adapter设置到protocolHandler中
            protocolHandler.setAdapter(adapter);
            // Make sure parseBodyMethodsSet has a default
            if (null == parseBodyMethodsSet) {
                setParseBodyMethods(getParseBodyMethods());
            }
            if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
                throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                        getProtocolHandlerClassName()));
            }
            if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                    protocolHandler instanceof AbstractHttp11JsseProtocol) {
                AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                        (AbstractHttp11JsseProtocol<?>) protocolHandler;
                if (jsseProtocolHandler.isSSLEnabled() &&
                        jsseProtocolHandler.getSslImplementationName() == null) {
                    // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                    jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
                }
            }
            try {
                //初始化protocolHandler
                protocolHandler.init();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
            }
        }

这里首先初始化CoyoteAdapter,这个类是用来和Container(Servlet)交互的,之后初始化protocolHandler。再来看protocolHandler.init()方法:

     @Override
        public void init() throws Exception {
            // Upgrade protocols have to be configured first since the endpoint
            // init (triggered via super.init() below) uses this list to configure
            // the list of ALPN protocols to advertise
            for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
                configureUpgradeProtocol(upgradeProtocol);
            }

            super.init();
        }

协议升级操作,再调用上一层:

     @Override
        public void init() throws Exception {
            if (getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
            }

            if (oname == null) {
                // Component not pre-registered so register it
                oname = createObjectName();
                if (oname != null) {
                    Registry.getRegistry(null, null).registerComponent(this, oname, null);
                }
            }

            if (this.domain != null) {
                rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
                Registry.getRegistry(null, null).registerComponent(
                        getHandler().getGlobal(), rgOname, null);
            }

            String endpointName = getName();
            endpoint.setName(endpointName.substring(1, endpointName.length()-1));
            endpoint.setDomain(domain);

            endpoint.init();
        }

这里重要的是endpoint.init()方法:

     public final void init() throws Exception {
            if (bindOnInit) {
                bind();
                bindState = BindState.BOUND_ON_INIT;
            }
            if (this.domain != null) {
                // Register endpoint (as ThreadPool - historical name)
                oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
                Registry.getRegistry(null, null).registerComponent(this, oname, null);

                for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                    registerJmx(sslHostConfig);
                }
            }
        }

是否要在初始化的时候创建连接。默认为true,进入bind方法:

     /**
         * Initialize the endpoint.
         */
        @Override
        public void bind() throws Exception {
            //初始化服务套接字
            initServerSocket();
            // Initialize thread count defaults for acceptor, poller
            if (acceptorThreadCount == 0) {
                // FIXME: Doesn't seem to work that well with multiple accept threads
                acceptorThreadCount = 1;
            }
            if (pollerThreadCount <= 0) {
                //minimum one poller thread
                pollerThreadCount = 1;
            }
            //计数器
            setStopLatch(new CountDownLatch(pollerThreadCount));
            // Initialize SSL if needed
            initialiseSsl();
            //打开selector池
            selectorPool.open();
        }
     /**
         * Separated out to make it easier for folks that extend NioEndpoint to implement custom [server]sockets
         */
        protected void initServerSocket() throws Exception {
            if (!getUseInheritedChannel()) {
                serverSock = ServerSocketChannel.open();
                socketProperties.setProperties(serverSock.socket());
                InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
                serverSock.socket().bind(addr, getAcceptCount());
            } else {
                // Retrieve the channel provided by the OS
                Channel ic = System.inheritedChannel();
                if (ic instanceof ServerSocketChannel) {
                    serverSock = (ServerSocketChannel) ic;
                }
                if (serverSock == null) {
                    throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
                }
            }
            //mimic APR behavior
            serverSock.configureBlocking(true);
        }

初始化服务套接字等操作。

startInternal开启

      /**
         * Begin processing requests via this Connector.
         *
         * @exception LifecycleException if a fatal startup error occurs
         */
        @Override
        protected void startInternal() throws LifecycleException {
            // Validate settings before starting
            if (getPort() < 0) {
                throw new LifecycleException(sm.getString(
                        "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
            }
            setState(LifecycleState.STARTING);
            try {
                protocolHandler.start();
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
            }
        }

Connector的startInternal最后调用到了protocolHandler.start()方法,从上一步的initInternal到这一步,已经能简单的看出,
Connector组件的核心就是这个protocolHandler。

        @Override
        public void start() throws Exception {
            if (getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
            }
            //AbstractEndpoint
            endpoint.start();
            // Start async timeout thread
            asyncTimeout = new AsyncTimeout();
            Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
            int priority = endpoint.getThreadPriority();
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                priority = Thread.NORM_PRIORITY;
            }
            timeoutThread.setPriority(priority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }

这里有两步,开启endpoint,初始化AsyncTimeout,这两个部分以前分析过,这里再简单过一遍:
AsyncTimeout的作用是周期检测异步请求是否超时,重要的还是endpoint.start()方法:

    public final void start() throws Exception {
            //bind状态为未绑定的时候,执行bind
            if (bindState == BindState.UNBOUND) {
                //子类实现
                bind();
                bindState = BindState.BOUND_ON_START;
            }
            //这由子类实现
            startInternal();
        }

这里只需看endpoint的startInternal方法:

     /**
         * Start the NIO endpoint, creating acceptor, poller threads.
         */
        @Override
        public void startInternal() throws Exception {
            if (!running) {
                running = true;
                paused = false;
                //用于SocketProcessor对象的缓存
                processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getProcessorCache());
                //为轮询器事件缓存PollerEvent
                eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
                //Bytebuffer缓存,每个通道持有一组缓冲区(两个,SSL持有四个)
                //NioChannel
                nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getBufferPool());
                // Create worker collection
                if (getExecutor() == null) {
                    createExecutor();
                }
                //初始化连接限制
                initializeConnectionLatch();
                // Start poller threads
                //轮询器启动线程,都为守护线程
                pollers = new Poller[getPollerThreadCount()];
                for (int i = 0; i < pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
                //开始所有的Acceptor线程,用于监听套接字的
                startAcceptorThreads();
            }
        }

这里主要是创建了acceptor和poller线程,下面分别看一下:

Acceptor

主要作用是获取新的连接,组装后注册到Poller中。

Poller

首先看一下构造函数:

     public Poller() throws IOException {
                this.selector = Selector.open();
            }

可以看到一个Poller对应一个selector, 用于检测已就绪的 Socket。 默认最多不超过 2 个,这里简单的介绍,后期后详细分析。

stopInternal关闭

      /**
         * Terminate processing requests via this Connector.
         *
         * @exception LifecycleException if a fatal shutdown error occurs
         */
        @Override
        protected void stopInternal() throws LifecycleException {
            setState(LifecycleState.STOPPING);
            try {
                if (protocolHandler != null) {
                    protocolHandler.stop();
                }
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
            }
        }

依然调用protocolHandler方法:

        @Override
        public void stop() throws Exception {
            if(getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
            }
            if (asyncTimeout != null) {
                asyncTimeout.stop();
            }
            endpoint.stop();
        }

终止异步检测线程,关闭endpoint.AbstractEndpoint中的stop方法:

     public final void stop() throws Exception {
            stopInternal();
            if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
                unbind();
                bindState = BindState.UNBOUND;
            }
        }

分别调用NioEndpoint的stopInternal与unbind,这些方法后期会分析,这里先略过。

destroyInternal销毁

     @Override
        protected void destroyInternal() throws LifecycleException {
            try {
                if (protocolHandler != null) {
                    protocolHandler.destroy();
                }
            } catch (Exception e) {
                throw new LifecycleException(
                        sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
            }

            if (getService() != null) {
                getService().removeConnector(this);
            }

            super.destroyInternal();
        }

除了调用 protocolHandler.destroy()方法外,还解除了Service容器与当前Connector之间的关系。
protocolHandler.destroy():

     @Override
        public void destroy() throws Exception {
            if(getLog().isInfoEnabled()) {
                getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
            }

            try {
                endpoint.destroy();
            } finally {
                if (oname != null) {
                    if (mserver == null) {
                        Registry.getRegistry(null, null).unregisterComponent(oname);
                    } else {
                        // Possibly registered with a different MBeanServer
                        try {
                            mserver.unregisterMBean(oname);
                        } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                            getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                                    oname, mserver));
                        }
                    }
                }

                if (rgOname != null) {
                    Registry.getRegistry(null, null).unregisterComponent(rgOname);
                }
            }
        }
     public final void destroy() throws Exception {
            if (bindState == BindState.BOUND_ON_INIT) {
                unbind();
                bindState = BindState.UNBOUND;
            }
            Registry registry = Registry.getRegistry(null, null);
            registry.unregisterComponent(oname);
            for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                unregisterJmx(sslHostConfig);
            }
        }

本篇文章就分析到这里吧,有个重要的逻辑没有分析,就是Acceptor接收请求,传递到Poller中处理,再转给worker线程包装,
最后调用容器的 pipeline 进行处理。


来源:https://www.jianshu.com/u/9632919f32c3

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Tomcat源码分析【九】Connector连接器

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏