Tomcat源码分析【十】请求处理过程分析之NIO网络操作

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


在分析Tomcat的请求处理过程之前,得先复习一下Java NIO的一些知识:

概念

NIO最大的特点是面向缓冲区,有三大核心:Channel、Buffer、Selector。

Channel

Channel是双向的,既可以读数据,也可以写数据。这里只关注TCP(Server和Client)相关的ServerSocketChannel和SocketChannel。

Buffer

数据总是从通道读取到缓冲区,或者从缓冲区写入通道中。

Selector

用于监听多个通道的事件,比如连接打开、数据到达。

TCP客户端服务端例子

ServerHandler与其实现


    import java.io.IOException;
    import java.nio.channels.SelectionKey;

    /**
     * 消息处理
     */
    public interface ServerHandler {
        /**
         * 连接请求
         *
         * @param selectionKey selectionKey
         * @throws IOException IOException
         */
        void handleAccept(SelectionKey selectionKey) throws IOException;

        /**
         * 读请求
         *
         * @param selectionKey selectionKey
         * @return String
         * @throws IOException IOException
         */
        String handleRead(SelectionKey selectionKey) throws IOException;
    }

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    /**
     * The type Server handler.
     */
    public class ServerHandlerImpl implements ServerHandler {
        /**
         * The Buffer size.
         */
        private int bufferSize = 1024;

        @Override
        public void handleAccept(SelectionKey selectionKey) throws IOException {
            //获取channel
            SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
            //非阻塞
            socketChannel.configureBlocking(false);
            //注册selector,操作—用于读取操作的操作集位。
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
            System.out.println("请求连接......");
        }

        @Override
        public String handleRead(SelectionKey selectionKey) throws IOException {
            //获取channel
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //检索当前附件
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
            String receivedStr = "";

            if (socketChannel.read(buffer) == -1) {
                //无数据
                socketChannel.shutdownOutput();
                socketChannel.shutdownInput();
                socketChannel.close();
                System.out.println("连接断开......");
            } else {
                //翻转这个缓冲区。
                buffer.flip();
                //按照编码读取缓冲区中数据
                receivedStr = StandardCharsets.UTF_8.newDecoder().decode(buffer).toString();
                //Clears this buffer.
                buffer.clear();
                //返回数据给客户端
                buffer = buffer.put(("服务端接收到的消息为: " + receivedStr).getBytes(StandardCharsets.UTF_8));
                //翻转这个缓冲区。
                buffer.flip();
                //缓冲区数据写入到通道
                socketChannel.write(buffer);
                //注册selector 继续读取数据
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
            }
            return receivedStr;
        }

    }

NioSocketServer服务端及启动


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;

    /**
     * 服务端
     */
    public class NioSocketServer {
        /**
         * The Open.
         */
        private volatile boolean open = true;

        /**
         * Is open boolean.
         *
         * @return the boolean
         */
        public boolean isOpen() {
            return open;
        }

        /**
         * Sets open.
         *
         * @param open the open
         */
        public void setOpen(boolean open) {
            this.open = open;
        }

        /**
         * Start.
         */
        public void start() {
            //打开服务器套接字通道。
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
                //将ServerSocket绑定到8080端口
                serverSocketChannel.socket().bind(new InetSocketAddress(8080));
                //设置为非阻塞模式
                serverSocketChannel.configureBlocking(false);
                //打开一个选择器。
                Selector selector = Selector.open();
                //为serverChannel注册selector,套接字接受操作的操作集位。
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("服务端启动......");

                //创建消息处理器
                ServerHandler handler = new ServerHandlerImpl();

                //只要服务是打开的,就一直循环
                while (isOpen()) {
                    //选择一组键,其对应的通道已准备好进行I/O操作。
                    selector.select();
                    System.out.println("开始处理请求.......");
                    //获取selectionKeys并处理
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        try {
                            //连接请求
                            if (key.isAcceptable()) {
                                handler.handleAccept(key);
                            }
                            //读请求
                            if (key.isReadable()) {
                                System.out.println(handler.handleRead(key));
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        //处理完后移除当前使用的key
                        keyIterator.remove();
                    }
                    System.out.println("完成请求处理。");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The type Server main.
     */
    public class ServerMain {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void main(String[] args) {
            NioSocketServer server = new NioSocketServer();
            //1分钟后关闭服务
            new Thread(() -> {
                try {
                    Thread.sleep(60 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //服务关闭
                    server.setOpen(false);
                }
            }).start();
            //开启服务
            server.start();
        }
    }

NioSocketClient客户端及启动


    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    /**
     * 客户端
     */
    public class NioSocketClient {
        /**
         * Start.
         */
        public void start() {
            //Opens a socket channel.
            try (SocketChannel socketChannel = SocketChannel.open()) {
                SocketAddress socketAddress = new InetSocketAddress("localhost", 8080);
                //连接服务端socket
                socketChannel.connect(socketAddress);

                int sendCount = 0;
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (sendCount < 5) {
                    buffer.clear();
                    //向服务端发送消息
                    buffer.put(("当前时间: " + System.currentTimeMillis()).getBytes());
                    buffer.flip();
                    socketChannel.write(buffer);
                    buffer.clear();

                    //从服务端读取消息
                    int readLength = socketChannel.read(buffer);
                    buffer.flip();
                    byte[] bytes = new byte[readLength];
                    buffer.get(bytes);
                    System.out.println(new String(bytes, StandardCharsets.UTF_8));
                    buffer.clear();

                    sendCount++;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The type Client main.
     */
    public class ClientMain {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void main(String[] args) {
            new NioSocketClient().start();
        }
    }

结果:

    服务端启动......
    开始处理请求.......
    请求连接......
    完成请求处理。
    开始处理请求.......
    当前时间: 1567409176760
    完成请求处理。
    开始处理请求.......
    当前时间: 1567409177761
    完成请求处理。
    开始处理请求.......
    当前时间: 1567409178762
    完成请求处理。
    开始处理请求.......
    当前时间: 1567409179763
    完成请求处理。
    开始处理请求.......
    当前时间: 1567409180763
    完成请求处理。
    开始处理请求.......
    连接断开......

    完成请求处理。

简单的了解NIO 服务端的原理后,再来看Tomcat的请求处理过程,应该会更清晰一些,从前几篇文章可知,在Tomcat启动时就就有如下两个步骤:

  1. 打开服务器套接字通道ServerSocketChannel.open()
  2. 将ServerSocket绑定到指定端口serverSocketChannel.socket().bind(new InetSocketAddress(8888))

这体现在NioEndpoint类中的initServerSocket()方法中:

     /**
         * Separated out to make it easier for folks that extend NioEndpoint to implement custom [server]sockets
         */
        protected void initServerSocket() throws Exception {
            if (!getUseInheritedChannel()) {
                serverSock = ServerSocketChannel.open();
                socketProperties.setProperties(serverSock.socket());
                InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
                serverSock.socket().bind(addr, getAcceptCount());
            } else {
                // Retrieve the channel provided by the OS
                Channel ic = System.inheritedChannel();
                if (ic instanceof ServerSocketChannel) {
                    serverSock = (ServerSocketChannel) ic;
                }
                if (serverSock == null) {
                    throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
                }
            }
            //mimic APR behavior
            serverSock.configureBlocking(true);
        }

我们又知道Poller维护着Selector,在NioEndpoint的内部类Poller中可以看到selector的一些操作。
我的例子中是没有用到serverSock.accept()方法来监听客户请求的,Tomcat中用Acceptor来监听请求,获取SocketChannel,并封装成NioChannel注册到Poller中。

下面分几篇文档来分析接收到请求,封装的请求的过程。


来源:https://www.jianshu.com/u/9632919f32c3

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Tomcat源码分析【十】请求处理过程分析之NIO网络操作

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏