RocketMQ源码分析之RocketMQ拉模式

撸了今年阿里、腾讯和美团的面试,我有一个重要发现…….

作者:dingwpmz

出处:https://blog.csdn.net/prestigeding/article/details/78888290


消费者 与 消息存储方Broker一般有两种通信机制:推(PUSH)、拉(PULL)
推模式:消息发送者将消息发送到Broker,然后Broker主动推送给订阅了该消息的消费者。
拉模式:消息发送者将消息发送到Broker上,然后由消息消费者自发的向Broker拉取消息。
RocketMQ推拉机制实现:
严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,在消费端开启一个线程PullMessageService循环向Broker拉取消息,一次拉取任务结束后马上又发起另一次拉取操作,实现准实时自动拉取,,PUSH模式的实现请参考如下博文:
推模式消息拉取机制: http://blog.csdn.net/prestigeding/article/details/78885420
推模式消息队列负载机制:http://blog.csdn.net/prestigeding/article/details/78927447
本文重点在讨论RocketMQ拉模式DefaultMQPullConsumer实现。
RocketMQ拉模式,RocketMQ消费者不自动向消息服务器拉取消息,而是将控制权移交给应用程序,RocketMQ消费者只是提供拉取消息API。
为了对RocketMQ 拉模式有一个直观的了解,我们先大概浏览一下MQPullConsumer接口:
2019082310014_1.png
从上面我们可以看到除了启动、关闭,注册消息监听器,其他的就是针对MessageQueue拉取消息,特别值得留意的是每一个拉取pull方法,都是直接针对消息消费队列。PUSH模式可以说基于订阅与发布模式,而PULL模式可以说是基于消息队列模式。
特别说明:PULL模式根据主题注册消息监听器,这里的消息监听器,不是用来消息消费的,而是在该主题的队列负载发生变化时,做一下通知。
下文,我们应该带着我们对PUSH模式的相关知识来认识一下PULL模式,对比学习:
PUSH模式主要知识点:
1)消息拉取机制:PullMessageServer线程 根据PullRequest拉取任务循环拉取。
2)消息队列负载机制,按照消费组,对主题下的消息队列,结合当前消费组内消费者数量动态负载。
按照上面API的描述,PULL模式应该无需考虑上面两个情形,我们带着上述疑问,开始我们今天的学习。

1、DefaultMQPullConsumer 核心属性

    /**
         * Do the same thing for the same Group, the application must be set,and
         * guarantee Globally unique
         */
        private String consumerGroup;
        /**
         * Long polling mode, the Consumer connection max suspend time, it is not
         * recommended to modify
         */
        private long brokerSuspendMaxTimeMillis = 1000 * 20;
        /**
         * Long polling mode, the Consumer connection timeout(must greater than
         * brokerSuspendMaxTimeMillis), it is not recommended to modify
         */
        private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
        /**
         * The socket timeout in milliseconds
         */
        private long consumerPullTimeoutMillis = 1000 * 10;
        /**
         * Consumption pattern,default is clustering
         */
        private MessageModel messageModel = MessageModel.CLUSTERING;
        /**
         * Message queue listener
         */
        private MessageQueueListener messageQueueListener;
        /**
         * Offset Storage
         */
        private OffsetStore offsetStore;
        /**
         * Topic set you want to register
         */
        private Set<String> registerTopics = new HashSet<String>();
        /**
         * Queue allocation algorithm
         */
        private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
        /**
         * Whether the unit of subscription group
         */
        private boolean unitMode = false;

        private int maxReconsumeTimes = 16;

consumerGroup : 消费组名称
brokerSuspendMaxTimeMillis :长轮询模式下挂起的最大超时时间,在Broker端根据偏移量从存储
文件中查找消息时如果返回PULL_NOT_FOUND时,不理解返回给拉取客户端,而是交给
PullRequestHoldService线程,每隔5秒再去拉取一次消息,如果找到则返回给消息拉取客
户端,否则超时。
consumerTimeoutMillisWhenSuspend : 整个消息拉取过程中,拉取客户端等待服务器响应结果的超时时间,默认30S
consumerPullTimeoutMillis :默认10s,拉消息时建立网络连接的超时时间
messageModel :消费模式,广播、集群
messageQueueListener : 业务消息监听器
OffsetStore :消息消费进度管理器
registerTopics :注册主题数
allocateMessageQueueStrategy :队列分配器
maxReconsumeTimes :最大消息重试次数,默认16次

2、消息消费者启动流程分析,DefaultMQPullConsumerImpl#start

     public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;

                    this.checkConfig();  

                    this.copySubscription(); // @1

                    if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPullConsumer.changeInstanceNameToPID();
                    }

                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);   // @2

                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);   // @3

                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);    // @:4

                    if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPullConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);   // @5
                    }

                    this.offsetStore.load();  

                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);   // @6
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;

                        throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }

                    mQClientFactory.start();  // @7
                    log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }

        }

代码@1:根据注册的主题,构建订阅信息,放入到RebalanceImpl的订阅表中。PS:DefaultMQPullConsumerImpl 可以注册多个主题,但多个主题使用同一个消息处理监听器。
2019082310014_2.png
代码@2:创建MQClientInstance,每一个clientConfig一个MqClientInstance对象。
代码@3:填充rebalanceImpl 对象的消费组、消息队列分配器、消费模式。这里的作用是什么?既然无需负载消息队列,为什么需要这一步???
代码@4:构建PullAPIWrapper对象,该对象封装了具体拉取消息的逻辑,PULL,PUSH模式最终都会调用PullAPIWrapper类的方法从Broker拉取消息。
代码@5:根据集群消费模式(广播、集群)初始化消息进度管理器offsetStore。
代码@6:将该消费者加入到MQClientInstance消费者列表中。
代码@7:启动MQClientInstance。该方法我们在讲解DefaultMQPushConsumer时相信讲解过,我们再简单浏览一下该方法:
2019082310014_3.png
既然Pull模式无需自动拉取消息,但PullMessageService线程(消息拉取)+ RebalanceService线程(消息队列负载)这个两个线程就没必要启动,这里启动了,会不会带来问题?
答案是不会,因为虽然PullMessageService线程启动,但是一开始会在获取拉取任务(PullRequest)
2019082310014_4.png
PullRequest是有RebalanceService产生,它根据主题消息队列个数和当前消费组内消费者个数进行负载,然后产生对应的PullRequest对象,再将这些对象放入到PullMessageService的pullRequestQueue队列。具体放入逻辑调用:RebalanceImpl#dispatchPullRequest(final List pullRequestList);
我们来看一下RebalanceImpl的子类RebalancePullImpl的dispatchPullRequest方法:
2019082310014_5.png
再对比一下RebalancePushImpl的dispatchPullRequest,
2019082310014_6.png
再结合PullMessageService被唤醒后,执行的pullMessage方法:
2019082310014_7.png
我们可以得出结论,PullMessageService 只为PUSH模式服务,ReblanceService进行路由重新分布时,如果是RebalancePullImpl,并不会产PullRequest,从而唤醒PullMessageService,PullMessageService被 唤醒后,也是执行DefaultMQPushConsumerImpl的pullMessage方法。
ReblanceService线程默认每20S进行一次消息队列重新负载,判断消息队列是否需要进行重新分布(如果消费者个数和主题的队列数没有发生改变),则继续保持原样。对于PULL模型,如果消费者需要监听某些主题队列发生事件,注册消息队列变更事件方法,则RebalanceService会将消息队列负载变化事件通知消费者。
至于PULL模式那些根据消息队列拉取消息的方法,与PUSH模式走的逻辑是一样的,唯一的区别是PULL模式是需要应用程序收到触发消息拉取动作。

通过上述分析,我们总结一下RocketMQ,PUSH,PULL模式区别:
PUSH: 消费者订阅主题,然后自动进行集群内消息队列的动态负载,自动拉取消息。准实时。
PULL:消费者无需订阅主题,由业务方(应用程序)直接根据MessageQueue拉取消息。
项目中一般采用PUSH模式。

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » RocketMQ源码分析之RocketMQ拉模式
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏