Kafka客户端实践及原理剖析

Posted by 淦 Blog on December 16, 2025

生产者消息分区机制原理

分区

分区的作用就是提供负载均衡的能力,实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,每个节点的机器都能独立地执行各自分区的读写请求处理。并且,还可以通过添加新的节点机器来增加整体系统的吞吐量。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 提供了默认的分区策略,同时它也支持自定义分区策略。

轮询策略

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是最常用的分区策略之一。

按消息键保序策略

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故被称为按消息键保序策略。

生产者压缩算法

怎么压缩

Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

  1. 把消息的公共部分抽取出来放到外层消息集合里面,不用每条消息都保存这些信息。
  2. 对整个消息集合进行压缩。

何时压缩

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。

大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但有两种例外情况就可能让 Broker 重新压缩消息。

  1. Broker 端指定了和 Producer 端不同的压缩算法。
  2. Broker 端发生了消息格式转换。所谓的消息格式转换主要是为了兼容老版本的消费者程序。除了压缩之外,还丧失了引以为豪的 Zero Copy 特性。

何时解压缩

解压缩发生在消费者程序中,Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

Kafka 会将启用了哪种压缩算法封装进消息集合中,当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。

Producer 端压缩、Broker 端保持、Consumer 端解压缩。

除了在 Consumer 端解压缩,Broker 端也会进行解压缩。注:这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言。

各种压缩算法对比

两个重要的指标:

  1. 压缩比
  2. 压缩 / 解压缩吞吐量

  • 在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;
  • 在压缩比方面,zstd > LZ4 > GZIP > Snappy。
  • 物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,提供超高的压缩比;
  • CPU 使用率,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

最佳实践

  • 启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。
  • 如果带宽资源有限,建议开启压缩。如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩。
  • 对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。有条件的话尽量保证不要出现消息格式转换的情况。

无消息丢失配置

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

  • 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
  • 假如消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

“消息丢失”案例

生产者程序丢失数据

Producer 永远要使用带有回调通知的发送 API,不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧这里的 callback(回调),它能准确地告诉消息是否真的提交成功了。一旦出现消息提交失败的情况,就可以有针对性地进行处理。

消费者程序丢失数据

  • 维持先消费消息(阅读),再更新位移(书签)的顺序。
  • 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。

最佳实践

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。

Kafka 拦截器

其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。

分为生产者拦截器和消费者拦截器。生产者拦截器允许在发送消息前以及消息提交成功后植入拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。这两种拦截器都支持链的方式,即可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。

编写的所有 Producer 端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。

  1. onSend:该方法会在消息发送之前被调用。如果想在发送之前对消息“美美容”,这个方法是唯一的机会。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果在这两个方法中调用了某个共享可变对象,一定要保证线程安全。这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑,否则 Producer TPS 直线下降。

指定消费者拦截器具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

  1. onConsume:该方法在消息返回给 Consumer 程序之前调用。在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回。
  2. onCommit:Consumer 在提交位移之后调用该方法。通常可以在该方法中做一些记账类的动作,比如打日志等。

注:指定拦截器类时要指定它们的全限定名,即 full qualified name。要把完整包名也加上,不要只有一个类名在那里,并且还要保证 Producer 程序能够正确加载拦截器类。

典型使用场景

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

端到端系统性能检测

Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是迫切需要解决的问题。

从技术上来说,在客户端程序中增加统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。

通过实现拦截器的逻辑以及可插拔的机制,能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。

消息审计(message audit)

设想公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。

作为私有云的 PaaS 提供方,肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入 Kafka 服务的客户端程序必须设置该拦截器。

管理TCP连接

为何采用 TCP ?

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。无论是生产者、消费者,还是 Broker 之间的通信都是如此。

从社区的角度来看,在开发客户端时,能够利用 TCP本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。

社区还发现,目前已知的 HTTP 库在很多编程语言中都略显简陋。

Java生产者

Kafka 生产者程序概览

Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常开发一个生产者的步骤有 4 步。

  1. 构造生产者对象所需的参数对象。
  2. 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
  3. 使用 KafkaProducer 的 send 方法发送消息。
  4. 调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。

何时创建 TCP 连接?

在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。

Producer 启动时会首先创建与这 n 个 Broker 的 TCP 连接。在实际使用过程中,不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常指定 3~4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。

TCP 连接是在创建 KafkaProducer 实例时建立的。TCP 连接还可能在更新元数据后,或者是在消息发送时。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。

Producer 更新集群元数据信息的两个场景:

  1. 当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
  2. Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000(5 分钟),即不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

何时关闭 TCP 连接?

用户主动关闭

  • 用户调用 kill -9 主动“杀掉”Producer 应用。
  • 最推荐的方式还是调用 producer.close() 方法来关闭。

Kafka 自动关闭

与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个TCP 连接,那么 Kafka 会主动把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。这只是软件层面的“长连接”机制,由于 Kafka创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。

注:TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

小结

  1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
  2. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
  3. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。
  4. 如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。

Java 消费者

何时创建 TCP 连接?

构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,即没有 Socket 连接被创建出来。TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。

发起 FindCoordinator 请求时

协调者(Coordinator)驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。

理论上任何一个 Broker 都能回答这个问题,即消费者可以发送 FindCoordinator 请求给集群中的任意服务器。消费者程序会向集群中当前负载最小的那台 Broker 发送请求。即看消费者连接的所有 Broker 中,谁的待发送请求最少。

连接协调者时

消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。

消费数据时

消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。

创建多少个 TCP 连接?

  • 消费者程序创建的第一个 TCP 连接,这个 Socket 用于发送 FindCoordinator 请求。由于这是消费者程序创建的第一个连接,此时消费者对于要连接的 Kafka 集群一无所知,因此它连接的 Broker 节点的 ID 是 -1,表示消费者根本不知道要连接的 Kafka Broker 的任何信息。
  • 消费者复用了刚才创建的那个 Socket 连接,向 Kafka 集群发送元数据请求以获取整个集群的信息。
  • 消费者知道协调者 Broker 的连接信息,发起了第二个 Socket 连接,连接了协调者,消费者进程才能正常地开启消费者组的各种功能以及后续的消息消费。
  • 要消费的分区的领导者副本在哪台 Broker 上,消费者就要创建连向哪台 Broker 的 TCP。主要用于实际的消息获取。

连接日志中的这些 Broker 节点的 ID 在不断变化。有时候是 -1,有时候是 2147483645,只有在最后的时候才回归正常值 0、1 和 2。

  • 消费者程序(其实也不光是消费者,生产者也是这样的机制)首次启动时,对 Kafka 集群一无所知,因此用 -1 来表示尚未获取到 Broker 数据。
  • Integer.MAX_VALUE 减去协调者所在 Broker 的真实 ID 计算得来的,目的就是要让组协调请求和真正的数据获取请求使用不同的 Socket 连接。
  • 表征了真实的 Broker ID,即在 server.properties 中配置的 broker.id 值。

通常来说,消费者程序会创建 3 类 TCP 连接:

  1. 确定协调者和获取集群元数据。
  2. 连接协调者,令其执行组成员管理操作。
  3. 执行实际的消息获取。

注:当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。

何时关闭 TCP 连接?

主动关闭

显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9。

Kafka 自动关闭

由消费者端参数 connection.max.idle.ms控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,消费者会强行“杀掉”这个 Socket 连接。

消息交付可靠性保障以及精确处理一次语义的实现

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,即再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。

Kafka 也可以提供最多一次交付保障,只需要让 Producer禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。

无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,消息既不会丢失,也不会被重复处理。即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。

幂等性 Producer

在 Kafka 中,Producer 默认不是幂等性的,但可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动做消息的重复去重。底层具体的原理是在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,在后台默默地把它们“丢弃”掉。

注:

  • 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
  • 只能实现单会话上的幂等性,不能实现跨会话的幂等性。会话为 Producer 进程的一次运行。当重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

设置事务型 Producer 的方法满足两个要求即可:

  1. 和幂等性 Producer 一样,开启 enable.idempotence = true。
  2. 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。

在 Producer 代码中做一些调整

1
2
3
4
5
6
7
8
9
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}

在 Consumer 端,读取事务型 Producer 发送的消息设置 isolation.level 参数的值即可。当前这个参数有两个取值:

  1. read_uncommitted:这是默认值,表明 Consumer 能读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  2. read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

小结

比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,需要仔细评估引入事务的开销,切不可无脑地启用事务。

消费者组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。组内有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区也可以被其他的 Group 消费。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

位移主题

将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。其主要作用是保存 Kafka 消费者的位移信息。要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。

消息格式

  1. Key 中保存 3 部分内容:< Group ID,主题名,分区号 >,消息体保存一个位移值,还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。
  2. 用于保存 Consumer Group 信息的消息。用来注册 Consumer Group。
  3. 用于删除 Group 过期位移甚至是删除 Group 的消息。tombstone 消息,即墓碑消息,也称 delete mark。它的主要特点是它的消息体是 null,即空消息体。一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

创建

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。该主题的分区数(offsets.topic.num.partitions)是 50,副本数(offsets.topic.replication.factor)是 3。

提交

enable.auto.commit = true/false 自动提交位移和手动提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。

删除

使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。

Rebalance

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

触发条件

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  2. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题。在 Consumer Group 的运行过程中,新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  3. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

缺点

  • 对 Consumer Group 消费过程有极大的影响。JVM 的垃圾回收机制,在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  • 目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • Rebalance 实在是太慢了。

协调者Coordinator

专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  2. 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

避免 Rebalance

在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的。

  1. 组成员数量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。因为组成员数量变化而引发的 Rebalance 该如何避免。

  • 未能及时发送心跳,导致 Consumer 被“踢出”Group而引发的。设置session.timeout.ms = 6s 和 heartbeat.interval.ms = 2s。
  • Consumer 消费时间过长导致,设置 max.poll.interval.ms 得大一点。
  • 排查Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。

消费进度监控

滞后程度指消费者当前落后于生产者的程度,即消费者 Lag 或 Consumer Lag。

在实际业务场景中必须时刻关注消费者的消费进度。一旦出现 Lag 逐步增加的趋势,一定要定位问题,及时处理,避免造成业务损失。

Kafka 自带命令

使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 脚本是 Kafka 最直接的监控消费者消费进度的工具。

它也能够监控独立消费者(Standalone Consumer)的 Lag。独立消费者就是没有使用消费者组机制的消费者程序。和消费者组相同的是,它们也要配置 group.id 参数值,但和消费者组调用 KafkaConsumer.subscribe() 不同的是,独立消费者调用 KafkaConsumer.assign() 方法直接消费指定分区。

  1. 按照消费者组订阅主题的分区进行展示,每个分区一行数据;
  2. 除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。

Kafka Java Consumer API

社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,使用它们就能计算出对应的 Lag。

  1. 调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;
  2. 获取订阅分区的最新消息位移;
  3. 执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。

Kafka JMX 监控指标

Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。

records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。

Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值,即 Lag 越大,Lead 越小,反之也是同理。

一旦监测到 Lead 越来越小,甚至是快接近于 0 了,这可能预示着消费者端要丢消息了。Kafka 默认删除 1 周前的数据。倘若消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。

注:在实际生产环境中,一定要同时监控 Lag 值和 Lead 值。

Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。

分区级别的 JMX 指标中多了 records-lag-avg 和 records-lead-avg 两个属性,可以计算平均的 Lag 值和 Lead 值。

位移提交

Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了表征 Consumer 的消费进度,当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,从相应的位移处继续消费,避免整个消费过程重来一遍。

注:位移提交的语义保障是由你来负责的,Kafka 只会接受提交的位移。

从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

自动提交

Kafka Consumer 在后台默默地提交位移,作为用户完全不必操心这些事。

Consumer 端参数 enable.auto.commit 设置为 true (默认值)。auto.commit.interval.ms(默认值 5 秒)表明 Kafka 每 5 秒会自动提交一次位移。

自动提交位移可能会出现重复消费。

手动提交

要自己提交位移,Kafka Consumer 压根不管。开启手动提交位移设置 enable.auto.commit 为 false。

commitSync()

该方法会提交 KafkaConsumer#poll() 返回的最新位移。它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。

在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。

commitAsync()

一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供实现提交之后的逻辑,比如记录日志或处理异常等。

commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

commitSync 和 commitAsync 组合使用

  1. 利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功。
  2. 不希望程序总处于阻塞状态,影响 TPS。

Kafka Consumer API 为手动提交提供了:commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。

CommitFailedException

Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。

本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。

社区给出了两个相应的解决办法:

  1. 增加期望的时间间隔 max.poll.interval.ms 参数值。
  2. 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。

场景一

当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。

如果要防止这种场景下抛出异常,需要简化消息处理逻辑。具体来说有 4 种方法。

  1. 缩短单条消息处理的时间。
  2. 增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。
  3. 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值,当前该参数的默认值是 500 条。
  4. 下游系统使用多线程来加速消费。

场景二

如果应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

多线程开发消费者实例

Kafka Java Consumer 设计原理

从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

用户主线程是启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。心跳线程能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。在消费消息的这个层面上,可以安全地认为 KafkaConsumer 是单线程的设计。

单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,即把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程方案

KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,在使用过程中必须要确保线程安全。即不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。

方案一

消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。

方案二

消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。

比较