「rocketmq」topic路由机制分析
课后思考题
为什么Demo里面没有创建Topic,却可以使用?
在运行
producer
案例测试时,即使没有在dashboard
里事先创建Topic,也可以正常运行
源码分析过程
从SendResult sendResult = producer.send(msg);
入手,command+b
转到send
的具体实现
producer::DefaultMQProducer–>DefaultMQProducerImpl
最终定位到DefaultMQProducerImpl.java
的sendDefaultImpl
方法
定位关键过程:通过tryToFindTopicPublishInfo()
方法来获取topicPublishInfo
跳转到其实现
分析该方法先从当前对象的topicPublishInfoTable
尝试获取topicPublishInfo
,若找不到,则尝试从name server获取(这只是根据方法名的猜测,是否正确需要进一步对源代码进行分析)
若name server中仍找不到指定topic的路由信息,则执行绿框2中的代码(对应本次要分析的未创建Topic的情况)
注意到,两次过程都是调用的同一个方法:updateTopicRouteInfoFromNameServer()
继续定位该关键方法,同时注意不同方法签名的实现区别
DefaultMQProducerImpl–>MQClientInstance
关键方法:getDefaultTopicRouteInfoFromNameServer
注意到,如果我们没有创建topic的时候,传入的isDefault
变量为true
,通过判断,会调用
getDefaultTopicRouteInfoFromNameServer
方法
通过方法名,我猜测:如果没有事先创建topic,会尝试从name server获取默认topic的路由
该方法的方法签名,和此处被调用传入的变量如下
定位 getCreateTopicKey()
方法,跳转到DefaultMQProducer
类中
1 | // DefaultMQProducer.java |
继续定位AUTO_CREATE_TOPIC_KEY_TOPIC
传入的topic为一个常量"TBW102"
,结合注释和之前的分析,这个就是默认的topic名字
继续定位关键方法getDefaultTopicRouteInfoFromNameServer()
MQClientInstance–>MQClientAPIImpl
该方法真正的实现位于MQClientAPIImpl
中的getTopicRouteInfoFromNameServer()
可以看出,这里是向name server发送了request,然后根据respones解码得到路由信息。request code为
GET_ROUTEINFO_BY_TOPIC
根据该 request code 继续定位
commad+shift+f
在namesrv目录中搜索该code
namesrv::DefaultRequestProcessor
定位到getRouteInfoByTopic()
方法
==注意:==此时,传入的topic变量已经是默认的topic:
"TBW102"
此时,name server拿到默认topic(“TBW102”)的路由信息,然后封装reponse并返回
也就是说
producer会得到默认topic TBW102的路由信息
则接下来producer会将消息发送到接收TBW102的broker上去
下面继续定位,使用跟上一小节相同的方法,在broker目录中搜索默认topic,即TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC
broker::TopicConfigManager
最终定位到.../broker/topic/TopicConfigManager.java
的createTopicInSendMessageMethod
方法
如果传入的默认路由为TBW102,就自动进行topic的创建
创建过程如下:
继续定位发现,该方法在整个项目中只有一个用法
位于AbstractSendMessageProcessor
类中的msgCheck()
方法中
TopicConfigManager–>AbstractSendMessageProcessor
通过阅读代码,broker的处理逻辑也比较清晰了
broker收到消息后,会先检查topic是否存在,如果不存在,则调用createTopicInSendMessageMethod
方法创建topic
打断点测试验证
在上一节中提到的关键方法处,打断点,观察运作流程,验证之前的分析是否正确
经过验证,之前分析无误。测试过程已经保存录像,这里不再展示
总结
如果没有事先创建topic
- 生产者
producer会从name server获取默认topic(“TBW102”)的路由信息,然后将消息发送到接收"TBW102"的broker上
- nameserver
nameserver在这个过程中是被动的角色,如果producer从nameserver查找不到对应的topic时(即未创建topic),会主动向nameserver获取默认topic的路由信息,nameserver负责处理并返回封装了topicRouteData的response
- broker
broker收到消息后,会进行检查
如果对应的topic不存在,且传入的defaultTopic
变量等于默认topic:"TBW102"时,则会自动创建对应的topic
元数据的生命周期
元数据,这里主要针对路由信息
这里结合上一个问题的分析,从broker、name server、producer三个角度出发进行分析
broker
结合根据上一节topic的创建过程,broker的路由信息主要包括
- topic列表
- 每个topic的读写队列数量
start::BrokerController
元数据的注册
broker在启动时,会启动一个任务,定时向name server注册路由信息
时间间隔范围为
对getRegisterNameServerPeriod()
进行定位
默认间隔为30s,以心跳的方式向name server注册
继续定位关键方法:registerBrokerAll()
该方法中,先创建一个路由信息的wrapper,然后进设置路由信息,最后调用doRegisterBrokerAll()
进行进行具体注册
继续定位关键方法doRegisterBrokerAll()
直接调用broker远程通信api下的registerBrokerAll()
方法进行注册,注册成功之后,更新并同步master的地址
对registerBrokerAll()
进行定位
BrokerController–>BrokerOuterAPI
registerBrokerAll()
对于每一个nameserver调用registerBroker()
方法
继续定位
这样看,过程就非常清晰了,broker向每一个nameserver发送路由信息注册的request command,然后根据收到的response进行相应处理,主线程同步等待response
request code为REGISTER_BROKER
(103)
shutdown::BrokerController–>BrokerOuterAPI
元数据的注销
broker在关闭时,shutdow()
方法中会调用unregisterBrokerAll()
方法,向name server发出请求,注销broker相关信息
依次调用unregisterBrokerAll()
–>this.brokerOuterAPI.unregisterBrokerAll()
向每一个name server发送注销请求,request code为:UNREGISTER_BROKER
而name server接收请求后,则会调用unregisterBroker()
,删除关键数据结构中对应的信息,例如:brokerLiverTable
,完成元数据的注销
分析过程跟之前类似,也是从request code入手,namesrv目录下搜索,进行定位
真正的注销执过程如下
name server
name server保存着路由信息,并负责管理
以broker向name server注册时发送的request code:REGISTER_BROKER
为突破口,分析源码
还是延续上一个问题的思路,在namesrv目录下进行搜索
DefaultRequestProcessor
定位到DefaultRequestProcessor
,以registerBroker()
方法为例
主体流程:对传入的request进行解析,调用this.namesrvController.getRouteInfoManager().registerBroker()
这个关键方法进行具体注册的过程,然后组装reponse返回
DefaultRequestProcessor–>RouteInfoManager
那么继续定位registerBroker()
,代码较长,下面将分析的过程写到注释中
1 | public RegisterBrokerResult registerBroker( |
总的来说,name server接收并处理注册请求时,会首先加写锁(因为可能存在多个broker向同一个name server注册),然后更新broker地址信息,接着更新broker中的每个topic对应的queue data,最后将最新的broker信息放入brokerLiveTable。至此,该线程(此nameserver)的具体注册过程结束,进行后续处理
NamesrvController
以上是name server接收注册请求时,对路由信息元数据处理的过程
下面考虑从name server启动时,对元数据的处理逻辑
定位到name server的启动方法initialize()
可以看到,name server 启动后,会执行一个计划任务,每10秒调用一次scanNotActiveBroker()
,进行心跳检测
NamesrvController–>RouteInfoManager
定位scanNotActiveBroker()
,从方法命名,猜测:“扫描不活跃的broker“
分析源码不难发现,该方法是对放到brkerLiveTable(结合上一小节分析中,每次路由注册都要更新该数据结构)中的broker信息进行检查,检查上一次更新的时间戳,如果超过120s没有更新,就移除该broker
producer
在第一节已经详细分析了,producer从name server获取路由元数据的过程,这里不再赘述
关键方法为updateTopicRouteInfoFromNameServer()
这里采用跟前两小节相同的思路,考虑producer启动时,和元数据相关的操作
又回到最初的起点,记忆中你模糊的脸
demo:org/apache/rocketmq/example/quickstart/Producer.java
定位producer.start()
方法,下面只关注关键过程
依次调用DefaultMQProducer::start()
–>DefaultMQProducerImpl::start()
–>MQClientInstance::start()
到这里发现一个敏感方法startScheduledTask()
结合之前的经验,大胆猜测该方法是在producer client启动的时候,开启一系列计划任务,例如心跳连接?
继续定位
关注前三个任务
第一个,是每隔两分钟更新一下name server的地址
第二个,比较熟悉了,默认每隔30秒从name server更新路由信息。
第三个,每间隔30秒,移除离线的broker,并且向所有的broker进行心跳连接
心跳连接真正实现位于,MQClientInstance.java
的sendHeartbeatToAllBroker()
方法
至此,从producer出发,元数据生命周期相关分析也完毕了