架构设计与实现
由 Producer、Broker、Consumer 三个大模块组成。生产者将数据发送到 Broker,Broker 接收到数据后,将数据存储到对应的 Queue 里面,消费者从不同的 Queue 消费数据。
Exchange 称为交换器,它是一个逻辑上的概念,用来做分发,本身不存储数据。流程上生产者先将消息发送到 Exchange,然后 Exchange 会根据一定的规则将数据分发到实际的 Queue 里面存储。这个分发过程就是 Route(路由),设置路由规则的过程就是 Bind(绑定)。即 Exchange 会接收客户端发送过来的 route_key,然后根据不同的路由规则,将数据发送到不同的 Queue 里面。
在 RabbitMQ 中是没有 Topic 这个用来组织分区的逻辑概念的。RabbitMQ 中的 Topic 是指 Topic 路由模式,是一种路由模式,和消息队列中的 Topic 意义是完全不同的。
通信协议
数据流是基于四层 TCP 协议通信的,跑在 TCP 上的应用层协议是 AMQP。如果开启 Management 插件,也支持 HTTP 协议的生产和消费。TCP + AMQP 是数据流的默认访问方式,也是官方推荐的使用方式,因为它性能会比 HTTP 高很多。
在协议内容和连接管理方面,都是遵循 AMQP 规范。即 RabbitMQ 的模型架构和 AMQP 的模型架构是一样的,交换器、交换器类型、队列、绑定、路由键等都是遵循 AMQP 协议中相应的概念。
AMQP 是一个应用层的通信协议,即一系列结构化命令的集合,用来填充 TCP 层协议的 body 部分。通过协议命令进行交互,完成各种消息队列的基本操作,如 Connection.Start(建立连接)、Basic.Publish(发送消息)等等。
网络模块
Connection 是指 TCP 连接,Channel 是 Connection 中的虚拟连接。一个客户端和一个 Broker 之间只会建立一条 TCP 连接,即 Connection,一个 Connection 中可以创建多个 Channel。
客户端和服务端的实际通信都是在 Channel 维度通信的。这个机制可以减少实际的 TCP 连接数量,基于 IO 复用、异步 I/O 的思路来设计,从而降低网络模块的损耗。
网络模块主要包含 tcp_listener、tcp_acceptor、rabbit_reader 三个进程。RabbitMQ 服务端通过 tcp_listener 监听端口,tcp_acceptor 接收请求,rabbit_reader 处理和返回请求。本质上来看是也是一个多线程的网络模型。
数据存储
元数据存储
存在于 Erlang 自带的分布式数据库 Mnesia 中的。即每台 Broker 都会起一个 Mnesia 进程,用来保存一份完整的元数据信息。因为 Mnesia 本身是一个分布式的数据库,自带了多节点的 Mnesia 数据库之间的同步机制。所以在元数据的存储模块,Broker 只需要调用本地的 Mnesia 接口保存、变更数据即可。不同节点的元数据同步 Mnesia 会自动完成。
在一些异常的情况下,如果不同节点上的 Mnesia 之间的数据同步出现问题,就会导致不同的 Mnesia 数据库之间数据不一致,进而导致集群出现脑裂、无法启动等情况。此时就需要手动修复异常的 Mnesia 实例上的数据。
消息数据存储
消息数据的最小存储单元是 Queue,即消息数据是按顺序写入存储到 Queue 里面的。在底层的数据存储方面,所有的 Queue 数据是存储在同一个“文件”里面的。这个“文件”是一个虚拟的概念,表示所有的 Queue 数据是存储在一起的意思。
这个“文件”由队列索引(rabbit_queue_index)和消息存储(rabbitmq_msg_store)两部分组成。即在节点维度,所有 Queue 数据都是存储在 rabbit_msg_store 里面的,每个节点上只有一个 rabbit_msg_store,数据会依次顺序写入到 rabbit_msg_store 中。
rabbit_msg_store 是一个逻辑概念,底层的实际存储单元分为两个,msg_store_persistent 和 msg_store_transient,分别负责持久化消息和非持久化消息的存储。
msg_store_persistent 和 msg_store_transient 在操作系统上是以文件夹的形式表示的,具体的数据存储是以不同的文件段的形式存储在目录中,所有消息都会以追加的形式写入到文件中。当一个文件的大小超过了配置的单个文件的最大值,就会关闭这个文件,然后再创建一个文件来存储数据。
队列索引负责存储、维护队列中落盘消息的信息,包括消息的存储位置、是否交付、是否 ACK 等等信息。队列索引是 Queue 维度的,每个 Queue 都有一个对应的队列索引。
删除消息时,不会立即删除数据,只是从 Erlang 中的 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。此时文件中的消息不会立即被删除,会被标记为已删除数据,直到一个文件中都是可以删除的数据时,再将这个文件删除,这个动作就是常说的延时删除。另外内核有检测机制,会检查前后两个文件中的数据是否可以合并,当符合合并规则时,会进行段文件的合并。
生产者和消费者
当生产者和消费者连接到 Broker 进行生产消费的时候,是直接和 Broker 交互的,不需要客户端寻址。
集群部署后,为了提高容灾能力,就需要在集群前面挂一层负载均衡来进行灾备。客户端拿到负载均衡 IP 后,在生产或消费时使用这个 IP 和服务端直接建立连接。
在每个 Broker 上会设置有转发的功能和保存集群所有的元数据信息。当 Broker 收到请求后,根据本地缓存的元数据信息判断 Queue 是否在本机上,如果不在本机,就会将请求转发到 Queue 所在的目标节点。
生产者
生产端发送数据不是直接发送到 Queue,而是直接发送到 Exchange,然后在服务端根据路由绑定规则,将数据分发到不同的 Queue 中,所以在客户端是没有发送生产分区分配策略的逻辑。
Exchagne 和 Route 的功能就是生产分区分配的过程,只是将这个逻辑从客户端移动到了服务端而已。
消费者
支持 Push(推)和 Pull(拉)两种模式。如果使用了 Push 模式,Broker 会不断地推送消息给消费者。不需要客户端主动来拉,只要服务端有消息就会将数据推给客户端。推送消息的个数会受到 channel.basicQos 的限制,不能无限推送,在消费端会设置一个缓冲区来缓冲这些消息。拉模式是指客户端不断地去服务端拉取消息,拉模式只支持拉取单条消息。
为了保证消费流程的可靠性,提供了消息确认机制,包含自动 ACK 和手动 ACK 两种机制。
集群构建
节点发现
通过插件化的方式支持了多种发现方式,用来满足不同场景下的集群构建需求。
主要分为固定配置发现、类广播机制发现、第三方组件发现、手动管理等 4 种类型,以及固定配置、DNS、AWS(EC2)、Kubernetes、Consule、etcd、手动管理等 7 种发现方式。
- 固定配置发现:通过在配置文件中配置集群中所有节点的信息,从而发现集群所有节点的方式。和 ZooKeeper 的节点发现机制是一个思路。
- 类广播机制发现:通过 DNS 本身的机制解析出所有可用 IP 列表,从而发现集群中的所有节点。和 Elasticsearch 通过多播来动态发现集群节点是类似的思路。
- 第三方组件发现:是指通过多种第三方组件发现集群中的所有节点信息,比如 AWS(EC2)、Kubernetes、Consul、etcd 等。和 Kafka、Pulsar 依赖 ZooKeeper,RocketMQ 依赖 NameServer 是一个思路。
- 手动管理:通过命令 rabbitmqctlctl 工具往集群中手动添加或移除节点,即依赖人工来管理集群。
元数据存储
通过内置数据库 Mnesia,相当于天生自带了一个 Kafka KRaft 构建的元数据存储服务。
数据可靠性
集群维度数据可靠性的核心是副本和数据一致性协议。
RabbitMQ 没有 Topic,只有 Queue 的概念,所以是通过在 Queue 维度创建副本来实现数据的高可靠。比较特殊的是,RabbitMQ 在创建 Queue 时没有副本概念,即创建出来的 Queue 都是单副本的。如果要支持多副本,在 3.8.0 之前需要通过配置镜像队列来实现,在 3.8.0 后可以使用 Quorum Queue(仲裁队列)来实现。
镜像队列
通过为 Queue 创建副本、完成主从副本之间数据同步、维护主从副本之间数据的一致性等 3 个手段来保证数据的可靠性。
队列的副本数量是通过 All、Exactly、Nodes 3 种策略:
- All:集群中所有节点上都要有这个 Queue 的副本。
- Exactly:Queue 的副本分布在几个节点上,即 Queue 的副本数。
- Nodes:这个 Queue 的副本具体分布在哪几个节点上。
镜像队列在实现上有同步和性能上的缺陷,主要体现在以下三点:
- 强一致的同步策略很影响性能,导致集群的性能不高。
- 当节点挂掉后,节点上的队列数据数据都会被清空,需要重新从其他节点同步数据。
- 队列同步数据时会阻塞整个队列,导致队列不可用。如果数据量很大,同步时间长会导致队列长时间不可用。
所以,3.8.0 以后,RabbitMQ 用仲裁队列替代了镜像队列来解决这些问题。
仲裁队列
基于 Raft 算法来设计的,依赖 Raft 共识协议来确保数据的一致性和可靠性。
主从模型,至少得有三个副本。通过主副本和多个从副本之间的数据同步,来实现数据的高可靠。队列会选举出主副本来实现数据的读写,当主副本不可用,会根据算法从副本中选举出新的主副本。
当副本挂掉重新启动时,只需要从主节点同步增量数据,并且不会影响主副本的可用性,从而避免了镜像队列的缺点。
安全控制
传输加密
内置了对 TLS 的支持,通过 TLS 来实现数据的加密传输。
身份认证
支持用户名 / 密码认证和 X.509 证书两种认证形式。同时通过插件化机制,支持了 LDAP 认证、HTTP 接口认证、IP 来源认证等认证方式。
在认证配置上,RabbitMQ 支持链式认证。即同时支持多种认证方式,需要完成多重身份认证才算认证成功。
资源鉴权
管理页面
启用 Management 插件后的 Manager 页面。这个页面可以执行查看监控、创建资源、删除资源等操作,权限很大。为了保证集群的安全,需要对这个页面的访问进行权限控制。
在创建用户时通过指定不同用户的访问角色类型,从而控制不同用户在管理页面上的操作权限。Manager 页面支持 management、policymaker、monitoring、administrator 四种角色类型,分别表示不同的权限粒度。
数据流
主要包括资源操作(如创建、创建等)、写入、读取三种类型,分别对应 Configure、Write、Read 三种权限。主要支持对 Vhost 和 Exchange 两种资源进行鉴权。
可观测性
监控指标
- 集群维度:主要包含集群的监控指标,比如 Exchange 数、Queue 数、Channel 数、生产者数、消费者数等等。
- 应用程序维度:主要包含进程级的监控信息,比如 Socket 连接打开率、Channel 打开率等
- 队列维度:主要包含队列的监控指标,比如队列上的消息总数、未确认的消息总数等等。
- 节点维度:主要包含节点的信息,比如硬件层面的 CPU、内存、硬盘,操作系统层面的 Socket 连接数、文件描述符数量,Erlang 虚拟机层面的 Erlang 线程数、GC 情况等等。
在指标暴露方面,RabbitMQ 提供了 Prometheus + Grafana、HTTP API、命令行工具等多种方式。
健康检查
内核自带了健康检查机制。即支持通过命令行工具(rabbitmq-diagnostics)或 HTTP API 的方式对集群发起健康检查。检查集群的创建 Exchange、创建Queue、生产消费消息全流程是否正常。
日志
支持多种类型、多种级别的日志记录。也支持将不同类型的日志打印到不同的文件,比如 Connection、Channel、Upgrade、Queue 等等。同时也支持按时间、大小保留滚动文件。
消息轨迹
RabbitMQ 原生不支持消息轨迹的功能,但可基于 Firehose 插件来扩展支持。