一、原理
以集群消费为例,集群消息在同一个消费组中只能有一个消费者可以消费到这条消息;假如最开始我们有一个叫TopicA的主体,TopicA中有8个MessageQueue;有个消费组ConsumerGroupA;在最开始的时候我们只启动一个consumer1消费者实例,即consumer1这一个消费实例将消费这8个queue,如下图:
然后我们在启动一个consumer2消费者实例,其会向Broker注册;这时broker发现ConsumerGroupA中新增了一个消费者实例,其会通知consumer1:嗨,哥们,你们这个消费组的实例发生了变化,你重新负载均衡一下吧。
这时consumer2在注册的过程中,也会进行负载均衡;它首先会从broker获取ConsumerGroupA下的所有的消费者实例,发现有两个;接着获取订阅的TopicA下的MessageQueue信息集合,一看有8个,然后其会根据负载均衡算法(默认是平均分配)算出可以分几个MessageQueue;8个queue、2个consumer实例,一人4个queue刚刚好。
这时,我们先看一下consumer1它是怎么做的,原本有8个queue,包括它queue4、queue5、queue6、queue7 ,现在不能有了,它会将本地维护的MessageQueue对应的ProcessQueue清理掉,ProcessQueue我们在前面有介绍过:RocketMQ源码解析:ProcessQueue的作用。再看consumer2实例,其新加了4个queue,他会遍历这4个queue,获取每个queue的元数据信息(包括:开始消费的offset、队列ID queueId、broker民称 brokerName),然后为每个MessageQueue生成一个对应的ProcessQueue;最后将MessageQueue和ProcessQueue封装成PullRequest拉取消息请求,并放入到PullMessageService的pullRequestQueue阻塞队列中;然后由PullMessageService服务执行后续的拉取消息动作。关于PullMessageService拉取消息的介绍,我们前面也有聊过:RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?。
rebalance负载均衡就这!!感觉就是同一个消费组中消费者实例如何分配MessageQueue,下面我们来看看RocketMQ源码中是怎么实现的。
二、源码分析
以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
1、问:负载均衡服务什么使用启动的?
1、consumer.start()启动消费者时,其中会执行mQClientFactory.start()操作启动MQClientInstance;
此处的DefaultMQPushConsumerImpl为消费者Push方式的具体实现类。
2、MQClientInstance.start()方法中会启动负载均衡服务rebalanceService
;
2、RebalanceService开始套娃
因为RebalanceServie是一个Runnable线程,我们看一下它的run()方法,发现它只是在判断RebalanceService线程状态之后,调用MQClientInstance#doRebalance()方法。注意,RebalanceService线程会先睡眠waitInterval毫秒,即20s,如果我们不提前唤醒它,整个Consumer消费实例是没有办法工作的,所以我们在DefaultMQPushConsumerImplement#start()的最后会发现一个理解唤醒RebalanceService负载均衡服务的逻辑
。
来,我们继续看MQClientInstance#doRebalance()方法。
在MQClientInstance#doRebalance()方法中,它未当前JVM下所有consumer做负载均衡操作,通过调用MQConsumerInner的doRebalance()方法。
而MQConsumerInner是一个接口,我们需要去它的实现中看具体的实现逻辑。此处我们以Push模式为例,即DefaultMQPushConsumerImpl。
套娃还没结束,这里会传入是否为顺序消费信息,然后调用RebalanceImpl#doRebalance()方法。
这里开始便是负载均衡的核心逻辑。在RebalanceImpl类中,doRebalance()
方法一方面要以topic为维度进行consumer负载均衡,一方面要在负载均衡后,移除topic中不需要再消费的MessageQueue信息(ProcessQueue)。
然而truncateMessageQueueNotMyTopic()
方法中实际没有操作PutMessageService
中pullRequestQueue
这个阻塞队列中的PullRequest
,只是将MessageQueue
关联的ProcessQueue从processQueueTable中移除,并将ProcessQueue设置为drop
状态。这样Consumer就不会再拉取相应MessageQueue对应的Broker存的消息。
这里也就是我们上面说的consumer1消费者实例移除queue4/5/6/7所做的操作。
3、RebalanceImpl#rebalanceByTopic() — 真正负载均衡的地方
我们进入rebalanceByTopic()方法,发现其会根据消息的传播方式为BROADCASTING 或 CLUSTERING做不同的逻辑;对于广播模式,约等于不需要做负载均衡,当前消费者会直接消费所有的队列。
1、我们重点看一下集群消息的负载均衡是如何做的?
我们把集群消息的负载均衡分为五步来看,下面我们一步一步的介绍:
1)第一步
从Topic订阅表中获取
当前topic下的所有MessageQueue
(MessageQueue的信息是定时从NameServer中拉取到的),然后根据Topic从broker中获取当前ConsumerGroup下的所有消费者实例
。
2)第二步
对MessageQueue集合和ConsumerID集合进行排序
,以保证每个Consumer实例对应的MessageQueue集合顺序是一致的,并且这一步对于下一步的负载均衡算法及其重要。
3)第三步
使用负载均衡算法进行重新负载,默认使用AllocateMessageQueueAveragely 平均分配算法;我们接着来看一下AllocateMessageQueueStrategy#allocate()方法是如何对Queue进行负载的。
就像我们写的代码一样,前面常规一堆校验。我们直接看下平均分配这个负载均衡算法的实现:
(1)获取当前客户端ID(currentCID)在消费者集合(cidAll)中的索引位置;
(2)计算MessageQueue能不能 没“零头”的平均分配到每个Consumer上,如果不能就把“零头”记下来;
(3)接着计算当前消费者能分到几个MessageQueue去消费,计算规则如下:
- 首先队列个数 是否小于等于 消费者个数;如果小于,则一个consumer最多只能分配到一个queue;
- 否者判断queue是否有“零头”;如果有零头 并且 当前client在消费者集合中的索引位置 小于 “零头”, 当前consumer可以多分到一个queue;否者就不管“零头”,平均分配就得了。这里的索引位置就体现出了第二步时对消费者集合排序的重要性。
(4)计算当前consumer可以从MessageQueue集合中取MessageQueue的startIndex。
(5)结合上面计算的averageSize算出真正要取的queue个数。
我们以实际数据来推演一下:
比如我们有8个queue、一个ConsumerGroup下有3个consumer实例(消费者0、消费者1、消费者2),并且当前consumer客户端在consumer集合的0索引处,即是消费者0。
- 首先按照上面的算法,我们可以算出每个consumer客户端可以分配的队列数averageSize。消费者0分到3个queue、消费者1分到3个queue、消费者2分到2个queue。
- 然后计算从MessageQueue集合中开始取MessageQueue的索引位置。消费者0从0开始取、消费者1从3开始取、消费者2从6开始取。
- 最后计算取的范围,也就是可以分几个queue。消费者0从0开始取3个、消费者1从3开始取3个、消费者2从6开始取2个。
来,我们细品,如果不对MessageQueue集合和consumer集合进行排序的话,每个Consumer取到的MessageQueue个数不会变,但是大概念会重复,因为大家都是随便取的,所以排序很重要。
4)第四步
更新当前Topic下的所有MessageQueue和对应的ProcessQueue信息,比如MessageQueue的新增、删除等操作。这里我们分三块(移除不再需要关注的MessageQueue/ProcessQueue、新增MessageQueue/ProcessQueue、分配PullRequest拉取消息请求)来看。
(1)遍历ProcessQueueTable这个Map,如果负载均衡后分配的queue中不在包含之前的queue,则就将之前的queue移除;如果之前的queue过了120s没有拉取d到消息了,也会将它销毁。
我们来看看是怎么销毁queue的:
- 先是设置ProcessQueue的状态为dropped;
- 然后移除没必要的queue,即将这个queue的消费offset上报给broker;另外如果是顺序消费的话,向broker发送释放锁请求。
(2)处理需要新增的MessageQueue,级联创建对应的ProcessQueue。
- 遍历负载均衡后传递过来的queue集合,当processQueueTable中不包含MessageQueue时执行下列逻辑:
- 顺序消费时,如果MessageQueue没有被锁定成功,跳过当前MessageQueue,不进行消费操作。
- 清空MessageQueue的消费进度信息,计算下个要消费消息的offset:
- 如果是
CONSUME_FROM_LAST_OFFSET
,则从Broker中获取该queue的最大offset信息,即从最后开始消费。- 如果是
CONSUME_FROM_FIRST_OFFSET
,则先去Broker中获取最后消费到了哪个offset,如果还没消费则从0开始,否则从获取到的offset开始消费。- 如果是
CONSUME_FROM_TIMESTAMP
,则先从broker中获取最后消费到的offset,如果还没消费,则去broker上找我们当前时间对应的一个offset。
- 组装
ProcessQueue
,然后将组装好的ProcessQueue放入到processQueueTable
中,并将其封装到PullRequest
中放入到pullRequestList。
(3)分配PullRequest拉取消息请求,将pullRequestList加入到PullMessageService的pullRequestQueue中,以供执行拉取消息操作。最后调用RebalanceImpl#dispatchPullRequest()方法 将PullRequeat对象放入到PullMessageService
拉取消息服务的pullRequestQueue
阻塞队列中。
5)第五步
如果第四步中MessageQueue或ProcessQueue发生了变化,会执行消费队列分配发生变更后的逻辑。
这里会首先给订阅信息设置一个版本号,然后计算一些拉取消息的阈值,包括:Topic级别的拉取阈值、拉取消息大小阈值;
- 如果配置了topic级别的拉取阈值,计算出来每个queue的拉取阈值,即topic的限制阈值 / queue的数量,然后设置到DefaultMQPushConsumer的pullThresholdForQueue 参数中。
- 同理计算Topic级别拉取消息大小的阈值。
最后向broker 发送心跳包,通知一下Broker。
三、总结
rabalance其实就是解决一些MessageQueue应该如何分给一些消费者的问题。负载均衡操作还牵扯Consumer拉取消息的请求源。