消息队列之Pulsar

Posted by 淦 Blog on March 25, 2025

Pulsar 是一个开源的分布式消息队列产品,最早是由 Yahoo 开发,现在是 Apache 基金会旗下的开源项目。

架构

无论是 RocketMQ、RabbitMQ 还是 Kafka,消息都是存储在 Broker 的磁盘或者内存中。客户端在访问某个主题分区之前,必须先找到这个分区所在 Broker,然后连接到这个 Broker 上进行生产和消费。

在集群模式下,为了避免单点故障导致丢消息,Broker 在保存消息的时候,必须也把消息复制到其他的 Broker 上。当某个 Broker 节点故障的时候,并不是集群中任意一个节点都能替代这个故障的节点,只有那些“和这个故障节点拥有相同数据的节点”才能替代这个故障的节点。原因就是,每一个 Broker 存储的消息数据是不一样的,或者说,每个节点上都存储了状态(数据)。这种节点称为“有状态的节点(Stateful Node)”。

Pulsar 与其他消息队列在架构上,最大的不同在于,它的 Broker 是无状态的(Stateless)。也就是说,在 Pulsar 的 Broker 中既不保存元数据,也不存储消息。

  • ZooKeeper 集群被用来:存储元数据。
  • BookKeeper:存储消息数据。有点儿类似 HDFS,是一个分布式的存储集群,只不过它的存储单元和 HDFS 不一样,在 HDFS 中存储单元就是文件,而 BookKeeper 的存储单元是 Ledger。Ledger 是一段 WAL(Write Ahead Log),简单地理解为某个主题队列的一段,它包含了连续的若干条消息,消息在 Ledger 中称为 Entry。为了保证 Ledger 中的 Entry 的严格顺序,Pulsar 为 Ledger 增加一次性的写入限制,Broker 创建一个 Ledger 后,只有这个 Broker 可以往 Ledger 中写入 Entry,一旦 Ledger 关闭后,无论是 Broker 主动关闭,还是因为 Broker 宕机异常关闭,这个 Ledger 就永远只能读取不能写入了。如果需要继续写入 Entry,只能新建另外一个 Ledger。“一次性写入”的主要目的是为了解决并发写入控制的问题。对于共享资源数据的并发写一般都是需要加锁的,否则很难保证数据的一致性。对于分布式存储来说,就需要加“分布式锁”。但分布式锁本身就很难实现,使用分布式锁对性能也会有比较大的损失。这种“一次性写入”的设计,只有创建 Ledger 的进程可以写入数据,Ledger 这个资源不共享,也就不需要加锁。
  • LoadBalancer:负责动态的分配,哪些 Broker 管理哪些主题分区。
  • Managed Ledger:负责管理本节点需要用到的那些 Ledger,这些 Ledger 都是保存在 BookKeeper 集群中的。
  • Cache:来缓存一部分 Ledger,提升性能。

Pulsar 的客户端要读写某个主题分区上的数据之前,依然要在元数据中找到分区当前所在的那个 Broker,这一点是和其他消息队列的实现是一样的。不一样的地方是,其他的消息队列,分区与 Broker 的对应关系是相对稳定的,只要不发生故障,这个关系是不会变的。而在 Pulsar 中,这个对应关系是动态的,它可以根据 Broker 的负载情况进行动态调整,而且由于 Broker 是无状态的,分区可以调整到集群中任意一个 Broker 上,这个负载均衡策略就可以做得非常简单并且灵活。如果某一个 Broker 发生故障,可以立即用任何一个 Broker 来替代它。

客户端在收发消息之前,需要先连接 Service Discovery 模块,获取当前主题分区与 Broker 的对应关系,然后再连接到相应 Broker 上进行消息收发。客户端收发消息的整体流程,和其他的消息队列是差不多的。比较显著的一个区别就是,消息是保存在 BookKeeper 集群中的,而不是本机上。数据的可靠性保证也是 BookKeeper 集群提供的,所以 Broker 就不需要再往其他的 Broker 上复制消息了。

存储计算分离设计

优点

  1. 对于计算节点不需要存储数据,节点就变成了无状态的(Stateless)节点。一个由无状态节点组成的集群,管理、调度都变得非常简单。集群中每个节点都是一样的,天然就支持水平扩展。任意一个请求都可以路由到集群中任意一个节点上,负载均衡策略可以做得非常灵活,可以随机分配、轮询,也可以根据节点负载动态分配等等。故障转移(Failover)也更加简单快速,如果某个节点故障了,直接把请求分配给其他节点。
  2. 对比像 ZooKeeper 这样存储计算不分离的系统,故障转移就非常麻烦,一般需要用复杂的选举算法,选出新的 leader,提供服务之前,可能还需要进行数据同步,确保新的节点上的数据和故障节点是完全一致之后,才可以继续提供服务。这个过程是非常复杂而且漫长的。
  3. 对于计算节点的开发者可以专注于计算业务逻辑开发,而不需要关注像数据一致性、数据可靠性、故障恢复和数据读写性能等等这些比较麻烦的存储问题,极大地降低了开发难度,提升了开发效率。
  4. 对于存储系统需要实现的功能就很简单,系统的开发者只需要专注于解决“如何安全高效地存储数据?”并且,存储系统的功能是非常稳定的,像 ZooKeeper、HDFS、MySQL 这些存储系统,从它们诞生到现在,功能几乎就没有变过。每次升级都是在优化存储引擎,提升性能、数据可靠性、可用性等等。

缺点

  1. BookKeeper 依然要解决数据一致性、节点故障转移、选举、数据复制等等这些问题。原来一个集群变成了两个集群,整个系统其实变得更加复杂了。
  2. 系统的性能也会有一些损失。比如,从 Pulsar 的 Broker 上消费一条消息,Broker 还需要去请求 BookKeeper 集群读取数据,然后返回给客户端,这个过程至少增加了一次网络传输和 n 次内存拷贝。相比于直接读本地磁盘,性能肯定是要差一些的。

Pulsar 存储计算分离的架构是未来消息队列的发展方向?

早期的消息队列,主要被用来在系统之间异步交换数据,大部分消息队列的存储能力都比较弱,不支持消息持久化,不提倡在消息队列中堆积大量的消息,这个时期的消息队列,本质上是一个数据的管道。

现代的消息队列,功能上看似没有太多变化,依然是收发消息,但是用途更加广泛,数据被持久化到磁盘中,大多数消息队列具备了强大的消息堆积能力,只要磁盘空间足够,可以存储无限量的消息,而且不会影响生产和消费的性能。这些消息队列,本质上已经演变成为分布式的存储系统。

为一个“分布式存储系统”做存储计算分离,计算节点就没什么业务逻辑需要计算的了。而且,消息队列又不像其他的业务系统,可以直接使用一些成熟的分布式存储系统来存储消息,因为性能达不到要求。分离后的存储节点承担了之前绝大部分功能,并且增加了系统的复杂度,还降低了性能,显然是不划算的。

现在各大消息队列的 Roadmap(发展路线图),Kafka 在做 Kafka Streams,Pulsar 在做 Pulsar Functions,就是流计算。现有的流计算平台,包括 Storm、Flink 和 Spark,它们的节点都是无状态的纯计算节点,是没有数据存储能力的。所以,现在的流计算平台,它很难做大量数据的聚合,并且在数据可靠性保证、数据一致性、故障恢复等方面,也做得不太好。

而消息队列正好相反,它很好地保证了数据的可靠性、一致性,但是 Broker 只具备存储能力,没有计算的功能,数据流进去什么样,流出来还是什么样。同样是处理实时数据流的系统,一个只能计算不能存储,一个只能存储不能计算,那未来如果出现一个新的系统,既能计算也能存储,如果还能有不错的性能,是不是就会把现在的消息队列和流计算平台都给替代了?这是很有可能的。

对于一个“带计算功能的消息队列”来说,采用存储计算分离的设计,计算节点负责流计算,存储节点负责存储消息,这个设计就非常和谐了。