RocketMQ源码分析之消息消费机制—-消费端消息负载均衡机制与重新分布

撸了今年阿里、腾讯和美团的面试,我有一个重要发现…….

作者:dingwpmz

出处:https://blog.csdn.net/prestigeding/article/details/78888290


1、消息消费需要解决的问题

首先再次重复啰嗦一下RocketMQ消息消费的一些基本元素的关系

主题 —》 消息队列(MessageQueue) 1 对多

主题 —-》 消息生产者,,,一般主题会由多个生产者组成,生产者组

主题 —- 》 消息消费者,,一般一个主题也会被多个消费者消费

那消息消费至少需要解决如下问题:

1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进

行负载消费的。

2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的

消息队列中的消息呢?

3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。

4、RocketMQ 推拉模式实现机制

再提一个业界关于消费者与消息队列的消费规则

1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?

本文紧接着上文: http://blog.csdn.net/prestigeding/article/details/78885420

继续探讨消息分发与消费端负载均衡。

我们从上文知道,PullMessageService线程主要是负责pullRequestQueue中的PullResult,那问题来了,pullRequestQueue中的数据从哪来,在什么时候由谁来填充。

201908231006_1.png

那我们就先沿着这条线索分析下去,看一下PullMessageService的pullReqestQueue添加元素的方法的调用链条如下:

201908231006_2.png

也就是调用链:

RebalanceService. run()

MQClientInstance.doRebalance()

DefaultMQPulConsumerImpl.doRebalance()

RebalanceImpl.doRebalance()

RebalanceImpl.rebalanceByTopic

RebalanceImpl.updateProcessQueueTableInRebalance

RebalanceImpl.dispatchPullRequest

DefaultMQPushConsumerImpl.executePullRequestImmediately

PullMessageService.executePullRequestImmediately

从上面可以直观的看出,向PullMesssageService 的 LinkedBlockingQueue pullRequestQueue添加PullRequest的是RebalanceService.run方法,就是向PullMessageService中放入PullRequest,才会驱动PullMessageSerivce run方法的运行,如果pullRequestQueue中没有元素,PullMessageService线程将被阻塞。

那么RebalanceService是何许人也,让我们一起来揭开其神秘面纱。

2、消息消费负载机制分析

2.1 RebalanceService 线程

201908231006_3.png

201908231006_4.png

从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService线程的run方法比较简单,就是直接调用mqClientFactory.doRebalance。

下面重点分步骤来详细探究MQClientInstance.doRebalance方法的执行流程。

1、MQClientInstance.doRebalance方法,循环遍历每个消费组获取 MQConsumeInner对象(其实就是DefaultMQPushConsumerImpl或DefaultMQPullConsumerImpl对象),并执行其doRebalance方法

201908231006_5.png

2、DefaultMQPushConsumerImpl.doRebalance

201908231006_6.png

RebalanceImpl doRebalance

201908231006_7.png

到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl类,我们应该停下脚步,先重点认识一下RebalanceImpl类

3、RebalanceImpl 类初探

重点属性:

201908231006_8.png

下面还是从doRebalance方法入手:

201908231006_9.png

1、根据topic来进行负载

2、移除MessageQueu,如果MesageQueue的topic不在订阅的主题中,接下来重点关注rebalanceByTopic方法

RebalanceImpl rebalanceByTopic详解

part1:根据消息消费模式(集群还是广播)我们先重点看集群模式

part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行

201908231006_10.png

part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列

具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy的实现类,具体算法实现就不细化研究了

在这里举一个最简单的队列分配机制,,比如一个topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者c1,c2,c3

一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5 给 c3

消费者需要从哪些消息队列中

拉取消息即可,下文会专题研究一下负载算法。

201908231006_11.png

part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变

201908231006_12.png

201908231006_13.png

201908231006_14.png

遍历消息队列-处理队列缓存,只处理mq的主题与该主题相关的ProcessQueue,如果mq不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃(dropped为voliate修饰),可以及时的阻止继续向ProceeQueue中拉取数据,

然后执行removeUnecessaryMessageQueue(mq,pq) 来判断是否需要移除。

201908231006_15.png

既然我们都是从Push进入的,先重点沿着Push这条线走到黑(同时我们也可以先思考思考push,pull差别),移步到RebalancePushImpl

201908231006_16.png

目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将MessageQueue持久化,然后丢弃,重新被其他消费者加载。顺序消息应该会在消息消费进度存储之后详细分析。

接下来处理MessageQueue的ProcessQueue,也就是在ProcessQueueTable中没有mq的处理队列(因为重新负载后,可能会分配一些新的队列)

201908231006_17.png

主要就是在内存中移除 MessageQueue的offerset,然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。

201908231006_18.png

RebalancePushImpl

201908231006_19.png

PullMessageService

201908231006_20.png

往PullServiceMessage中的 pullRequestQueue中放入PullRequest,则PullMessageService线程 的run方法就不会阻塞

201908231006_21.png

part5:如果消息负载发生变化,需处理

201908231006_22.png

主要是调整主题小各个队列的拉取阔值

201908231006_23.png

这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。

本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:

1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue会构建一个PullRequest 对象,然后通过RebalanceImpl将PullRequest放入到

PullMessageService线程的LinkedBlockingQueue,进而唤醒 queue.take()方法,然后执行DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为1条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该PullRequest放入到PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval – 0;

下文预告:

CommitLog写入与ConsumeQueue队列的持久化机制

消息消费进度存储机制,再谈RocketMQ消息存储

RocketMQ顺序消息

RocketMQ主从机制

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » RocketMQ源码分析之消息消费机制—-消费端消息负载均衡机制与重新分布
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏