RocketMQ源码分析之NameServer

撸了今年阿里、腾讯和美团的面试,我有一个重要发现…….

作者:dingwpmz

出处:https://blog.csdn.net/prestigeding/article/details/78888290



1、RocketMQ组件概述

201908231001_2.png

1)NameServer

NameServer相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与队列信息等。

NameServer彼此之间不通信,每个Broker与集群内所有的Broker保持长连接。

2、源码分析NameServer

本文不对NameServer与Broker、Producer集群、Consumer进群的网络通信做详细解读(该系列后续专门进行讲解)

本文重点关注NameServer作为MQ集群的配置中心存储什么信息并复习Netty线程模型

2.1 源码分析org.apache.rocketmq.namesrv.NamesrvController

NameserController,NameServer的核心控制类。

2.1.1 NamesrvConfig

NamesrvConfig,主要指定nameserver的相关配置目录属性

1)kvConfigPath(kvConfig.json)

2)mqhome/namesrv/namesrv.properties

3)orderMessageEnable,是否开启顺序消息功能,默认为false

2.1.2 ScheduledExecutorService scheduledExecutorService

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryIm pl(“NSScheduledThread”));

NameServer 定时任务执行线程池,一个线程,默认定时执行两个任务:

任务1、每隔10s扫描broker,维护当前存活的Broker信息

任务2、每隔10s打印KVConfig信息。

2.1.3 KVConfigManager

读取或变更NameServer的配置属性,加载NamesrvConfig中配置的配置文件到内存,此类一个亮点就是使用轻量级的非线程安全容器,再结合读写锁对资源读写进行保护。尽最大程度提高线程的并发度。

2.1.4 RouteInfoManager

NameServer数据的载体,记录Broker,Topic等信息。

        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;                                         //@1
        private final ReadWriteLock lock = new ReentrantReadWriteLock();                                                      //@2
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;                                   //@3
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;                                  //@4
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;    //@5
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;                                //@6
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;        //@7

代码@1,NameServer 与 Broker 空闲时长,默认2分钟,在2分钟内Nameserver没有收到Broker的心跳包,则关闭该连接。

代码@2,读写锁,用来保护非线程安全容器HashMap

代码@3,topicQueueTable,主题与队列关系,记录一个主题的队列分布在哪些Broker上,每个Broker上存在该主题的队列个数。

QueueData队列描述信息,对应如下属性:

private String brokerName; // broker的名称

private int readQueueNums; // 读队列个数

private int writeQueueNums; // 写队列个数

private int perm; // 权限操作

private int topicSynFlag; // 同步复制还是异步复制

代码@4,brokerAddrTable,所有Broker信息,使用brokerName当key,BrokerData信息描述每一个broker信息。

private String cluster; broker所属集群

private String brokerName; broker name

private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // broker 对应的IP:Port,brokerId=0表示Master,大于0表示Slave。

代码@5,clusterAddrTable,broker集群信息,每个集群包含哪些Broker。

代码@6,brokerLiveTable,当前存活的Broker,该信息不是实时的,NameServer每10S扫描一次所有的broker,根据心跳包的时间得知broker的状态,该机制也是导致当一个master Down掉后,消息生产者无法感知,可能继续向Down掉的Master发送消息,导致失败(非高可用),消息发送者是否提供了消息重试机制,待后续文章分析broker时再研究。

2.1.5 BrokerHousekeepingService brokerHousekeepingService

BrokerHouseKeepingService实现 ChannelEventListener接口,可以说是通道在发送异常时的回调方法(Nameserver与Broker的连接通道在关闭、通道发送异常、通道空闲时),在上述数据结构中移除

已Down掉的Broker。

    public interface ChannelEventListener {
        void onChannelConnect(final String remoteAddr, final Channel channel);

        void onChannelClose(final String remoteAddr, final Channel channel);

        void onChannelException(final String remoteAddr, final Channel channel);

        void onChannelIdle(final String remoteAddr, final Channel channel);
    }

2.1.6 NettyServerConfig、RemotingServer remotingServer、ExecutorService remotingExecutor

这三个属性与网络通信有关,NameServer与Broker、Product、Consume之间的网络通信,基于Netty。本文借这个机会再次探究Netty线程模型与Netty实战技巧。

源码分析网络通讯之前,我们关注如下问题:

1)NettyServerConfig 的配置含义

2)Netty线程模型中EventLoopGroup、EventExecutorGroup之间的区别于作用

3)在Channel的整个生命周期中,如何保证Channel的读写事件至始至终使用同一个线程处理

首先我们先过一下NettyServerConfig中的配置属性:

        private int listenPort = 8888;
        private int serverWorkerThreads = 8;
        private int serverCallbackExecutorThreads = 0;
        private int serverSelectorThreads = 3;
        private int serverOnewaySemaphoreValue = 256;
        private int serverAsyncSemaphoreValue = 64;
        private int serverChannelMaxIdleTimeSeconds = 120;

        private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
        private boolean serverPooledByteBufAllocatorEnable = true;

我们带着上面的疑问开始源码分析org.apache.rocketmq.remoting.netty.NettyRemotingServer

2.6.1.1 serverWorkerThreads

含义:业务线程池的线程个数,RocketMQ按任务类型,每个任务类型会拥有一个专门的线程池,比如发送消息,消费消息,另外再加一个其他(默认的业务线程池),

默认业务线程池,采用fixed类型,线程个数就是由serverWorkerThreads。

线程名称:RemotingExecutorThread_

作用范围:该参数目前主要用于NameServer的默认业务线程池,处理诸如broker,product,consume与NameServer的所有交互命令。

源码来源:org.apache.rocketmq.namesrv.NamesrvController

    public boolean initialize() {

            this.kvConfigManager.load();

            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));   // @1

            this.registerProcessor();                 // @2

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);

            return true;
        }

        private void registerProcessor() {
            if (namesrvConfig.isClusterTest()) {

                this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                    this.remotingExecutor);
            } else {

                this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
            }
        }

代码@1,创建一个线程容量为serverWorkerThreads的固定长度的线程池,该线程池供DefaultRequestProcessor类实现,该类实现具体的默认的请求命令处理。

代码@2,就是将DefaultRequestProcessor与代码@1创建的线程池绑定在一起

具体的命令调用类:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract

    /**
         * Process incoming request command issued by remote peer.
         * @param ctx channel handler context.
         * @param cmd request command.
         */
        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
            final int opaque = cmd.getOpaque();

            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                            if (rpcHook != null) {
                                rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                            }

                            final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                            if (rpcHook != null) {
                                rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            }

                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        PLOG.error("process request over, but response failed", e);
                                        PLOG.error(cmd.toString());
                                        PLOG.error(response.toString());
                                    }
                                } else {

                                }
                            }
                        } catch (Throwable e) {
                            PLOG.error("process request exception", e);
                            PLOG.error(cmd.toString());

                            if (!cmd.isOnewayRPC()) {
                                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                                    RemotingHelper.exceptionSimpleDesc(e));
                                response.setOpaque(opaque);
                                ctx.writeAndFlush(response);
                            }
                        }
                    }
                };

                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }

                try {
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    if ((System.currentTimeMillis() % 10000) == 0) {
                        PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                            + ", too many requests and system thread pool busy, RejectedExecutionException " //
                            + pair.getObject2().toString() //
                            + " request code: " + cmd.getCode());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                            "[OVERLOAD]system busy, start flow control for a while");
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            } else {
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }

该方法比较简单,该方法其实就是一个具体命令的处理模板(模板方法),具体的命令实现由各个子类实现,该类的主要责任就是将命令封装成一个线程对象,然后丢到线程池去执行。

2.6.1.2 serverCallbackExecutorThreads

含义:业务线程池的线程个数,RocketMQ按任务类型,每个任务类型会拥有一个专门的线程池,比如发送消息,消费消息,另外再加一个其他(默认的业务线程池),

默认业务线程池,采用fixed类型,线程个数就是由serverCallbackExecutorThreads 。

线程名称:NettyServerPublicExecutor_

作用范围:broker,product,consume处理默认命令的业务线程池大小。

源码来源:org.apache.rocketmq.remoting.netty.NettyRemotingServer

201908231001_3.png

201908231001_4.png

2.6.1.3 serverSelectorThreads

含义:Netty IO线程数量,Selector所在的线程个数,也就主从Reactor模型中的从Reactor线程数量 。

线程名称:NettyServerNIOSelector_

作用范围:broker,product,consume 服务端的IO线程数量。

源码来源:org.apache.rocketmq.remoting.netty.NettyRemotingServer

201908231001_5.png

2.6.1.4 serverOnewaySemaphoreValue、 serverAsyncSemaphoreValue

含义:服务端 oneWay(单向执行)、异步调用的信号量(并发度)

线程名称:无

作用范围:通常用在客户端与Broker的交互

源码来源:org.apache.rocketmq.remoting.netty.NettyRemotingServer

201908231001_6.png

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract

201908231001_7.png

201908231001_8.png

备注:单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

2.6.1.5 其他配置参数

private int serverChannelMaxIdleTimeSeconds = 120; // 通道空闲时间,默认120S, 通过Netty的IdleStateHandler实现

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // socket发送缓存区大小

private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // socket接收缓存区大小

private boolean serverPooledByteBufAllocatorEnable = true; // 是否使用PooledByteBuf(可重用,缓存ByteBuf)

2.6.2 Netty线程模型中EventLoopGroup、EventExecutorGroup之间的区别与作用

EventExecutor:执行者,真正的线程对象(线程池)

EventLoop:线程模型对外内,继承自EventExecutor,增加通道注册等方法。

NioEventLoopGroup–>MultithreadEventLoopGroup ( EventLoop[] )

EventLoop — > SingleThreadEventLoop –> SingleThreadEventExecutor

NioEventLoopGroup持有一个EventLoop数组,每一个EventLoop其实是有一个单个线程的线程池(EventExecutor)组成,EventLoop的线程为(SingleThreadEventExecutor的属性thread线程对象 )。

Netty有一个设计原则,就是在Channel的整个生命周期中,ChannelHandler的执行总是相同的一个线程(EventLoop、SingleThreadEventExecutor),我们知道Channel在调用注册到Selector上时会绑定一个EventLoop,默认所有的ChannelHandler的执行都在该线程上,是否可以改变ChannelHandler的执行线程呢?答案是可以的,通过在ChannelPipeline.addLast( EventLoopGroup group, ChannelHandler )。

相关源码:

201908231001_9.png

201908231001_10.png

201908231001_11.png

从上面的代码可以看到,如果ChannelPipeline.addLast指定了EventLoopGroup,会将该ChannelPipeline记录当前Pipeline对于EventLoopGroup使用EvenetLoopGroup的一个线程,并与此同时ChannelHandlerContext的executor为该group的一个线程。ChannelHandler方法的执行逻辑:

201908231001_12.png

如果,AbstractChannelHandlerContext的执行线程与该通道的eventLoop相同,则直接执行(这时是在IO线程中执行(Netty主从多Reactor线程模型,也就是selector对象所在的线程(处理读写(workEventLoopGroup))),如果不是,则在ChannelHandlerContext的execute中执行:AbstractChannelHandlerContext

201908231001_13.png

201908231001_14.png

总结:本文详细介绍了NameServer作为配置中心,如果存储集群配置信息,重点分析了RocketMQ的网络服务器类(NettyRemotingServer)、NettyServerConfig个配置属性的含义,重温了Netty线程模型,Netty线程模型详解:http://blog.csdn.net/prestigeding/article/details/64443479

读者朋友们,下面的几个问题都明白了吗?

1)NameServer集群保存哪些配置信息,如果保存的。

2)NettyServerConfig 的配置含义

3)Netty线程模型中EventLoopGroup、EventExecutorGroup之间的区别于作用

4)在Channel的整个生命周期中,如何保证Channel的读写事件至始至终使用同一个线程处理,Channel绑定的ChannelHandler一定在IO线程中执行吗?为什么。

赞(1) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » RocketMQ源码分析之NameServer
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏