Kafka副本机制详解
副本机制(Replication),即备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。
- 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
- 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
- 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
副本定义
Kafka 是有主题概念的,而每个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
副本角色
基于领导者(Leader-based)的副本机制。

- 在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
- Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。即任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
- 当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
注:追随者副本是不对外提供服务的。
副本机制好处:
- 方便实现“Read-your-writes”。当使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。
- 方便实现单调读(Monotonic Reads)。对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息会儿存在一会儿不存在。
In-sync Replicas(ISR)
ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
Leader 副本天然就在 ISR 中。即 ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
Broker 端参数 replica.lag.time.max.ms,表示Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。ISR 是一个动态调整的集合,而非静态不变的。
Unclean 领导者选举(Unclean Leader Election)
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
CAP 理论,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。在这个问题上,Kafka 赋予你选择 C 或 A 的权利。
根据实际业务场景决定是否开启 Unclean 领导者选举。不过,强烈建议不要开启它,毕竟还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
Kafka 请求处理
无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。
总之,Kafka 定义了很多类似的请求格式。截止到目前最新的 2.3 版本,Kafka 共定义了多达 45 种请求格式。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。
Kafka Broker 端处理请求的全流程
顺序处理请求
吞吐量太差。由于只能顺序处理每个请求,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。
每个请求使用单独线程处理
为每个入站请求都创建一个新的线程来异步处理。它是完全异步的,每个请求的处理都不会阻塞下一个请求。但为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务,只适用于请求发送频率很低的业务场景。
Reactor 模式
Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。

多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,即 Acceptor,它会将不同的请求下发到多个工作线程中处理。
在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。

Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。

当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数num.io.threads控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。可根据实际硬件条件设置此线程池的个数。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。
Purgatory 组件,Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
数据类与控制类请求
到目前为止,提及的请求处理流程对于所有请求都是适用的,即 Kafka Broker 对所有请求是一视同仁的。但是,在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。即它们并不是操作消息数据的,而是用来执行特定的Kafka 内部动作的。
Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。细究起来,当前这种一视同仁的处理方式对控制类请求是不合理的。因为控制类请求有这样一种能力:它可以直接令数据类请求失效!
社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。Kafka Broker 启动后,会在后台分别创建网络线程池和 IO 线程池,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,需要提供不同的listeners 配置,显式地指定哪套端口用于处理哪类请求。
消费者组重平衡全流程解析
消费者组的重平衡流程,它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助 Kafka Broker 端的 Coordinator 组件,在 Coordinator 的帮助下完成整个消费者组的分区重分配。
触发与通知
触发条件
- 组成员数量发生变化。
- 订阅主题数量发生变化。
- 订阅主题的分区数发生变化。
通知机制
重平衡过程是靠消费者端的心跳线程(Heartbeat Thread)通知到其他消费者实例的。重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
消费者端参数 heartbeat.interval.ms 设置了心跳的间隔时间,但真正作用是控制重平衡通知的频率。如果想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,消费者就能更快地感知到重平衡已经开启了。
消费者组状态机
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。严格来说,这套状态机属于非常底层的设计,它能够帮助搞懂消费者组的设计原理,比如消费者组的过期位移(Expired Offsets)删除等。
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。


一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
消费者端重平衡流程
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。注意区分这里的领导者和领导者副本,它们不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。

SyncGroup 请求的主要目的,是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
Broker 端重平衡场景剖析
新成员入组
组处于 Stable 状态后,有新成员加入。 当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。

组成员主动离组
消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。

组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间(由消费者端参数 session.timeout.ms 控制)才能感知到。
group-members-submit-displacements.pnggroup-members-collapse-and-leave-the-group.png)
重平衡时协调者对组内成员提交位移的处理
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。

Kafka控制器
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。即每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。官网上有个名为 activeController 的 JMX 指标,实时监控控制器的存活状态。
控制器是重度依赖 ZooKeeper 的,Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以“/”开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。
如果以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode。持久性 znode 不会因为 ZooKeeper 集群重启而消失,而临时 znode 则与创建该 znode 的 ZooKeeper 会话绑定,一旦会话结束,该节点会被自动删除。
ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。
ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。Kafka 控制器大量使用 Watch 功能实现对集群的协调管理。

控制器选举
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
控制器作用
主题管理(创建、删除、增加分区)
控制器帮助完成对 Kafka 主题的创建、删除以及分区增加的操作。即执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
分区重分配
分区重分配主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
Preferred 领导者选举
主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。
集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
控制器组件会利用 Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
数据服务
向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器保存数据

- 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
- 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
- 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。
注:这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要是指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。
控制器故障转移(Failover)
在 Kafka 集群运行过程中,只能有一台 Broker 充当控制器的角色,这就存在单点失效(Single Point of Failure)的风险,为控制器提供故障转移功能,即 Failover。
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需手动干预。

- Broker 0 是控制器。当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点。
- 所有存活的 Broker 开始竞选新的控制器身份。Broker 3 最终赢得了选举,成功地在 ZooKeeper 上重建了 /controller 节点。
- Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。
- 控制器的 Failover 完成,可以行使正常的工作职责了。
控制器内部设计原理
为了保护数据安全性,控制器不得不在代码中大量使用ReentrantLock 同步机制,这就进一步拖慢了整个控制器的处理速度。
鉴于这些原因,社区于 0.11 版本重构了控制器的底层设计,最大的改进就是,把多线程的方案改成了单线程加事件队列的方案。

社区引入了一个事件处理线程,统一处理各种控制器事件,然后控制器将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。这就是所谓的单线程 + 队列的实现方式。
注:这里的单线程不代表之前提到的所有线程都被“干掉”了,控制器只是把缓存状态变更方面的工作委托给了这个线程而已。
控制器缓存中保存的状态只被一个线程处理,因此不再需要重量级的线程同步机制来维护线程安全,Kafka 不用再担心多线程并发访问的问题,非常利于社区定位和诊断控制器的各种问题。
将之前同步操作 ZooKeeper 全部改为异步操作。ZooKeeper 本身的 API 提供了同步写和异步写两种方式。之前控制器操作 ZooKeeper 使用的是同步的 API,性能很差,集中表现为,当有大量主题分区发生变更时,ZooKeeper 容易成为系统的瓶颈。新版本 Kafka 修改了这部分设计,完全摒弃了之前的同步 API 调用,转而采用异步 API 写入 ZooKeeper,性能有了很大的提升。根据社区的测试,改成异步之后,ZooKeeper 写入提升了 10 倍!
正式支持这种不同优先级请求的处理。即 Kafka 将控制器发送的请求与普通数据类请求分开,实现了控制器请求单独处理的逻辑。例如赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。