「rocketmq」topic路由机制分析

课后思考题

为什么Demo里面没有创建Topic,却可以使用?

在运行producer案例测试时,即使没有在dashboard里事先创建Topic,也可以正常运行

源码分析过程

SendResult sendResult = producer.send(msg);入手,command+b转到send的具体实现

producer::DefaultMQProducer–>DefaultMQProducerImpl

最终定位到DefaultMQProducerImpl.javasendDefaultImpl方法

image-20220426195611975

定位关键过程:通过tryToFindTopicPublishInfo()方法来获取topicPublishInfo

跳转到其实现

image-20220426200212206

分析该方法先从当前对象的topicPublishInfoTable尝试获取topicPublishInfo,若找不到,则尝试从name server获取(这只是根据方法名的猜测,是否正确需要进一步对源代码进行分析

若name server中仍找不到指定topic的路由信息,则执行绿框2中的代码(对应本次要分析的未创建Topic的情况)

注意到,两次过程都是调用的同一个方法:updateTopicRouteInfoFromNameServer()

继续定位该关键方法,同时注意不同方法签名的实现区别

DefaultMQProducerImpl–>MQClientInstance

关键方法:getDefaultTopicRouteInfoFromNameServer

image-20220426201611674

注意到,如果我们没有创建topic的时候,传入的isDefault变量为true,通过判断,会调用

getDefaultTopicRouteInfoFromNameServer方法

通过方法名,我猜测如果没有事先创建topic,会尝试从name server获取默认topic的路由


该方法的方法签名,和此处被调用传入的变量如下

image-20220426202220423

定位 getCreateTopicKey()方法,跳转到DefaultMQProducer类中

1
2
3
4
5
6
// DefaultMQProducer.java
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
...
public String getCreateTopicKey() {
return createTopicKey;
}

继续定位AUTO_CREATE_TOPIC_KEY_TOPIC

image-20220426202732252

传入的topic为一个常量"TBW102",结合注释和之前的分析,这个就是默认的topic名字


继续定位关键方法getDefaultTopicRouteInfoFromNameServer()

MQClientInstance–>MQClientAPIImpl

image-20220426203222605

该方法真正的实现位于MQClientAPIImpl中的getTopicRouteInfoFromNameServer()

image-20220426203606441

可以看出,这里是向name server发送了request,然后根据respones解码得到路由信息。request code为

GET_ROUTEINFO_BY_TOPIC

image-20220426203940569

根据该 request code 继续定位

commad+shift+f在namesrv目录中搜索该code

image-20220426204310363

namesrv::DefaultRequestProcessor

定位到getRouteInfoByTopic()方法

==注意:==此时,传入的topic变量已经是默认的topic:"TBW102"

image-20220426210444035

此时,name server拿到默认topic(“TBW102”)的路由信息,然后封装reponse并返回

也就是说

producer会得到默认topic TBW102的路由信息

则接下来producer会将消息发送到接收TBW102的broker上去

下面继续定位,使用跟上一小节相同的方法,在broker目录中搜索默认topic,即TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC

image-20220426211640151

broker::TopicConfigManager

最终定位到.../broker/topic/TopicConfigManager.javacreateTopicInSendMessageMethod方法

image-20220426212045848

如果传入的默认路由为TBW102,就自动进行topic的创建

创建过程如下:

image-20220426212254513

继续定位发现,该方法在整个项目中只有一个用法

位于AbstractSendMessageProcessor类中的msgCheck()方法中

TopicConfigManager–>AbstractSendMessageProcessor

image-20220426212856442

通过阅读代码,broker的处理逻辑也比较清晰了

broker收到消息后,会先检查topic是否存在,如果不存在,则调用createTopicInSendMessageMethod方法创建topic

打断点测试验证

在上一节中提到的关键方法处,打断点,观察运作流程,验证之前的分析是否正确

经过验证,之前分析无误。测试过程已经保存录像,这里不再展示

总结

如果没有事先创建topic

  1. 生产者

producer会从name server获取默认topic(“TBW102”)的路由信息,然后将消息发送到接收"TBW102"的broker上

  1. nameserver

nameserver在这个过程中是被动的角色,如果producer从nameserver查找不到对应的topic时(即未创建topic),会主动向nameserver获取默认topic的路由信息,nameserver负责处理并返回封装了topicRouteData的response

  1. broker

broker收到消息后,会进行检查

如果对应的topic不存在,且传入的defaultTopic变量等于默认topic:"TBW102"时,则会自动创建对应的topic

元数据的生命周期

元数据,这里主要针对路由信息

这里结合上一个问题的分析,从broker、name server、producer三个角度出发进行分析

broker

结合根据上一节topic的创建过程,broker的路由信息主要包括

  • topic列表
  • 每个topic的读写队列数量

start::BrokerController

元数据的注册

broker在启动时,会启动一个任务,定时向name server注册路由信息

image-20220426223925944

时间间隔范围为 [10000ms,60000ms][10000ms, 60000ms]

getRegisterNameServerPeriod()进行定位

image-20220426224252764 image-20220426224323344

默认间隔为30s,以心跳的方式向name server注册

继续定位关键方法:registerBrokerAll()

image-20220426224629729

该方法中,先创建一个路由信息的wrapper,然后进设置路由信息,最后调用doRegisterBrokerAll()进行进行具体注册

继续定位关键方法doRegisterBrokerAll()

image-20220426225537991

直接调用broker远程通信api下的registerBrokerAll()方法进行注册,注册成功之后,更新并同步master的地址

registerBrokerAll()进行定位

BrokerController–>BrokerOuterAPI

registerBrokerAll() 对于每一个nameserver调用registerBroker()方法

image-20220426230418969

继续定位

image-20220426230904139

这样看,过程就非常清晰了,broker向每一个nameserver发送路由信息注册的request command,然后根据收到的response进行相应处理,主线程同步等待response

request code为REGISTER_BROKER(103)

shutdown::BrokerController–>BrokerOuterAPI

元数据的注销

broker在关闭时,shutdow()方法中会调用unregisterBrokerAll()方法,向name server发出请求,注销broker相关信息

依次调用unregisterBrokerAll()–>this.brokerOuterAPI.unregisterBrokerAll()

image-20220427122706361 image-20220427122810315

向每一个name server发送注销请求,request code为:UNREGISTER_BROKER

而name server接收请求后,则会调用unregisterBroker(),删除关键数据结构中对应的信息,例如:brokerLiverTable,完成元数据的注销

分析过程跟之前类似,也是从request code入手,namesrv目录下搜索,进行定位

image-20220427123423028 image-20220427123448722

真正的注销执过程如下

image-20220427123723181

name server

name server保存着路由信息,并负责管理

以broker向name server注册时发送的request code:REGISTER_BROKER为突破口,分析源码

还是延续上一个问题的思路,在namesrv目录下进行搜索

image-20220426232527359

DefaultRequestProcessor

定位到DefaultRequestProcessor,以registerBroker()方法为例

image-20220426233518570

主体流程:对传入的request进行解析,调用this.namesrvController.getRouteInfoManager().registerBroker()这个关键方法进行具体注册的过程,然后组装reponse返回

DefaultRequestProcessor–>RouteInfoManager

那么继续定位registerBroker(),代码较长,下面将分析的过程写到注释中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 添加写锁
this.lock.writeLock().lockInterruptibly();
// 判断是否有该集群对应的brokername集合,没有则新建一个
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 对应的brokername集合中加入该broker
brokerNames.add(brokerName);

boolean registerFirst = false;
// 获取该broker对应的brokerData,没有则新建一个
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 个人理解,如果master broker挂掉了,把一个slave broker变成新的master(broker id==>0)
// 那么需要将挂掉的老master地址信息从brokerAddrTable中移除
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 由于broker id和addr的对应关系发生了变化,因此,没变的就是需要删除的数据项
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
log.debug("remove entry {} from brokerData", item);
it.remove();
}
}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
if (MixAll.MASTER_ID == brokerId) {
log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
}
// 是否是第一次注册
registerFirst = registerFirst || (null == oldAddr);

if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
// 如果是master broker,并且为第一次注册,且topic的路由信息发生改变
// 则更新broker每个topic的队列数据queueData
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue()); // 对于具体topic更新其queueData
// 更新逻辑跟上面更新broker地址信息类似,没有,则新建;发生变化,先删去,再新建
}
}
}
}
// 创建心跳检测的对象
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 若当前的broker不是master,返回master的地址作为HaServer的地址
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}

return result;
}

总的来说,name server接收并处理注册请求时,会首先加写锁(因为可能存在多个broker向同一个name server注册),然后更新broker地址信息,接着更新broker中的每个topic对应的queue data,最后将最新的broker信息放入brokerLiveTable。至此,该线程(此nameserver)的具体注册过程结束,进行后续处理

NamesrvController

以上是name server接收注册请求时,对路由信息元数据处理的过程

下面考虑从name server启动时,对元数据的处理逻辑

定位到name server的启动方法initialize()

image-20220427120609831

可以看到,name server 启动后,会执行一个计划任务,每10秒调用一次scanNotActiveBroker(),进行心跳检测

NamesrvController–>RouteInfoManager

定位scanNotActiveBroker(),从方法命名,猜测:“扫描不活跃的broker“

image-20220427120425188

分析源码不难发现,该方法是对放到brkerLiveTable(结合上一小节分析中,每次路由注册都要更新该数据结构)中的broker信息进行检查,检查上一次更新的时间戳,如果超过120s没有更新,就移除该broker

producer

在第一节已经详细分析了,producer从name server获取路由元数据的过程,这里不再赘述

关键方法为updateTopicRouteInfoFromNameServer()

这里采用跟前两小节相同的思路,考虑producer启动时,和元数据相关的操作

又回到最初的起点,记忆中你模糊的脸

demo:org/apache/rocketmq/example/quickstart/Producer.java

image-20220427124653211

定位producer.start()方法,下面只关注关键过程

依次调用DefaultMQProducer::start()–>DefaultMQProducerImpl::start()–>MQClientInstance::start()

到这里发现一个敏感方法startScheduledTask()

image-20220427125725386

结合之前的经验,大胆猜测该方法是在producer client启动的时候,开启一系列计划任务,例如心跳连接?

继续定位

image-20220427125958182

关注前三个任务

第一个,是每隔两分钟更新一下name server的地址

第二个,比较熟悉了,默认每隔30秒从name server更新路由信息。

第三个,每间隔30秒,移除离线的broker,并且向所有的broker进行心跳连接

image-20220427130620726

心跳连接真正实现位于,MQClientInstance.javasendHeartbeatToAllBroker()方法

image-20220427131114146

至此,从producer出发,元数据生命周期相关分析也完毕了