系统架构
基本概念和架构
由 Producer、NameServer、Broker、Consumer 四大模块组成。其中,NameServer 是 RocketMQ 的元数据存储组件。在 5.0 后,还增加了 Proxy 模块,用来支持 gRPC 协议,并为后续的计算存储分离架构做准备。
一个 Topic 可以包含一个或多个MessageQueue,一个 Group 可以订阅一个或多个 Topic。MessageQueue 是具体消息数据的存储单元,订阅的时候通过 Group 来管理消费订阅关系。
一个 Topic 可以包含一个或多个MessageQueue,一个 Group 可以订阅一个或多个 Topic。MessageQueue 是具体消息数据的存储单元,订阅的时候通过 Group 来管理消费订阅关系。
从流程上看,Broker 在启动的时候会先连接 NameServer,将各自的元数据信息上报给 NameServer,NameServer 会在内存中存储元数据信息。客户端在连接集群的时候,会配置对应的 NameServer 地址,通过连接 NameServer 来实现客户端寻址,从而连接上对应的 Broker。
客户端在发送数据的时候,会指定 Topic 或 MessageQueue。Broker 收到数据后,将数据存储到对应的 Topic 中,消息存储在 Topic 的不同 Queue 中。在底层的文件存储中,所有 Queue 的数据是存储在同一个 CommitLog 文件中的。在订阅的时候会先创建对应的 Group,消费消息后,再确认数据。
从客户端来看,在5.0 以后,可通过直连 Proxy,将数据通过 gRPC 协议发送给 Proxy。Proxy 在当前阶段本质上只是一个代理(gRPC 协议的代理),不负责真正的数据存储,当收到数据后,还是将数据转发到 Broker 进行保存。
协议和网络模块
协议
5.0 之前支持自定义的 Remoting 协议,在 5.0 之后,增加了 gRPC 协议的支持。
这是在 Proxy 组件上完成了对 gRPC 协议的支持,用的是代理(Proxy)模式。即 Broker 依旧只支持 Remoting 协议,如果需要支持 gRPC 协议,那么就需要单独部署 Proxy 组件。
在传输层协议方面,Remoting 和 gRPC 都是基于 TCP 协议传输的。Remoting 直接基于四层的 TCP 协议通信,gRPC 是基于七层的 HTTP2 协议通信,不过 HTTP2 底层也是基于TCP,它们本质上都是应用层的协议。
Remoting 是私有协议,客户端的重复开发成本,以及与第三方服务集成的不便捷。gRPC 作为公有协议,有很多天然的生态集成能力,比如 Service Mesh、Kubernetes 生态等等。
网络模型
采用 Netty 组件作为底层通信库,遵循 Reactor 多线程模型,同时又在 Reactor 模型上做了一些扩展和优化。
数据存储
元数据存储
存储在 Broker 上的,Broker 启动时将数据上报到 NameServer 模块中汇总缓存。NameServer 是一个简单的 TCP Server,专门用来接收、存储、分发 Broker 上报的元数据信息。这些元数据信息是存储在 NameServer 内存中的,NameServer 不会持久化去存储这些数据。
Broker 启动或删除时,会调用 NameServer 的注册和退出接口,每个 Broker 都会存储自己节点所属的元数据信息(比如有哪些 Topic、哪些 Queue 在本节点上),在 Broker 启动时,会把全量的数据上报到 NameServer 中。
从部署形态上看,NameServer 是多节点部署的,是一个集群。但是不同节点之间是没有相互通信的,所以本质上多个 NameServer 节点间数据没有一致性的概念,是各自维护自己的数据,由每台 Broker 上报元数据来维护每台 NameServer 节点上数据的准确性。
由于 NameServer 不负责具体消息数据的存储和分发,所以在请求频率、负载方面都不会很高。所以在大多数场景下,NameServer 都是可以多集群共享的。从功能上看,它对RocketMQ 的作用相当于 RabbitMQ 的 Mnesia。
消息数据
最小存储单元是 MessageQueue,即常说的 Queue 或 Partition。Topic 可以包含一个或多个 MessageQueue,数据写入到 Topic 后,最终消息会分发到对应的 MessageQueue 中存储。
在底层的文件存储方面,并不是一个 MessageQueue 对应一个文件存储的,而是一个节点对应一个总的存储文件,单个 Broker 节点下所有的队列共用一个日志数据文件(CommitLog)来存储,和 RabbitMQ 采用的是同一种存储结构。
- CommitLog:消息主体以及元数据存储主体,每个节点只有一个,客户端写入到所有 MessageQueue 的数据,最终都会存储到这一个文件中。
- ConsumeQueue:逻辑消费队列,是消息消费的索引,不存储具体的消息数据。目的主要是提高消息消费的性能。由于基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 Commitlog 文件,基于 Topic 检索消息是非常低效的。Consumer 可根据 ConsumeQueue 来查找待消费的消息,ConsumeQueue 文件是基于 Topic 的 CommitLog 索引文件。
- IndexFile:索引文件,在文件系统中是以 HashMap 结构存储的。通过 Key 或时间区间来查询消息的功能就是由它实现的。
因为消息数据会很多,CommitLog 会存储所有的消息内容。所以为了保证数据的读写性能,会对 CommitLog 进行分段存储。CommitLog 底层默认单个文件大小为1G,消息是顺序写入到文件中,当文件满了,就会写入下一个文件。对于 ConsumeQueue 和 IndexFile,则不需要分段存储,因为它们存储的是索引数据,数据量一般很小。
在消息清理方面,支持按照时间清理数据。这个时间是按照消息的生产时间计算的,和消息是否被消费无关,只要时间到了,那么数据就会被删除。
按照节点的维度来清理,所有 Queue 的日志都存储在一个文件中,如果要支持主题和队列单独管理,需要进行数据的合并、索引的重建,实现难度相对复杂。
生产者和消费者
客户端连接服务端是需要经过客户端寻址的。首先和 NameServer 完成寻址,拿到 Topic/MessageQueue 和 Broker 的对应关系后,才会和 Broker 进行交互。
生产端
基础模块(如连接管理、心跳检测、协议构建、序列化等工作),会以协议和网络层的设计为准,使用不同编程语言 SDK 完成对应的开发。
从生产端来看,生产者是将数据发送到 Topic 或者 Queue 里面的。如果是发送到 Topic,则数据要经历生产数据分区分配的过程。即决定消息要发送到哪个目标分区。
默认情况下,RocketMQ 支持轮询算法和最小投递延迟算法两种策略。默认是轮询算法,该算法保证了每个 Queue 中可以均匀地获取到消息。最小投递延迟算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的 Queue。如果是直接发送到 Queue,则无需经过分区选择,直接发送即可。
由于在协议层不支持批量发送消息的协议,所以在 SDK 底层是没有等待、聚合发送逻辑的。所以如果需要批量发送数据,就需要在生产的时候进行聚合,然后发送。
支持单向发送、同步发送、异步发送三种发送形式。单向发送(Oneway)指发送消息后立即返回,不处理响应,不关心是否发送成功。同步发送(Sync)指发送消息后等待响应。异步发送(Async)指发送消息后立即返回,在提供的回调方法中处理响应。
消费端
消费模型
- 默认的消费模型是 Pull,Pull 的底层是以客户端会不断地去服务端拉取数据的形式实现的。
- Push 模型底层是以伪 Push 的方式实现的,即在客户端底层用一个 Pull 线程不断地去服务端拉取数据,拉到数据后,触发客户端设置的回调函数。让客户端从感受上看,是服务端直接将数据 Push 过来的。
- Pop 模式将消费分区、分区分配关系、重平衡都移到了服务端,减少了重平衡机制给客户端带来的复杂性。当消费者和分区都很多的时候,因为消费重平衡会消耗很长时间,且重平衡期间的消费会暂停。而在客户端也需要感知到复杂的重平衡行为,各个语言的客户端需要较高的重复开发成本。
负载均衡策略
默认是通过消费分组机制来消费的。即在客户端消费数据的时候,会通过消费分组来管理消费关系和存储消费进度。从实现上看,同一条消息支持被多个消费分组订阅,每个消费者分组可以有多个消费者。
由于 Topic 和 Queue 模型的存在,在启动消费的时候,就需要先分配消费者和分区消费关系。这个过程就是消费端负载均衡。在实现中,消息按照哪种逻辑分配给哪个消费者,就是由消费者负载均衡策略所决定的。
消息粒度负载均衡(共享消费模式)
同一消费者分组内的多个消费者,将按照消息粒度平均分摊主题中的所有消息。即同一个队列中的消息,会被平均分配给多个消费者共同消费。
队列粒度负载均衡(独占消费模式)
同一消费者分组内的多个消费者,将按照队列粒度消费消息,即每个队列仅被一个消费者消费。
消费进度
提交消费位点信息来保存消费进度。在服务端,RocketMQ 会为每个消费分组维护一份消费位点信息,信息中会保存消费的最大位点、最小位点、当前消费位点等内容。
从实现来看,客户端消费完数据后,就会调用 Broker 的消费位点更新接口,提交当前消费的位点信息。
在服务端,消息被某个消费者消费完成后,不会立即在队列中被删除,以便当消费者客户端停止又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。
HTTP 协议支持和管控操作
原生不支持 HTTP 协议的生产消费,需通过使用 Proxy模式来实现。
管控也是不支持 HTTP 协议的操作,都是通过 Remoting 协议支持的,在 gRPC 协议中也不支持管控操作。即在 Broker 中,通过 Remoting 协议暴露不同的接口或者在 NameServer 中暴露 TCP 的接口,来实现一些对应的管控操作。
集群构建
元数据实际是存储在 Broker 上,不是直接存储在 NameServer 中。NameServer 本身只是一个缓存服务,没有持久化存储的能力。
元数据信息实际存储在每台 Broker 上,每台 Broker 会在本节点维护持久化文件来存储元数据信息。这些元数据信息主要包括节点信息、节点上的 Topic、分区信息等等。在 Broker 启动时,会先连接 NameServer 注册节点信息,并将保存的元数据上报到所有 NameServer 节点中。此时所有 NameServer 节点就有全量的元数据信息了,从而完成了节点之间的发现。
Broker 和 NameServer 之间会有保活机制,Broker 会定期和 NameServer 保持心跳探测,来确认节点运行正常。当 Broker 异常时,就会被踢出集群。
部署模式
Master/Slave 模式
集群部署时会先配置 Broker 是 Master 节点还是 Slave 节点。Master 负责写入,Slave 负责备份和读取。早期架构是不支持主从切换的,即当 Master 挂了以后,Slave 无法成为 Master。此时会导致一些只能在 Master 上完成的工作无法完成,比如数据写入、Offset 操作、结束事务等等。
所以在创建 Topic 时,会建议在多个 Master 节点上同时创建这个 Topic 及其所有分区。
所有的 Master 节点都可以同时为这个 Topic 提供服务。当某个 Master 节点挂了后,其他 Master 节点依旧可以提供同样的服务,不影响新数据的写入。这种方式的好处是当负载过高时,可以通过快速横向添加节点来扩容。缺点是这个模型无法保证生产的消息的有序。节点挂了以后,这个节点上的未消费的数据不能被消费,并且 Topic 和分区数会有放大效应。因为每个节点上都需要创建全量的 Topic 和分区,此时瓶颈就存在单个节点上。
Dledger 模式
核心是通过 Raft 算法实现的 Raft Commitlog,具备了选举和切换的能力。
根据 Raft 算法的多数原则,集群最少必须由三个节点来组成。不同节点的 Raft Commitlog 之间会根据 Raft 算法来完成数据同步和选主操作。当 Master 发生故障后,会先通过内部协商,然后从 Slave 节点中选出新的 Master,从而完成主从切换。
Controller 模式
Deledger 模式最少需要三个节点,并且无法兼容 RocketMQ 原生的存储和复制能力(比如 Master/Slave 模式),而且这个模式维护较困难,所以重新实现了选主组件 DLedger Controller。
DLedger Controller 是一个新的部署形态,它的核心是基于 Raft 算法实现了一个选主组件 Controller。Controller 主要用来在副本之间进行 Leader 选举和切换。它是集群部署的,多个 Controller 之间是通过 Raft 算法来完成主 Controller 选举。
Controller 模式跟 Dledger 模式最大的差别在于,Controller 是一个可选的、松耦合的组件,可以选择内嵌在 NameServer 中,也可以独立部署。而且它和底层存储的 Commitlog 模块是独立的,即存储模块不一定非得是 Raft Committlog,也可以是 Commitlog。所以 Controller 可以用在 Master/Slave 模式中,当部署 DLedger Controller 组件后,原本的 Master-Slave 部署模式下的 Broker 组就拥有了容灾切换能力。
Controller 组成:
- Active Controller:通过 Raft 算法在多个 Controller 之间选举会选出的主 Controller。
- Alter SyncStateSet:分区副本中允许选为 Master 的副本集合。
- Elect Master:分区副本间选主操作。
- Replication:分区副本间的数据复制的动作。
从运行机制上看,首先会通过 Raft 算法选举出主 Controller。主 Controller 会维护每个分区可用的 SyncStateSet 集合。当节点变动时,Elect Master 会在从 SyncStateSet 集合中选举出新的主节点。主从副本间的数据通过 Replication 模块来完成。
数据可靠性
Master/Slave 模式
提供了异步复制和同步双写两种模式,主要的区别是性能和数据可靠性。
- 性能层面:异步复制 > 同步双写。
- 可靠性层面: 异步复制 < 同步双写。
- 一致性层面:同步双写是强一致,异步复制是最终一致。
Dledger 模式
在数据一致性上,遵循的是 Raft 的多数原则。即数据最少得三副本,同时得多数副本写入成功才算成功。
副本间数据同步是采用同步写入的方式,即 Master 收到数据后,同步将数据写入到副本,多数副本写入成功后,就算数据写入成功,属于最终一致性。
Controller 模式
数据的一致性是可以配置的,可通过参数 inSyncReplicas 来配置数据写入成功的副本数。比如 3 个副本且 inSyncReplicas 配置为 2,表示写入 2 个副本时算数据写入成功。同时也提供了 allAckInSyncStateSet 参数,来设置要全部写入成功才算成功。
副本间的数据同步属于同步写入的方式。从一致性上来看,inSyncReplicas 属于最终一致性,allAckInSyncStateSet 属于强一致。
安全控制
传输安全
支持 TLS 加密传输。Broker 使用标准 Java Server 集成 TLS。
认证
只支持一种明文(PLAIN)的用户名 / 密码认证方式。即先从服务端申请 AccessKey(用户名)和 SecretKey(密码),支持动态申请,然后客户端通过配置传递 AccessKey 和 SecretKey 来完成身份认证。
同时分为管理员账户和普通账户,管理员账户拥有集群的所有权限,普通账户需要经过授权才能进行某些操作。
鉴权
- 支持 Topic 和 Group 两种资源的鉴权。权限分为 DENY、ANY、PUB、SUB 四个类型,分别表示拒绝、全部权限、发送、订阅。
- 支持 IP 白名单的功能,即支持对来源 IP 进行限制。
- 支持通过命令行工具 mqadmin 动态增删用户及相关的权限信息,比如通过 mqadmin 查询 ACL 信息。
可观测性
监控指标
5.0 之前的版本
指标的定义和记录依赖一个 Broker 内部自定义实现的指标管理器 BrokerStatsManager。通过在内存中维护一个 Map 来记录不同的指标,主要支持 Broker、Producer、Consumer Groups、Consumer 四个维度的指标。指标暴露方式是通过 RocketMQ Export + RocketMQ Remoting。
Export 使用 Admin SDK 通过 Remoting 协议调用 Broker 获取指标数据。Export 会不断地从 Broker Pull 数据,然后在内部进行整合,再通过自身的 HTTP Service 的 /Metrics 接口暴露给 Prometheus 集成展示。
这种方式主要有 3 个缺点:
- Broker 指标定义不符合开源规范,难以和其他开源可观测组件搭配使用;
- 大量 RPC 调用会给 Broker 带来额外的压力;
- 拓展性较差,当需要增加或修改指标时,必须先修改 Broker 的 Admin 接口。
5.0 之后的版本
基于 OpenTelemtry 规范完全重新设计实现了指标模块。在指标数量方面,新的指标模块在之前版本的基础上,支持了更多维度、更丰富的指标,比如 Broker、Proxy 等。
在指标定义记录方面,选用兼容 Promethues 的 Counter、Guage、Histogram 等类型(这三种类型请参考第 21 讲)来完成指标的记录,并且遵循 Promethues 推荐的指标命名规范。
在指标暴露方面,新版的指标模块提供了 Pull、Push、Export 兼容三种方式。
- Pull 模式主要与 Prometheus 兼容,适合于运维 K8s 和 Promethues 集群的用户。通过在 Broker 内核启动一个 HTTP Server,暴露 /metrics 接口来给 Prometheus 拉取指标数据。
- Push 模式是 OpenTelemetry 推荐使用的模式。需要先部署 Collector 来接收传输指标数据。Broker 会主动将指标推送给对应的 Collector,然后通过 Collector 来暴露指标。Collector 是 OpenTelemetry 规范推荐的使用方式。
- Export 兼容是指兼容了当前 RocketMQ Export 的使用方式。即现在使用 RocketMQ Export 的用户无需变更部署架构即可接入新 Metrics。从实现来看,Export 获取指标数据的方式从早期的通过 Remoting 协议 Pull 数据,换成了 Broker 根据 OpenTelemetry 规范将指标数据 Push 给 Export。
日志
使用的是 Java 中标准的 Logback 和 SLF4J 日志框架进行日志记录,天然具备日志分级(ERROR、WARN、INFO 等)、日志滚动、按大小时间保留等特性。
在日志格式定义方面,通过独立的日志库来进行封装,属于常见的标准用法。
消息轨迹
消息队列里面支持得最好的了。因为完整的消息轨迹需要包含生产者、Broker、消费者三部分的信息,如果需要支持生产端和消费端的轨迹信息,就需要在客户端 SDK 中集成轨迹信息上报的功能。
RocketMQ 的生产端和消费端的 SDK 集成了轨迹信息上报模块,而其他大部分消息队列在 SDK 是没有这个功能。当数据发送或消费成功时,如果开启轨迹上报,客户端会将轨迹数据上报到集群中的内置 Topic 或者自定义 Topic 中。因此 Broker 端就保存有全链路的轨迹信息了。
每条消息赋予一个唯一 ID。当消息发送成功后,可以根据消息 ID 查看轨迹信息。如果需要,还可以把轨迹信息存储到一些第三方系统(比如 Elasticsearch),也可以通过命令行工具 mqadmin。
NameServer
一个独立的进程,为 Broker、生产者和消费者提供服务。最主要的功能是为客户端提供寻址服务,协助客户端找到主题对应的 Broker 地址。此外,还负责监控每个 Broker 的存活状态。
支持只部署一个节点,也支持部署多个节点组成一个集群,这样可以避免单点故障。在集群模式下,NameServer 各节点之间是不需要任何通信的,也不会通过任何方式互相感知,每个节点都可以独立提供全部服务。
每个 Broker 都需要和所有的 NameServer 节点进行通信。当 Broker 保存的 Topic 信息发生变化的时候,它会主动通知所有的 NameServer 更新路由信息,为了保证数据一致性,Broker 还会定时给所有的 NameServer 节点上报路由信息。这个上报路由信息的RPC 请求,同时起到 Broker 与 NameServer 之间的心跳作用,NameServer 依靠这个心跳来确定 Broker 的健康状态。
因为每个 NameServer 节点都可以独立提供完整的服务,对于生产者和消费者,只需要选择任意一个 NameServer 节点来查询路由信息就可以了。在生产或消费某个主题的消息之前,会先从 NameServer 上查询这个主题的路由信息,然后根据路由信息获取到当前主题和队列对应的 Broker 物理地址,再连接到 Broker 节点上进行生产或消费。
如果 NameServer 检测到与 Broker 的连接中断了,会认为这个 Broker 不再能提供服务,从路由信息中移除掉,避免客户端连接到一个不可用的 Broker 上去。而客户端在与 Broker 通信失败之后,会重新去NameServer 上拉取路由信息,然后连接到其他 Broker 上继续生产或消费消息,实现了自动切换失效 Broker 的功能。
事务
适用于解决本地事务和发消息的数据一致性问题。
确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且,增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。
消息存储
以 Broker 为单位。分为消息文件和索引文件,每个 Broker 只有一组消息文件,这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件称为 ConsumerQueue。索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。
写入消息时,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 * 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。这里两次寻址都是绝对位置寻址,比 Kafka 的查找要快。
优点
- 在写入时,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。
- 采用了稠密索引,查找性能更好。
- 在一个 Broker 上有上千个活动主题的情况下,写入性能就会体现出优势。
缺点
- 以 Broker 为单位,较粗的粒度牺牲了灵活性。
- 稠密索引需要更多的存储空间。
源码分析
https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-5.3.2.tar.gz
Producer
启动过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
// 创建了一个 DefaultMQProducer 的实例
producer = new DefaultMQProducer(producerGroupTemp);
// 为它初始化一些参数
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
zeroMsg = new Message(topic, new byte[] {});
bigMessage = new Message(topic, "This is a very huge message!".getBytes());
// 调用 start 方法启动它
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public void start(final boolean startFactory) throws MQClientException {
// 记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。
// 状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。
// 与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为
switch (this.serviceState) {
case CREATE_JUST: // 中间状态
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 通过一个单例模式(Singleton Pattern)的 MQClientManager 获取MQClientInstance 的实例 mQClientFactory,没有则自动创建新的实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
defaultMQProducer.initProduceAccumulator();
// 在 mQClientFactory 中注册自己
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动 mQClientFactory
if (startFactory) {
mQClientFactory.start();
}
this.initTopicRoute();
this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 将这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。
// 比如,只有处于 CREATE_JUST 状态才能转换为 RUNNING 状态,这样就可以确保这个服务是一次性的,只能启动一次。从而避免了多次启动服务而导致的各种问题。
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING: // 正常状态
case START_FAILED: // 异常状态
case SHUTDOWN_ALREADY: // 正常状态
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 给所有 Broker 发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 启动请求响应通道
// 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法
this.mQClientAPIImpl.start();
// Start various schedule tasks
// 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务
this.startScheduledTask();
// Start pull service
// 启动拉消息服务
this.pullMessageService.start();
// Start rebalance service
// 启动 Rebalance 服务
this.rebalanceService.start();
// Start push service
// 启动 Producer 服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
- DefaultMQProducerImpl:Producer 的内部实现类,包含大部分 Producer 的业务逻辑,即发消息的逻辑。
- MQClientInstance:这个类中封装了客户端一些通用的业务逻辑,无论是 Producer 还是 Consumer,最终需要与服务端交互时,都需要调用这个类中的方法;
- MQClientAPIImpl:这个类中封装了客户端服务端的 RPC,对调用者隐藏了真正网络通信部分的具体实现;
- NettyRemotingClient:RocketMQ 各进程之间网络通信的底层实现类。
消息发送过程
在 Producer 的接口 MQProducer 中,定义了 19 个不同参数的发消息的方法,按照发送方式不同可以分成三类:
- 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
- 同步发送(Sync):发送消息后等待响应;
- 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。 这三类发送实现基本上是相同的,异步发送稍微有一点儿区别。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} catch (Exception e) {
newCallBack.onException(e);
}
} else {
newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
}
};
executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}
public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
// 使用了一个 ExecutorService 来实现异步发送:使用asyncSenderExecutor 的线程池,异步调用方法 sendSelectImpl(),继续发送消息的后续工作
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumAcquired = false;
boolean isSemaphoreAsyncSizeAcquired = false;
int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
sendCallback.msgLen = msgLen;
try {
if (isEnableBackpressureForAsyncMode) {
defaultMQProducer.acquireBackPressureForAsyncSendNumLock();
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumAcquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncNumAcquired = isSemaphoreAsyncNumAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendNumLock();
if (!isSemaphoreAsyncNumAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}
defaultMQProducer.acquireBackPressureForAsyncSendSizeLock();
costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncSizeAcquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
sendCallback.isSemaphoreAsyncSizeAcquired = isSemaphoreAsyncSizeAcquired;
defaultMQProducer.releaseBackPressureForAsyncSendSizeLock();
if (!isSemaphoreAsyncSizeAcquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
// 当前线程把发送任务提交给 asyncSenderExecutor
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
runnable.run();
} else {
throw new MQClientException("executor rejected ", e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 选择将消息发送到哪个队列(Queue)中
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
// 选择哪个队列发送由 MessageQueueSelector#select 方法决定。
// 使用了策略模式(Strategy Pattern),来解决不同场景下需要使用不同的队列选择算法问题。
// 策略模式:定义一系列算法,将每一个算法封装起来,并让它们可以相互替换。策略模式让算法独立于使用它的客户而变化。
// 提供了很多 MessageQueueSelector 的实现,例如随机选择策略,哈希选择策略和同机房选择策略等
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
// 发送消息
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
}
// 上下文
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// 构建发送消息的头 RequestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setBrokerName(brokerName);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using compressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 将消息发送给队列所在的 Broker
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
brokerName,
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException | InterruptedException | MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}
Producer 整个发消息的流程,无论是同步发送还是异步发送,都统一到了同一个流程中。包括异步发送消息的实现,实际上也是通过一个线程池,在异步线程执行的调用和同步发送相同的底层方法。
NameServer
总体结构
- NamesrvStartup:程序入口。
- NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
- RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息,这些路由信息都是保存在内存中,并且没有持久化。
- BrokerHousekeepingService:监控 Broker 连接状态的代理类。
- DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
- ClusterTestRequestProcessor:用于测试的请求处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
public class RouteInfoManager {
// ...
// 保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。
// 这个 brokerName 并不真正是某个 Broker 的物理地址,它对应的一组 Broker 节点,包括一个主节点和若干个从节点。
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// 保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示。
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// 保存的是集群名称与 BrokerName 的对应关系。
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 保存了每个 Broker 当前的动态信息,包括心跳更新时间,路由数据版本等等。
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 保存了每个 Broker 对应的消息过滤服务的地址,用于服务端消息过滤。
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// 用于管理 Topic 与队列映射关系,主要服务于跨 Broker 队列的动态分配与一致性路由。
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
// ...
}
// remoting/src/main/java/org/apache/rocketmq/remoting/protocol/route/BrokerData.java
/**
* The class describes that a typical broker cluster's (in replication) details: the cluster (in sharding) name
* that it belongs to, and all the single instance information for this cluster.
*/
public class BrokerData implements Comparable<BrokerData> {
// 集群名称
private String cluster;
private String brokerName;
/**
* The container that store the all single instances for the current broker replication cluster.
* The key is the brokerId, and the value is the address of the single broker instance.
*/
// 保存 Broker 物理地址的Map:brokerAddrs,它的 Key 是 BrokerID,Value 就是这个 BrokerID 对应的 Broker的物理地址。
private HashMap<Long, String> brokerAddrs;
}
Broker 注册的路由信息处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@Override
// 处理 Request 的路由分发器
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
// 根据 request.getCode() 来分发请求到对应的处理器中
switch (request.getCode()) {
// ...
case RequestCode.REGISTER_BROKER:
// Broker 发给 NameServer 注册请求的 Code 为 REGISTER_BROKER,
// 在代码中根据 Broker 的版本号不同,分别有两个不同的处理实现方法:
// “extractRegisterBrokerBodyFromRequest”和"extractRegisterTopicConfigFromRequest"。
return this.registerBroker(ctx, request);
// ...
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}
}
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
// 根据 Broker 请求过来的路由信息,依次对比并更新 clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable和 filterServerTable 这 5 个保存集群信息和路由信息的 Map 对象中的数据。
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
// 加写锁,防止并发修改数据
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
// 更新 clusterAddrTable
Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
// // 更新 brokerAddrTable
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true; // 标识需要先注
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
brokerData.setZoneName(zoneName);
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
boolean isMinBrokerIdChanged = false;
long prevMinBrokerId = 0;
if (!brokerAddrsMap.isEmpty()) {
prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
}
if (brokerId < prevMinBrokerId) {
isMinBrokerIdChanged = true;
}
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 更新 brokerAddrTable 中的 brokerData
brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
//If Local brokerId stateVersion bigger than the registering one,
String oldBrokerAddr = brokerAddrsMap.get(brokerId);
if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));
if (null != oldBrokerInfo) {
long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registering Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
return result;
}
}
}
if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}
// 如果是新注册的 Master Broker,或者 Broker 中的路由信息变了,需要更新topicQueueMappingInfoTable
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
// Delete the topics that don't exist in tcTable from the current broker
// Static topic is not supported currently
if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
final Set<String> newTopicSet = tcTable.keySet();
final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
for (final String toDeleteTopic : toDeleteTopics) {
Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
final QueueData removedQD = queueDataMap.remove(brokerName);
if (removedQD != null) {
log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
}
if (queueDataMap.isEmpty()) {
log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
topicQueueTable.remove(toDeleteTopic);
}
}
}
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
// In Slave Acting Master mode, Namesrv will regard the surviving Slave with the smallest brokerId as the "agent" Master, and modify the brokerPermission to read-only.
if (isPrimeSlave && brokerData.isEnableActingMaster()) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
}
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
//the topicQueueMappingInfoMap should never be null, but can be empty
for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
}
//Note asset brokerName equal entry.getValue().getBname()
//here use the mappingDetail.bname
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
}
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
// 更新 brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
// 更新 filterServerTable
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddrInfo);
} else {
this.filterServerTable.put(brokerAddrInfo, filterServerList);
}
}
// 如果是 Slave Broker,需要在返回的信息中带上 master 的相关信息
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
if (masterLiveInfo != null) {
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
} finally {
// 释放写锁
this.lock.writeLock().unlock();
}
return result;
}
客户端寻找Broker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// remoting/src/main/java/org/apache/rocketmq/remoting/protocol/route/TopicRouteData.java
// 客户端在启动后,会启动一个定时器,定期从 NameServer 上拉取相关主题的路由信息,然后缓存在本地内存中,在需要的时候使用。
// 每个主题的路由信息用一个 TopicRouteData 对象来表示
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
/*
客户端选定了队列后,可以在对应的 QueueData 中找到对应的BrokerName,
然后用这个 BrokerName 找到对应的 BrokerData 对象,最终找到对应的Master Broker 的物理地址
*/
// 保存了主题中的所有队列信息
private List<QueueData> queueDatas;
// 保存了主题相关的所有 Broker 信息
private List<BrokerData> brokerDatas;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
// 通过路由分发器将请求分发的对应的处理方法中
public TopicRouteData pickupTopicRouteData(final String topic) {
// 初始化返回数据 topicRouteData
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
List<BrokerData> brokerDataList = new LinkedList<>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
// 加读锁
this.lock.readLock().lockInterruptibly();
// 先获取主题对应的队列信息
Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
if (queueDataMap != null) {
// 把队列信息返回值中
topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
foundQueueData = true;
Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());
// 遍历这些 BrokerName,找到对应的 BrokerData,并写入返回结果中
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
continue;
}
BrokerData brokerDataClone = new BrokerData(brokerData);
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
if (filterServerTable.isEmpty()) {
continue;
}
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
} finally {
// 释放读锁
this.lock.readLock().unlock();
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
// 将 Topic 的队列映射信息(topicQueueMappingInfoTable)注入路由数据,供客户端识别队列分布规则。
topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));
// 若 NameServer 配置未启用 Acting Master 功能,直接返回原始路由数据。
if (!namesrvConfig.isSupportActingMaster()) {
return topicRouteData;
}
// 跳过以 SYNC_BROKER_MEMBER_GROUP_PREFIX 开头的内部 Topic(如 Broker 成员组同步 Topic),避免干扰系统内部通信。
if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
return topicRouteData;
}
if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
return topicRouteData;
}
boolean needActingMaster = false;
// 遍历所有 Broker,若某 Broker 组有存活的节点但无 Master,则标记需处理。
for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (brokerData.getBrokerAddrs().size() != 0
&& !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
needActingMaster = true;
break;
}
}
if (!needActingMaster) {
return topicRouteData;
}
// 动态设置 Acting Master
for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
continue;
}
// No master
for (final QueueData queueData : topicRouteData.getQueueDatas()) {
if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
if (!PermName.isWriteable(queueData.getPerm())) {
final Long minBrokerId = Collections.min(brokerAddrs.keySet());
final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
}
break;
}
}
}
return topicRouteData;
}
return null;
}
事务
- executeLocalTransaction:执行本地事务。
- checkLocalTransaction:反查本地事务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 这里给消息添加了属性,标明这是一个事务消息,也就是半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 调用发送普通消息的方法,发送这条半消息
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 执行本地事务
if (null != localTransactionListener) {
localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
} else {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return: {} messageTopic: {} transactionId: {} tag: {} key: {}",
localTransactionState, msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys());
}
} catch (Throwable e) {
log.error("executeLocalTransactionBranch exception, messageTopic: {} transactionId: {} tag: {} key: {}",
msg.getTopic(), msg.getTransactionId(), msg.getTags(), msg.getKeys(), e);
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 根据事务消息和本地事务的执行结果 localTransactionState,决定提交或回滚事务消息
// 这里给 Broker 发送提交或回滚事务的 RPC 请求。
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
String uniqId = msgInner.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqId != null && !uniqId.isEmpty()) {
MessageAccessor.putProperty(msgInner, TransactionalMessageUtil.TRANSACTION_ID, uniqId);
}
// 记录消息的主题和队列,到新的属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0
// 并没有把半消息保存到消息中客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0。
// 这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。F
// 保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}