消息队列之Kafka

Posted by 淦 Blog on March 19, 2025

要点

  • Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
  • 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
  • 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
  • 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 reblance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
  • Consumer 维护与 Coordinator 之间的心跳,Coordinator 就能感知到Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。

消息复制

复制的基本单位是分区。每个分区的几个副本之间,构成一个小的复制集群,Broker 只是这些分区副本的容器,所以 Broker 是不分主从的。

分区的多个副本中采用一主多从的方式。Kafka 在写入消息的时候,采用的也是异步复制的方式。消息在写入到主节点之后,并不会马上返回写入成功,而是等待足够多(用户自己决定)的节点都复制成功后再返回。

使用 ZooKeeper 来监控每个分区的多个节点,如果发现某个分区的主节点宕机了,会利用 ZooKeeper 来选出一个新的主节点。

协调服务ZooKeeper

作用

  • 作为一个分布式的协调服务框架,主要用来解决分布式集群中,应用系统需要面对的各种通用的一致性问题。本身可以部署为一个集群,集群的各个节点之间可以通过选举来产生一个 Leader,选举遵循半数以上的原则,所以一般集群需要部署奇数个节点。
  • 提供了一个分布式的存储系统,数据的组织方式类似于 UNIX 文件系统的树形结构。由于这是一个可以保证一致性的存储系统,所以在应用集群中读写 ZooKeeper 的数据,而不用担心数据一致性的问题。分布式系统中一些需要整个集群所有节点都访问的元数据,比如集群节点信息、公共配置信息等,特别适合保存在 ZooKeeper 中。
  • 在这个树形的存储结构中,每个节点被称为一个“ZNode”。ZooKeeper 提供了一种特殊的 ZNode 类型:临时节点。如果创建临时节点的客户端与 ZooKeeper 集群失去连接,这个临时节点就会自动消失。在 ZooKeeper 内部,它维护了 ZooKeeper 集群与所有客户端的心跳,通过判断心跳的状态,来确定是否需要删除客户端创建的临时节点。
  • 提供了一种订阅 ZNode 状态变化的通知机制:Watcher,一旦 ZNode 或者它的子节点状态发生了变化,订阅的客户端会立即收到通知。 利用 ZooKeeper 临时节点和 Watcher 机制,来获取业务集群中每个节点的存活状态,并且可以监控业务集群的节点变化情况,当有节点上下线时,都可以收到来自ZooKeeper 的通知。
  • 实现业务集群的快速选举、节点间的简单通信、分布式锁等很多功能。

事务

用于实现它的 Exactly Once 机制,应用于实时计算的场景中。

确保在一个事务中发送的多条消息,要么都成功,要么都失败。注意,这里面的多条消息不一定要在同一个主题和分区中,可以是发往多个主题和分区的消息。

单独来使用的场景不多,更多的情况下被用来配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once 语义。

角色

事务协调者:负责在服务端协调整个事务。这个协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。

Kafka 集群中有一个特殊的用于记录事务日志的主题,这个事务日志主题的实现和普通的主题是一样的,里面记录的数据就是类似于“开启事务”“提交事务”这样的事务日志。日志主题同样也包含了很多的分区。在 Kafka 集群中,可以存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区,并行执行多个事务,提升性能。

实现流程

  1. 开启事务,生产者会给协调者发一个请求来开启事务,协调者在事务日志中记录下事务 ID。
  2. 生产者在发送消息之前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志中。
  3. 生产者像发送普通消息一样来发送事务消息,直接发给 Broker,保存在这些消息对应的分区中,Kafka 会在客户端的消费者中,暂时过滤未提交的事务消息。
  4. 消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。
  5. 第二阶段,协调者在事务相关的所有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。
  6. 协调者记录最后一条事务日志,标识这个事务已经结束了。

保证 Exactly Once 语义

端到端 Exactly Once 指的是,数据从 Kafka 的 A 主题消费,发送给 Flink 的计算集群进行计算,计算结果再发给 Kafka 的 B 主题。在这整个过程中,无论是 Kafka 集群的节点还是 Flink 集群的节点发生故障,都不会影响计算结果,每条消息只会被计算一次,不能多也不能少。

Flink 通过 CheckPoint 机制来定期保存计算任务的快照,这个快照中主要包含两个重要的数据:

  1. 整个计算任务的状态。这个状态主要是计算任务中,每个子任务在计算过程中需要保存的临时状态数据。
  2. 数据源的位置信息。这个信息记录了在数据源的这个流中已经计算了哪些数据。如果数据源是 Kafka 的主题,这个位置信息就是 Kafka 主题中的消费位置。

当计算任务失败重启的时候,每个子任务先从 CheckPoint 中读取并恢复自己的状态,然后整个计算任务从 CheckPoint 中记录的数据源位置开始消费数据,只要这个恢复位置和 CheckPoint 中每个子任务的状态是完全对应的,或者说,每个子任务的状态恰好是:“刚刚处理完恢复位置之前的那条数据,还没有开始处理恢复位置对应的这条数据”,这个时刻保存的状态,就可以做到严丝合缝地恢复计算任务,每一条数据既不会丢失也不会重复。

Flink 通过在数据流中插入一个 Barrier(屏障)来确保 CheckPoint 中的位置和状态完全对应。

Barrier 为一条特殊的数据,由 Flink 生成,并在数据进入计算集群时被插入到数据流中。无限的数据流就被很多的 Barrier 分隔成很多段。Barrier 在流经每个计算节点的时候,就会触发这个节点在 CheckPoint 中保存本节点的状态,如果这个节点是数据源节点,还会保存数据源的位置。

当一个 Barrier 流过所有计算节点,流出计算集群后,一个 CheckPoint 也就保存完成了。由于每个节点都是在 Barrier 流过的时候保存的状态,这时的状态恰好就是 Barrier 所在位置(也就是 CheckPoint 数据源位置)对应的状态,这样就完美解决了状态与恢复位置对应的问题。

Flink 通过 CheckPoint 机制实现了集群内计算任务的 Exactly Once 语义,但是仍然实现不了在输入和输出两端数据不丢不重。比如,Flink 在把一条计算结果发给 Kafka 并收到来自 Kafka 的“发送成功”响应之后,才会继续处理下一条数据。如果这个时候重启计算任务,Flink 集群内的数据都可以完美地恢复到上一个 CheckPoint,但是已经发给 Kafka 的消息却没办法撤回,还是会出现数据重复的问题。

配合实现端到端 Exactly Once

Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。它可以保证一个事务内的所有消息,要么都成功投递,要么都不投递。

生产幂等:通过连续递增的序号进行检测。Kafka 的生产者给每个消息增加都附加一个连续递增的序号,Broker 端会检测这个序号的连续性,如果序号重复了,Broker 会拒绝这个重复消息。

每个 Flink 的 CheckPoint 对应一个 Kafka 事务。Flink 在创建一个 CheckPoint 的时候,同时开启一个 Kafka 的事务,完成 CheckPoint 同时提交 Kafka 的事务。当计算任务重启的时候,在 Flink 中计算任务会恢复到上一个 CheckPoint,这个 CheckPoint 正好对应 Kafka 上一个成功提交的事务。未完成的 CheckPoint 和未提交的事务中的消息都会被丢弃,这样就实现了端到端的 Exactly Once。

Flink 基于两阶段提交这个常用的分布式事务算法,实现了一分布式事务的控制器保证“完成 CheckPoint 同时提交 Kafka 的事务”来解决这个问题。

选举

Kafka 分区的 Leader并不是选举出来的,而是 Controller 指定的。Kafka 使用 ZooKeeper 来监控每个分区的多个副本,如果发现某个分区的主节点宕机了,Controller 会收到 ZooKeeper 的通知,从 ISR 节点中选择一个节点,指定为新的 Leader。

在 Kafka 中 Controller 本身也是通过 ZooKeeper 选举产生的。严格来说是一种抢占模式。每个Broker 在启动后,都会尝试在 ZooKeeper 中创建同一个临时节点:/controller,并把自身的信息写入到这个节点中。由于 ZooKeeper 它是一个可以保证数据一致性的分布式存储,所以,集群中只会有一个 Broker 抢到这个临时节点,那它就是 Leader 节点。其他没抢到 Leader 的节点,会 Watch 这个临时节点,如果当前的 Leader 节点宕机,所有其他节点都会收到通知,它们会开始新一轮的抢 Leader。

消息存储

以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。

每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。

写入消息的就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。

寻址过程是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。

优点

  • 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。
  • 采用了稀疏索引,节省一些存储空间。
  • 几个、十几个字节的小消息,但数量很多,稀疏索引能节省非常多的存储空间。

缺点

  • 稀疏索引牺牲了查找性能。

源码分析

Consumer

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

// 设置必要的配置信息
// 包括:起始连接的 Broker 地址,Consumer Group 的 ID,自动提交消费位置的配置和序列化配置;
Properties props = new Properties();
props.setProperty(bootstrap.servers, localhost:9092);
props.setProperty(group.id, test);
props.setProperty(enable.auto.commit, true);
props.setProperty(auto.commit.interval.ms, 1000);
props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);
props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);

// 创建 Consumer 实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅了 2 个 Topic:foo 和 bar;
consumer.subscribe(Arrays.asList(foo, bar));

// 循环拉消息,并打印在控制台上
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf(offset = %d, key = %s, value = %s%n, record.offset(), record.key(), record.value());
}

订阅过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
   /**
     * Internal helper method for {@link #subscribe(Collection)} and
     * {@link #subscribe(Collection, ConsumerRebalanceListener)}
     * <p>
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
     * with manual partition assignment through {@link #assign(Collection)}.
     *
     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * @param topics The list of topics to subscribe to
     * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
     *                 for the subscribed topics
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
     *                               configured at-least one partition assignment strategy
     */
    private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (topics == null)
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            if (topics.isEmpty()) {
                // treat subscribing to empty topic list as the same as unsubscribing
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (isBlank(topic))
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }

                throwIfNoAssignorsConfigured();

                // Clear the buffered data which are not a part of newly assigned topics
                final Set<TopicPartition> currentTopicPartitions = new HashSet<>();

                for (TopicPartition tp : subscriptions.assignedPartitions()) {
                    if (topics.contains(tp.topic()))
                        currentTopicPartitions.add(tp);
                }

                fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);

                log.info("Subscribed to topic(s): {}", String.join(", ", topics));
                // 重置订阅状态
                // 主要维护了订阅的 topic 和 patition 的消费位置等状态信息
                if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                    // 更新元数据
                    // 属性 metadata 中维护了 Kafka 集群元数据的一个子集,包括集群的 Broker 节点、Topic 和 Partition 在节点上分布
                    // 以及Coordinator 给 Consumer 分配的 Partition 信息。
                    metadata.requestUpdateForNewTopics();
            }
        } finally {
            release();
        }
    }

   /**
     * Request an immediate update of the current cluster metadata info, because the caller is interested in
     * metadata that is being newly requested.
     * @return The current updateVersion before the update
     */
    // 必须确保在第一次拉消息之前元数据是可用的,也就是说在第一次拉消息之前必须更新一次元数据,否则 Consumer 就不知道它应该去哪个 Broker 上去拉哪个 Partition 的消息。
    public synchronized int requestUpdateForNewTopics() {
        // Override the timestamp of last refresh to let immediate update.
        this.lastRefreshMs = 0;
        this.needPartialUpdate = true;
        this.equivalentResponseCount = 0;
        this.requestVersion++;
        return this.updateVersion;
    }

拉取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
   /**
     * @throws KafkaException if the rebalance callback throws exception
     */
    private ConsumerRecords<K, V> poll(final Timer timer) {
        acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            do {
                client.maybeTriggerWakeup();

                // try to update assignment metadata BUT do not need to block on the timer for join group
                // 更新元数据,最终现了与 Cluster 通信,在 Coordinator 上注册 Consumer 并拉取和更新元数据。
                // 类 ConsumerNetworkClient 封装了 Consumer 和 Cluster 之间所有的网络通信的实现,这个类是一个非常彻底的异步实现。
                // 它没有维护任何的线程,所有待发送的 Request 都存放在属性 unsent 中,返回的 Response 存放在属性 pendingCompletion 中。
                // 每次调用poll() 方法的时候,在当前线程中发送所有待发送的 Request,处理所有收到的Response。
                updateAssignmentMetadataIfNeeded(timer, false);

                // 拉取消息
                final Fetch<K, V> fetch = pollForFetches(timer);
                if (!fetch.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (sendFetches() > 0 || client.hasPendingRequests()) {
                        client.transmitSends();
                    }

                    if (fetch.records().isEmpty()) {
                        log.trace("Returning empty records from `poll()` "
                                + "since the consumer's position has advanced for at least one topic partition");
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
        }
    }
    
    /**
     * @throws KafkaException if the rebalance callback throws exception
     */
    private Fetch<K, V> pollForFetches(Timer timer) {
        long pollTimeout = coordinator == null ? timer.remainingMs() :
                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

        // if data is available already, return it immediately
        // 如果缓存里面有未读取的消息,直接返回这些消息
        final Fetch<K, V> fetch = fetcher.collectFetch();
        if (!fetch.isEmpty()) {
            return fetch;
        }

        // send any new fetches (won't resend pending fetches)
        // 构造拉取消息请求,并发送
        // Kafka 根据元数据的信息,构造到所有需要的Broker 的拉消息的 Request,然后调用 client.Send() 方法将这些请求异步发送出去。
        // 并且,注册了一个回调类来处理返回的 Response,所有返回的 Response 被暂时存放在Fetcher.completedFetches 中。
        // 需要注意的是,这时的 Request 并没有被真正发给各个Broker,而是被暂存在了 client.unsend 中等待被发送。
        sendFetches();

        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        log.trace("Polling for fetches with timeout {}", pollTimeout);

        Timer pollTimer = time.timer(pollTimeout);
        // 发送网络请求拉取消息,等待直到有消息返回或者超时
        // 真正将之前构造的所有 Request 发送出去,并处理收到的 Response。
        client.poll(pollTimer, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasAvailableFetches();
        });
        timer.update(pollTimer.currentTimeMs());

        // 返回拉到的消息
        // 返回的 Response 反序列化后转换为消息列表,返回给调用者。
        return fetcher.collectFetch();
    }

协调服务ZooKeeper

客户端寻找Broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
       /**
         * Add a metadata request to the list of sends if we can make one
         */
        private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId, now)) {
                // 构建一个更新元数据的请求的构造器
                // 在这个方法里面创建的并不是一个真正的更新元数据的 MetadataRequest,而是一个用于构造 MetadataRequest 的构造器 MetadataRequest.Builder,
                // 等到真正要发送请求之前,Kafka 才会调用 Builder.buid() 方法把这个 MetadataRequest 构建出来然后发送出去。
                // 而且,不仅是元数据的请求,所有的请求都是这样来处理的。
                Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                // 发送更新元数据的请求
                // 这个请求也并没有被真正发出去,依然是保存在待发送的队列中,然后择机来异步批量发送。
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
                return defaultRequestTimeoutMs;
            }

            // If there's any connection establishment underway, wait until it completes. This prevents
            // the client from unnecessarily connecting to additional nodes while a previous connection
            // attempt has not been completed.
            if (isAnyNodeConnecting()) {
                // Strictly the timeout we should return here is "connect timeout", but as we don't
                // have such application level configuration, using reconnect backoff instead.
                return reconnectBackoffMs;
            }

            if (connectionStates.canConnect(nodeConnectionId, now)) {
                // We don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            return Long.MAX_VALUE;
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// core/src/main/scala/kafka/server/KafkaApis.scala
  def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    val metadataRequest = request.body[MetadataRequest]
    val requestVersion = request.header.apiVersion

    // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
    if (!metadataRequest.isAllTopics) {
      metadataRequest.data.topics.forEach{ topic =>
        if (topic.name == null && metadataRequest.version < 12) {
          throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}")
        } else if (topic.topicId != Uuid.ZERO_UUID && metadataRequest.version < 12) {
          throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}")
        }
      }
    }

    // Check if topicId is presented firstly.
    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID)
    val useTopicId = topicIds.nonEmpty

    // Only get topicIds and topicNames when supporting topicId
    val unknownTopicIds = topicIds.filter(metadataCache.getTopicName(_).isEmpty)
    val knownTopicNames = topicIds.flatMap(metadataCache.getTopicName)

    val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
        metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, isInternal = false, util.Collections.emptyList())).toSeq

    // 计算需要获取哪些主题的元数据
    val topics = if (metadataRequest.isAllTopics)
      metadataCache.getAllTopics()
    else if (useTopicId)
      knownTopicNames
    else
      metadataRequest.topics.asScala.toSet

    val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
      topics, logIfDenied = !metadataRequest.isAllTopics)(identity)
    var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains)
    var unauthorizedForCreateTopics = Set[String]()

    if (authorizedTopics.nonEmpty) {
      val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains)
      if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
        if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
          val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
            nonExistingTopics)(identity)
          unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
          authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
        }
      }
    }

    val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
      // Set topicId to zero since we will never create topic which topicId
      metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal(topic), util.Collections.emptyList()))

    // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
    val unauthorizedForDescribeTopicMetadata =
      // In case of all topics, don't include topics unauthorized for Describe
      if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
        Set.empty[MetadataResponseTopic]
      else if (useTopicId) {
        // Topic IDs are not considered sensitive information, so returning TOPIC_AUTHORIZATION_FAILED is OK
        unauthorizedForDescribeTopics.map(topic =>
          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, metadataCache.getTopicId(topic), isInternal = false, util.Collections.emptyList()))
      } else {
        // We should not return topicId when on unauthorized error, so we return zero uuid.
        unauthorizedForDescribeTopics.map(topic =>
          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal = false, util.Collections.emptyList()))
      }

    // In version 0, we returned an error when brokers with replicas were unavailable,
    // while in higher versions we simply don't include the broker in the returned broker list
    val errorUnavailableEndpoints = requestVersion == 0
    // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
    // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
    val errorUnavailableListeners = requestVersion >= 6

    val allowAutoCreation = config.autoCreateTopicsEnable && metadataRequest.allowAutoTopicCreation && !metadataRequest.isAllTopics
    // 从元数据缓存过滤出相关主题的元数据
    val topicMetadata = getTopicMetadata(request, metadataRequest.isAllTopics, allowAutoCreation, authorizedTopics,
      request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)

    var clusterAuthorizedOperations = Int.MinValue // Default value in the schema
    if (requestVersion >= 8) {
      // get cluster authorized operations
      if (requestVersion <= 10) {
        if (metadataRequest.data.includeClusterAuthorizedOperations) {
          if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
            clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER)
          else
            clusterAuthorizedOperations = 0
        }
      }

      // get topic authorized operations
      if (metadataRequest.data.includeTopicAuthorizedOperations) {
        def setTopicAuthorizedOperations(topicMetadata: Seq[MetadataResponseTopic]): Unit = {
          topicMetadata.foreach { topicData =>
            topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name)))
          }
        }
        setTopicAuthorizedOperations(topicMetadata)
      }
    }

    val completeTopicMetadata =  unknownTopicIdsTopicMetadata ++
      topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata

    // 获取所有 Broker 列表
    val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)

    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
      brokers.mkString(","), request.header.correlationId, request.header.clientId))
    val controllerId = {
      metadataCache.getControllerId.flatMap {
        case ZkCachedControllerId(id) => Some(id)
        case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
      }
    }

    // 构建 Response 并发送
    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       MetadataResponse.prepareResponse(
         requestVersion,
         requestThrottleMs,
         brokers.toList.asJava,
         clusterId,
         controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
         completeTopicMetadata.asJava,
         clusterAuthorizedOperations
      ))
  }