RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)
  Xa9NVipNa2Vn 2023年11月02日 65 0


一、原理

以集群消费为例,集群消息在同一个消费组中只能有一个消费者可以消费到这条消息;假如最开始我们有一个叫TopicA的主体,TopicA中有8个MessageQueue;有个消费组ConsumerGroupA;在最开始的时候我们只启动一个consumer1消费者实例,即consumer1这一个消费实例将消费这8个queue,如下图:

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_负载均衡


然后我们在启动一个consumer2消费者实例,其会向Broker注册;这时broker发现ConsumerGroupA中新增了一个消费者实例,其会通知consumer1:嗨,哥们,你们这个消费组的实例发生了变化,你重新负载均衡一下吧。

这时consumer2在注册的过程中,也会进行负载均衡;它首先会从broker获取ConsumerGroupA下的所有的消费者实例,发现有两个;接着获取订阅的TopicA下的MessageQueue信息集合,一看有8个,然后其会根据负载均衡算法(默认是平均分配)算出可以分几个MessageQueue;8个queue、2个consumer实例,一人4个queue刚刚好。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_负载均衡_02


这时,我们先看一下consumer1它是怎么做的,原本有8个queue,包括它queue4、queue5、queue6、queue7 ,现在不能有了,它会将本地维护的MessageQueue对应的ProcessQueue清理掉,ProcessQueue我们在前面有介绍过:​​RocketMQ源码解析:ProcessQueue的作用​​​。再看consumer2实例,其新加了4个queue,他会遍历这4个queue,获取每个queue的元数据信息(包括:开始消费的offset、队列ID queueId、broker民称 brokerName),然后为每个MessageQueue生成一个对应的ProcessQueue;最后将MessageQueue和ProcessQueue封装成PullRequest拉取消息请求,并放入到PullMessageService的pullRequestQueue阻塞队列中;然后由PullMessageService服务执行后续的拉取消息动作。关于PullMessageService拉取消息的介绍,我们前面也有聊过:​​RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?​​。

rebalance负载均衡就这!!感觉就是同一个消费组中消费者实例如何分配MessageQueue,下面我们来看看RocketMQ源码中是怎么实现的。

二、源码分析

以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0​

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_03

1、问:负载均衡服务什么使用启动的?

1、consumer.start()启动消费者时,其中会执行mQClientFactory.start()操作启动MQClientInstance;

此处的DefaultMQPushConsumerImpl为消费者Push方式的具体实现类。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_04


RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_java_05


2、MQClientInstance.start()方法中会启动负载均衡服务​​rebalanceService​​;

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_06

2、RebalanceService开始套娃

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_07


因为RebalanceServie是一个Runnable线程,我们看一下它的run()方法,发现它只是在判断RebalanceService线程状态之后,调用MQClientInstance#doRebalance()方法。​​注意,RebalanceService线程会先睡眠waitInterval毫秒,即20s,如果我们不提前唤醒它,整个Consumer消费实例是没有办法工作的,所以我们在DefaultMQPushConsumerImplement#start()的最后会发现一个理解唤醒RebalanceService负载均衡服务的逻辑​​。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_08


RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_09


来,我们继续看MQClientInstance#doRebalance()方法。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_负载均衡_10


在MQClientInstance#doRebalance()方法中,它未当前JVM下所有consumer做负载均衡操作,通过调用MQConsumerInner的doRebalance()方法。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_11


而MQConsumerInner是一个接口,我们需要去它的实现中看具体的实现逻辑。此处我们以Push模式为例,即DefaultMQPushConsumerImpl。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_负载均衡_12


套娃还没结束,这里会传入是否为顺序消费信息,然后调用RebalanceImpl#doRebalance()方法。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_13


这里开始便是负载均衡的核心逻辑。在RebalanceImpl类中,​​doRebalance()​​方法一方面要以topic为维度进行consumer负载均衡,一方面要在负载均衡后,移除topic中不需要再消费的MessageQueue信息(ProcessQueue)。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_14


然而​​truncateMessageQueueNotMyTopic()​​​方法中实际没有操作​​PutMessageService​​​中​​pullRequestQueue​​​这个阻塞队列中的​​PullRequest​​​,只是​​将MessageQueue​​​关联的ProcessQueue从processQueueTable中移除,并将ProcessQueue设置为​​drop​​状态。这样Consumer就不会再拉取相应MessageQueue对应的Broker存的消息。

这里也就是我们上面说的consumer1消费者实例移除queue4/5/6/7所做的操作。

3、RebalanceImpl#rebalanceByTopic() — 真正负载均衡的地方

private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// 获取topic下的所有queue元数据信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 获取topic下的所有consumer
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

// 将consumer和MessageQueue排序
Collections.sort(mqAll);
Collections.sort(cidAll);

// 获取负载均衡策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}

// 真正执行消费的方法
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

我们进入rebalanceByTopic()方法,发现其会根据消息的传播方式为BROADCASTING 或 CLUSTERING做不同的逻辑;对于广播模式,约等于不需要做负载均衡,当前消费者会直接消费所有的队列。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_15

1、我们重点看一下集群消息的负载均衡是如何做的?

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_16


我们把集群消息的负载均衡分为五步来看,下面我们一步一步的介绍:

1)第一步

从Topic订阅表中​​获取​​​当前topic下的​​所有MessageQueue​​​(MessageQueue的信息是定时从NameServer中拉取到的),然后根据Topic从broker中获取当前​​ConsumerGroup下的所有消费者实例​​。

2)第二步

对​​MessageQueue集合和ConsumerID集合进行排序​​,以保证每个Consumer实例对应的MessageQueue集合顺序是一致的,并且这一步对于下一步的负载均衡算法及其重要。

3)第三步

使用负载均衡算法进行重新负载,默认使用AllocateMessageQueueAveragely 平均分配算法;我们接着来看一下AllocateMessageQueueStrategy#allocate()方法是如何对Queue进行负载的。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_17


RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_18

就像我们写的代码一样,前面常规一堆校验。我们直接看下平均分配这个负载均衡算法的实现:

(1)获取当前客户端ID(currentCID)在消费者集合(cidAll)中的索引位置;
(2)计算MessageQueue能不能 没“零头”的平均分配到每个Consumer上,如果不能就把“零头”记下来;
(3)接着计算当前消费者能分到几个MessageQueue去消费,计算规则如下:

  1. 首先队列个数 是否小于等于 消费者个数;如果小于,则一个consumer最多只能分配到一个queue;
  2. 否者判断queue是否有“零头”;如果有零头 并且 当前client在消费者集合中的索引位置 小于 “零头”, 当前consumer可以多分到一个queue;否者就不管“零头”,平均分配就得了。这里的索引位置就体现出了第二步时对消费者集合排序的重要性。
    (4)计算当前consumer可以从MessageQueue集合中取MessageQueue的startIndex。
    (5)结合上面计算的averageSize算出真正要取的queue个数。

我们以实际数据来推演一下:

比如我们有8个queue、一个ConsumerGroup下有3个consumer实例(消费者0、消费者1、消费者2),并且当前consumer客户端在consumer集合的0索引处,即是消费者0。

  • 首先按照上面的算法,我们可以算出每个consumer客户端可以分配的队列数averageSize。消费者0分到3个queue、消费者1分到3个queue、消费者2分到2个queue。
  • RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_19

  • 然后计算从MessageQueue集合中开始取MessageQueue的索引位置。消费者0从0开始取、消费者1从3开始取、消费者2从6开始取。
  • 最后计算取的范围,也就是可以分几个queue。消费者0从0开始取3个、消费者1从3开始取3个、消费者2从6开始取2个。
  • RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_20

来,我们细品,如果不对MessageQueue集合和consumer集合进行排序的话,每个Consumer取到的MessageQueue个数不会变,但是大概念会重复,因为大家都是随便取的,所以排序很重要。

4)第四步

更新当前Topic下的所有MessageQueue和对应的ProcessQueue信息,比如MessageQueue的新增、删除等操作。这里我们分三块(移除不再需要关注的MessageQueue/ProcessQueue、新增MessageQueue/ProcessQueue、分配PullRequest拉取消息请求)来看。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_java_21


(1)遍历ProcessQueueTable这个Map,如果负载均衡后分配的queue中不在包含之前的queue,则就将之前的queue移除;如果之前的queue过了120s没有拉取d到消息了,也会将它销毁。

我们来看看是怎么销毁queue的:

  • 先是设置ProcessQueue的状态为dropped;
  • 然后移除没必要的queue,即将这个queue的消费offset上报给broker;另外如果是顺序消费的话,向broker发送释放锁请求。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_客户端_22


(2)处理需要新增的MessageQueue,级联创建对应的ProcessQueue。

  • 遍历负载均衡后传递过来的queue集合,当processQueueTable中不包含MessageQueue时执行下列逻辑:
  1. 顺序消费时,如果MessageQueue没有被锁定成功,跳过当前MessageQueue,不进行消费操作。
  2. 清空MessageQueue的消费进度信息,计算下个要消费消息的offset:
  • 如果是​​CONSUME_FROM_LAST_OFFSET​​,则从Broker中获取该queue的最大offset信息,即从最后开始消费。
  • 如果是​​CONSUME_FROM_FIRST_OFFSET​​,则先去Broker中获取最后消费到了哪个offset,如果还没消费则从0开始,否则从获取到的offset开始消费。
  • 如果是​​CONSUME_FROM_TIMESTAMP​​,则先从broker中获取最后消费到的offset,如果还没消费,则去broker上找我们当前时间对应的一个offset。
  1. 组装​​ProcessQueue​​​,然后将组装好的ProcessQueue放入到​​processQueueTable​​​中,并将其封装到​​PullRequest​​中放入到pullRequestList。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_java_23


(3)分配PullRequest拉取消息请求,将pullRequestList加入到PullMessageService的pullRequestQueue中,以供执行拉取消息操作。最后调用RebalanceImpl#dispatchPullRequest()方法 将PullRequeat对象放入到​​PullMessageService​​​拉取消息服务的​​pullRequestQueue​​阻塞队列中。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_负载均衡_24


RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_25


RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_阻塞队列_26

5)第五步

如果第四步中MessageQueue或ProcessQueue发生了变化,会执行消费队列分配发生变更后的逻辑。

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)_RocketMQ_27

这里会首先给订阅信息设置一个版本号,然后计算一些拉取消息的阈值,包括:Topic级别的拉取阈值、拉取消息大小阈值;

  • 如果配置了topic级别的拉取阈值,计算出来每个queue的拉取阈值,即topic的限制阈值 / queue的数量,然后设置到DefaultMQPushConsumer的pullThresholdForQueue 参数中。
  • 同理计算Topic级别拉取消息大小的阈值。

最后向broker 发送心跳包,通知一下Broker。

三、总结

rabalance其实就是解决一些MessageQueue应该如何分给一些消费者的问题。负载均衡操作还牵扯Consumer拉取消息的请求源。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  ehrZuhofWJiC   2024年04月26日   40   0   0 日志Java
Xa9NVipNa2Vn