源码分析RocketMQ之消息消费

撸了今年阿里、腾讯和美团的面试,我有一个重要发现…….

作者:dingwpmz

出处:https://blog.csdn.net/prestigeding/article/details/78888290


1、消息消费关注点

1)消息消费方式:拉取、推送

2)消费者组与消费模式

consumerGroup;

MessageModel messageModel;

多个消费者组成一个消费组,两种模式:集群(消息被其中任何一个消息者消费)、广播模式(全部消费者消费)

3)ConsumeFromWhere consumeFromWhere

CONSUME_FROM_LAST_OFFSET 上一次消费偏移量

CONSUME_FROM_FIRST_OFFSET 从头开始

CONSUME_FROM_TIMESTAMP 从某个时间点开始

4)消息订阅主题

Map subscription

5)消费进度

OffsetStore offsetStore

消费者需要记录消息消费的进度

广播模式:广播模式由于每个消费者都需要消费消息,故消息的进度(最后消费的偏移量可以保存在本地)

集群模式:由于集群中的消费者只要一个消费消息即可,故消息的消费进度,需要保存在集中点,比如分布式缓存、

RocketMQ存储在Broker所在的服务器。

6)消费者线程(待后续讲解)

private int consumeThreadMin = 20;

private int consumeThreadMax = 64;

private long adjustThreadPoolNumsThreshold = 100000;

private int consumeConcurrentlyMaxSpan = 2000;

2、消息消费实现

消费端demo:

201908231003_1.png

使用推送模式,设置消费者所属组,订阅主题、定义消息消费回调接口,推送消息后消费方具体业务处理,并返回CONSUME_SUCCESS表示消费成功。

消息消费者具体实现类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

2.1 DefaultMQPushConsumerImpl

2.1.1 消费端初始化(构造方法)

201908231003_2.png

201908231003_3.png

201908231003_4.png

然后开始重点从start方法深入研究 DefaultMQPushConsumerImpl的内部机制

    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {  
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;     // @1

                    this.checkConfig();                                                 //@2

                    this.copySubscription();                                         //@3

                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {    // @4
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }

                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());    // @5
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);   //@6

                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {    // @7
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                    }
                    this.offsetStore.load();         //@8

                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }

                    this.consumeMessageService.start();   //@9

                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);   //@10
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }

                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }

            this.updateTopicSubscribeInfoWhenSubscriptionChanged();    //@11
            this.mQClientFactory.checkClientInBroker();                               //@12
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();   //@13
            this.mQClientFactory.rebalanceImmediately();                          //@14
        }

代码@2, checkConfig,检查配置信息,主要检查 消费者组(consumeGroup)、消息消费方式(messageModel)、消息消费开始偏移量(consumeFromWhere)、消息队列分配算法(AllocateMessageQueueStrategy)、订阅消息主题(Map<topic,sub expression ),消息回调监听器(MessageListener)、顺序消息模式时是否只有一个消息队列等等。

代码@3,copySubscription加工订阅信息,将Map,同时,如果消息消费模式为集群模式,还需要为该消费组对应一个重试主题。

代码@4,如果消息消费模式为集群模式,并且当前的实例名为DEFAULT,替换为当前客户端进程的PID。

代码@5,负载均衡相关实现,后文重点关注

代码@6, pullAPIWrapper ,后文重点关注

代码@7,消费进度存储,如果是集群模式,使用远程存储 RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore,后文重点关注

代码@8,加载消息进度。

代码@9,消息消费服务并启动,后文重点关注

代码@10,向远程Broker服务器注册消费者

代码@11,更新订阅新

代码@12,检测broker状态

代码@13,发送心跳

代码@14,重新负载。

上面包含的信息量巨大,我们一一进行分析

第一步:我们先来确定消息消费的一些核心类的作用

2.1.1.1 MQClientInstance 概述

    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            String clientId = clientConfig.buildMQClientId();
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }

            return instance;
        }
    public String buildMQClientId() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getClientIP());
            sb.append("@");
            sb.append(this.getInstanceName());
            if (!UtilAll.isBlank(this.unitName)) {
                sb.append("@");
                sb.append(this.unitName);
            }
            return sb.toString();
        }
    private String clientIP = RemotingUtil.getLocalAddress();

从这段代码可以看成,一个客户端IP@InstanceName只会持有一个 MQClientInstance对象,MQClientInstance无论是消费者还是生产者,都在应用程序这一端;

有了这一层认识,我们就重点关注一下该类的属性:

1) ClientConfig clientConfig;

2) int instanceIndex;

MQClientInstance在同一台机器上的创建序号

3) String clientId; 客户端id

4) ConcurrentMap<String/* group */, MQProducerInner> producerTable

生产组–》消息生产者,也就是在应用程式一端,每个生产者组,在同一台应用服务器只需要初始化一个生产者(MQProducerInner)

5)ConcurrentMap<String/* group */, MQConsumerInner> consumerTable

消费组–》消费者,也就是在应用程序一 端 ,每个消费组,在同一台应用服务器只需要初始化一个消费者即可。

6)ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable

同理

7)NettyClientConfig nettyClientConfig;

8)NettyClientConfig nettyClientConfig;

9)MQClientAPIImpl mQClientAPIImpl;

10)MQAdminImpl mQAdminImpl;

11)ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable

topic路由信息

12)Lock lockNamesrv = new ReentrantLock();

13)Lock lockHeartbeat = new ReentrantLock();

14)ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable

broker信息,这些信息存在于NameServer,但缓存在本地客户端,供生产者、消费者共同使用

15)private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

return new Thread(r, “MQClientFactoryScheduledThread”);

}

});

16)ClientRemotingProcessor clientRemotingProcessor;

17)PullMessageService pullMessageService;

拉取服务

18)RebalanceService rebalanceService;

拉取负载均衡,下文重点关注

19)DefaultMQProducer defaultMQProducer;

默认的消息生产者

20)ConsumerStatsManager consumerStatsManager;

消费端统计

21)AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);

心跳包发送次数

22)ServiceState serviceState = ServiceState.CREATE_JUST;

状态

23)DatagramSocket datagramSocket;

24)Random random = new Random();

我们在消息发送时,也与这个类打过交道,就是从NameServer获取topic的路由信息并缓存之,现在我们先不重点关注每个方法,方法用到后再细说了,目前我做一个简单的总结:

MQClientInstance mq客户端实例,每台应用服务器将持有一个MQClientInstance对象,供该应用服务器的消费者,生产者使用。该类是消费者,生产者网络处

理的出口类。

2.1.1.2 RebalanceImpl

同样,在这里,我们重点先理解该类的作用

RebalanceImpl是consume的重新负载,什么意思呢?就是消费者与消费队列的对应关系,我们来思考一个问题,比如现在有4个消息队列(q1,q2,q3,q4),3个消费者(m1,m2,m3),那么消费者与消息队列的对应关系是什么呢?我们按照一个轮询算法来表示, m1(q1,q4) m2(q2) m3(q3),如果此时q2消息队列失效(所在的broker挂了),那么消息队列的消费就需要重新分配,RebalanceImpl就是干这事的,该类的调用轨迹如下:(MQClientInstance start –> (this.rebalanceService.start()) —> RebalanceService.run(this.mqClientFactory.doRebalance()) —> MQConsumerInner.doRebalance(DefaultMQPushConsumerImpl) —> RebalanceImpl.doRebalance

在这里着重说明一点:消息队列数量与消费者关系:1个消费者可以消费多个队列,但1个消息队列只会被一个消费者消费;如果消费者数量大于消息队列数量,则有的消费者会消费不到消息(集群模式)

2.1.2 消息消费过程

消息的消费过程,就是从服务器拉取,然后消费者进行消费,再根据业务反馈是否成功消费来推动消费进度,也就是消息的消费进度并不是保存在服务端(比如commitlog文件中),而是保存在消费端(可以是本地(广播模式)、broke端(集群模式))具体的代码实现:

    public void pullMessage(final PullRequest pullRequest) {
            final ProcessQueue processQueue = pullRequest.getProcessQueue();
            if (processQueue.isDropped()) {
                log.info("the pull request[{}] is dropped.", pullRequest.toString());
                return;
            }

            pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

            try {
                this.makeSureStateOK();
            } catch (MQClientException e) {
                log.warn("pullMessage exception, consumer state not ok", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                return;
            }

            if (this.isPause()) {
                log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
                return;
            }

            long size = processQueue.getMsgCount().get();
            if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((flowControlTimes1++ % 1000) == 0) {
                    log.warn(
                        "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
                }
                return;
            }

            if (!this.consumeOrderly) {
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                    if ((flowControlTimes2++ % 1000) == 0) {
                        log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, flowControlTimes2);
                    }
                    return;
                }
            } else {
                if (processQueue.isLocked()) {
                    if (!pullRequest.isLockedFirst()) {
                        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                        boolean brokerBusy = offset < pullRequest.getNextOffset();
                        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                            pullRequest, offset, brokerBusy);
                        if (brokerBusy) {
                            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                                pullRequest, offset);
                        }

                        pullRequest.setLockedFirst(true);
                        pullRequest.setNextOffset(offset);
                    }
                } else {
                    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                    log.info("pull message later because not locked in broker, {}", pullRequest);
                    return;
                }
            }

            final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (null == subscriptionData) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.warn("find the consumer's subscription failed, {}", pullRequest);
                return;
            }

            final long beginTimestamp = System.currentTimeMillis();

            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);

                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                long prevRequestOffset = pullRequest.getNextOffset();
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                long pullRT = System.currentTimeMillis() - beginTimestamp;
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullRT);

                                long firstMsgOffset = Long.MAX_VALUE;
                                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                } else {
                                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                    boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                        pullResult.getMsgFoundList(), //
                                        processQueue, //
                                        pullRequest.getMessageQueue(), //
                                        dispathToConsume);

                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                    } else {
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                    }
                                }

                                if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                    || firstMsgOffset < prevRequestOffset) {
                                    log.warn(
                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
                                        pullResult.getNextBeginOffset(), //
                                        firstMsgOffset, //
                                        prevRequestOffset);
                                }

                                break;
                            case NO_NEW_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case NO_MATCHED_MSG:
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                break;
                            case OFFSET_ILLEGAL:
                                log.warn("the pull request offset illegal, {} {}", //
                                    pullRequest.toString(), pullResult.toString());
                                pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                                pullRequest.getProcessQueue().setDropped(true);
                                DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                    @Override
                                    public void run() {
                                        try {
                                            DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                pullRequest.getNextOffset(), false);

                                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                            log.warn("fix the pull request offset, {}", pullRequest);
                                        } catch (Throwable e) {
                                            log.error("executeTaskLater Exception", e);
                                        }
                                    }
                                }, 10000);
                                break;
                            default:
                                break;
                        }
                    }
                }

                @Override
                public void onException(Throwable e) {
                    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("execute the pull request exception", e);
                    }

                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                }
            };

            boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {
                    commitOffsetEnable = true;
                }
            }

            String subExpression = null;
            boolean classFilter = false;
            SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (sd != null) {
                if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                    subExpression = sd.getSubString();
                }

                classFilter = sd.isClassFilterMode();
            }

            int sysFlag = PullSysFlag.buildSysFlag(//
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
            );
            try {
                this.pullAPIWrapper.pullKernelImpl(//
                    pullRequest.getMessageQueue(), // 1
                    subExpression, // 2
                    subscriptionData.getExpressionType(), // 3
                    subscriptionData.getSubVersion(), // 4
                    pullRequest.getNextOffset(), // 5
                    this.defaultMQPushConsumer.getPullBatchSize(), // 6
                    sysFlag, // 7
                    commitOffsetValue, // 8
                    BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
                    CommunicationMode.ASYNC, // 11
                    pullCallback // 12
                );
            } catch (Exception e) {
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        }

消费过程暂不详细讲解了,后续文章会进一步对消息消费的各个方面做出详细的剖析。

本文旨在梳理出消息的消费过程,消息队列与消费者的负载、消费模式(广播、集群),消息进度等等。

后续文章重点关注:

1、消费进度反馈源码分析

2、消费者消息负载实现原理

3、消息过滤与tag机制

4、常用问题分析

1)一个消费者多个线程(线程之间如何协调)

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » 源码分析RocketMQ之消息消费
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏