「rocketMQ」关于消费者的负载均衡问题
本文是基于初次接触RocketMQ的新手视角,来展开的
我觉得非常有必要记录一下,这次的分析思路和解决问题的方式
抛出问题
“如果把消费者打成jar包,在本地同时运行两个消费者进程,观察终端输出,解释一下原因”
下面开始设计实验,分析原因
实验参数设置
topic: lhy-test-topic
topic维持的队列数目
read-queue:16
write-queue:16
consumerGroup: lhy-test-consumerGroup
测试
第一轮
先发送六条消息
启动pushConsumer 接收到6条消息
第二轮
再启动一个消费者进程后,发送六条消息
消费者消费情况:
第一次注册的消费者并未接收到任何消息
后注册的消费者收到了后续六条消息。
第三轮
再次尝试,一共发送12条消息
接收情况
pushConsumer1
pushConsumer2
接收消息情况并无明显规律,类似随机消费?
猜想
仔细分析接收情况,从队列id上看,每个消费者维持一组queue-id,且存在一定规律:
- 数量上:*两个消费者平分所有队列?
- id上:id是分块连续的 例如:pushconsumer1:[0、1、2]、[1、2、3、4、5]
根据以上规律,我决定将topic的队列数量改为1试试
结果发现所有的消息都被consumer1消费
再次发送消息测试
依旧全部被consumer1消费
单从测试的结果来看:
消费的消息所属的队列id有部分连续性的规律*
从数量上看,有均分消息的趋势
当topic的队列数量(:1)小于消费者数量(:2)时,所有的消息都被其中一个消费者consumer1消费了
猜想,消费者消费消息的状况,跟topic设置的队列有关?
查阅资料
之前简单学习过Kafka,里面有分区partition的概念
为了详细了解RocketMQ的消息队列queue,查询了相关资料
RocketMq中topic被多个queue分片,那么consumer是如何拿到完整的topic的呢? - 中间件兴趣圈的回答 - 知乎
查阅资料得知:
- 同一个ConsumerGroup的consumer必须消费同一个Topic
- 一个Topic维持多个队列,类似Kafka的分区
- 一个队列只能分配给一个消费者
- 如果队列数量小于消费者数量,部分消费者将无法接收消息
再次实验测试
这次我将topic的队列数量设置为两个
发现两个消息队列被两个消费者均分
再次测试
两个PushConsumer,分别分配到了队列0和队列1,并且一直是如此的分配关系(之后通过源码阅读得知,是因为每次分配前都进行了排序,所以分配关系不变)
每个consumer接收到了一半的消息,并且队列内的消息是有序的
分析源码
有了具体的思路和基础知识,下面对源码进行分析
在Consumer.java
中,cmd+b
转到DefaultMQPushConsumer
的声明处
翻译过来是:平均队列分配算法(均衡队列分配算法?)
继续定位
org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
详细分析写在了注释中
1 | /** |
具体计算逻辑
首先 mod = topic维持的总队列数 % 消费者总数
如果topic维持的队列数 小于或等于 当前消费者总数,分配的尺寸为 1,即每个消费者分配1个queue
如果topic维持的队列数 大于 当前消费者总数
如果当前消费者的index < mod,分配的队列个数就为 topic维持的总队列数 / 消费者总数 + 1
如果当前消费者的index >= mod, 分配的队列个数就为 topic维持的总队列数 / 消费者总数
然后根据此计算 分配的队列起始索引,进行分配
举个例子
如果一开始设置Topic
维持的队列数为16,当前总共有3个消费者
mqAll.size() = 16;
cidAll.size() = 3;
这时无法平均分配,即每个消费者分到5个队列后,还会剩一个
此时,均衡分配算法会将余出的队列按序号从小到大再分配一次
具体过程:
计算得到的mod=16 % 3 = 1
,如果当前为第一个消费者,其索引index=0
,0小于1,
则为其分配 16/3+1=6
个队列,队列索引为[0,1,2,3,4,5]
其余的两个消费者,索引index=1,2
>=1,分配16/3=5
个队列
则分配情况如下
消费者1: [0, 1, 2, 3, 4, 5]
消费者2: [6, 7, 8, 9, 10]
消费者3: [11,12,13,14,15]
实验验证
Topic维持的队列数量: 16
消费者数目: 3
生产消息数目:32条
发送消息
消费消息
消费者1接收情况
分配到的queue ID分别为0,1,2,3,4,5,一共6个队列,符合之前的分析
消费者2接收情况
分配到的queue ID分别为6,7,8,9,10,一共5个队列,符合之前的分析
消费者3接收情况
分配到的queue ID分别为11,12,13,14,15,一共5个队列,符合之前的分析
打断点,调试一下
调试时的参数设置:
当前Topic维持的队列数量: 2
消费者数目: 3
根据之前的分析,由于队列数目小于消费者数目,第三个消费者会无法获取消息。
开始debug后,消费者总数目是3,总共只有两个队列可供分配
当前消费者index=2,经过计算,起始的分配索引为4,range=-2
实际上根本不会为这个消费者分配队列,也就是当前消费者无法获取消息。
继续F8,步过,会进入org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
进行再均衡,重新分配队列。实际上其他消费者还在线的话,当前消费者仍不会分配到队列:)
总结
- 每个Topic维持多个队列queue,队列内消息是有序的
- 同一个ConsumerGroup的消费者只能订阅一个topic
- 一个队列只能分配给一个消费者
- pushConsumer默认的队列分配算法是:均衡队列分配算法
AllocateMessageQueueAveragely
- 如果一个topic内的队列数目小于消费这个topi的消费者总数,会有消费者无法接收到消息
消息队列的分配涉及到consumer端的负载均衡
之后继续阅读Rebalance
相关代码后得知,每当集群中的消费者实例发生变化,都会进行一次负载均衡
将消息队列queue按照队列分配算法分配给每个消费者实例
后话
rocketmq有五种消息队列分配算法
都是实现了AllocateMessageQueueStrategy
接口