版本为 3.9.1。由于是魔改的 kafka 代码,虽然线下验证没问题,但是还拿不准是否需要更新线上服务使用的 jar 包 …
背景
在 KIP-853 功能验证 调研时,已经确认了普通的 broker 是可以同步 voter 列表的变更的,但是在排查其他问题时,发现普通的 broker 还是会尝试连接旧的 voter
1 | [2025-12-18 20:50:06,552] INFO [RaftManager id=157] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient) |
当时吓出一身冷汗,之前调研的时候,做 broker 感知 voter 变化的测试,可能因为某些操作有问题,导致没有发现这个问题。让模型分析了一下相关代码,给出的结论时,当前 kafka broker 仅使用配置文件中配的 voter 列表来发现与连接 kraft leader 。虽然它同步的 raft log 中包含了最新的 voter 列表信息,但是它并不会使用。
模型相关问答
问题1:broker 如何感知到 voter 的变化
结论:无法感知到配置文件中 controller.quorum.voters 的值的变化。但是可以通过 raft log 中 voter record 来感知。


问题2:再次确认 broker 可以通过 log 来感知 voter 的变更
结论:是可以的


问题3:确认为什么 broker 会去连接旧 voter 节点
结论:在网络连接层,broker 连接的 voter 列表是在初始化的时候就定死的,后续不会变更


问题4:确认这个初始化 voter 列表来自于哪里
结论:来自于配置 controller.quorum.voters


问题5:为什么 voter 在选举的时候可以获取到最新的 voter 信息?
结论:由于 voter 和 observer(普通 broker) 是不同的角色,它们获取 voter 列表的方式走的是不同的机制。对于 voter 来说,它本身就维护者 voter 列表的变更,也就是 KIP-853 提供的特性,所以可以获取到最新的 voter 信息。


结论
虽然 broker 可以通过 raft log 获取最新的 voter 列表,但是在它尝试发现 raft leader 的时候,依旧会使用配置文件中写死的 voter 列表。如果这个 voter 列表中的信息都失效了,那么在 leader 发生切换时,broker 将无法发现新的 leader,从而导致 broker 挂掉。之前验证 KIP-853 的结论存在问题。
模型修复方案
相关 prompt
1 | 结合源码进行分析,是否有简洁的方式可以支持 broker 动态更新 voter 集合的变化? |
1 | 再次确认其中是否可能会存在并发访问的问题 |
1 | 根据你的设计方案,对相关代码进行更新 |
1 | 确认一下,在当前的实现中,在更新 bootstrap server 的信息时,是使用 raft log 中最新的 bootstrap server 列表的信息来更新的吗 |
1 | 额外确认这次变更中,raft/src/main/java/org/apache/kafka/raft/RequestManager.java 中的变量 bootstrapServers 是否会存在并发访问的问题 |
1 | 对代码进行额外修改,新增配置项用于判断是否开启这个特性,默认为开启的 |
1 | 为了保险起见,为 RequestManager 的 bootstrapServers 添加上并发访问的场景的下的同步机制 |
背景
问题描述
在 KRaft 模式下,当通过 AddVoter/RemoveVoter API 动态变更 voter 集合时,普通 broker 虽然能通过 KRaft log 中的 VotersRecord 感知到 voter 集合的变化,但 RequestManager 中的 bootstrapServers 列表不会更新。这导致 broker 在需要重新发现 leader 时,仍然会尝试连接旧的 voter 地址。
现象
broker 日志中出现类似以下警告:
1 | [RaftManager id=157] Connection to node 0 (old-host:9093) could not be established. Node may not be available. |
根本原因
RequestManager.bootstrapServers 在 broker 启动时从配置文件 controller.quorum.voters 初始化后,不会再更新:
1 | // 原实现 |
解决方案
设计思路
新增配置项
controller.quorum.dynamic.bootstrap.servers.enable控制是否启用此特性(默认启用)将
RequestManager.bootstrapServers改为可变添加
updateBootstrapServers()方法支持动态更新在
KafkaRaftClient中,当partitionState.updateState()处理完 KRaft log 后,检查并更新 bootstrap servers
线程安全分析
经过分析,KafkaRaftClient 采用单线程模型:
所有操作都在
KafkaRaftClientDriver的 IO 线程中通过poll()方法执行updateBootstrapServers()和findReadyBootstrapServer()不会并发执行
防御性线程安全措施: 为了保险起见,仍然添加了线程安全机制:
bootstrapServers字段使用volatile关键字,确保可见性读取方法使用本地变量引用模式,确保一致性读取
1 | // 字段声明 |
这种模式确保即使在未来可能的多线程场景下也能正确工作。
配置项
controller.quorum.dynamic.bootstrap.servers.enable
文件路径: raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
|属性|值|
|-|-|
|配置名|controller.quorum.dynamic.bootstrap.servers.enable|
|类型|Boolean|
|默认值|true|
|重要性|LOW|
说明: 是否在 voter 集合变更时动态更新 bootstrap servers。启用后,broker 会根据 KRaft log 中最新的 VotersRecord 自动更新 bootstrap servers 列表,确保能够连接到新的 voter 地址。
1 | public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "dynamic.bootstrap.servers.enable"; |
代码变更
1. QuorumConfig.java
文件路径: raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
变更 1:添加 BOOLEAN import
1 | import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; |
变更 2:添加配置项定义
1 | public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "dynamic.bootstrap.servers.enable"; |
变更 3:添加到 CONFIG_DEF
1 | public static final ConfigDef CONFIG_DEF = new ConfigDef() |
变更 4:添加字段和 getter
1 | private final boolean dynamicBootstrapServersEnabled; |
2. RequestManager.java
文件路径: raft/src/main/java/org/apache/kafka/raft/RequestManager.java
变更 1:添加 List import
1 | import java.util.ArrayList; |
变更 2:将 bootstrapServers 改为可变并添加 volatile
1 | public class RequestManager { |
变更 3:添加 updateBootstrapServers 方法
1 | /** |
3. KafkaRaftClient.java
文件路径: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
变更 1:添加 maybeUpdateBootstrapServers 方法(带配置检查)
1 | /** |
变更 2-4:在三个位置调用 maybeUpdateBootstrapServers()
appendAsFollower()方法中appendAsLeader()方法中handleFetchSnapshotResponse()方法中
数据来源
maybeUpdateBootstrapServers() 使用 KRaft log 中最新的 VoterSet 来更新 bootstrap servers:
1 | partitionState.lastVoterSet() |
交互流程
1 | sequenceDiagram |
触发时机
Bootstrap servers 会在以下时机更新(当配置启用时):
|场景|方法|说明|
|-|-|-|
|Follower 追加日志|appendAsFollower()|复制 leader 的日志时|
|Leader 追加日志|appendAsLeader()|leader 写入新记录时|
|Snapshot 下载完成|handleFetchSnapshotResponse()|从 leader 下载 snapshot 后|
使用方式
启用(默认)
无需配置,默认启用。
禁用
在 server.properties 中添加:
1 | controller.quorum.dynamic.bootstrap.servers.enable=false |
测试验证
1 | # 编译 |
所有测试通过。
总结
改动范围
QuorumConfig.java:新增配置项定义、字段和 getterRequestManager.java:5 处改动1 个 import(
List)1 个字段修改(
volatile List<Node> bootstrapServers)1 个新方法(
updateBootstrapServers())2 处本地变量引用(
findReadyBootstrapServer()和backoffBeforeAvailableBootstrapServer())
KafkaRaftClient.java:4 处改动(1 个新方法含配置检查,3 个调用点)
效果
当 voter 通过 AddVoter/RemoveVoter API 变更后,broker 会自动更新 bootstrap servers 列表,确保在需要重新发现 leader 时能够连接到新的 voter 地址。
兼容性
向后兼容:不改变现有 API,默认启用
可配置:通过
controller.quorum.dynamic.bootstrap.servers.enable控制无性能影响:只在日志追加时检查更新
线程安全:单线程模型,无并发问题
kafka 自带测试
1 | ./gradlew test |

其中有一个测试存在问题,是和 streams 相关的,相关功能和这次的变更无关,且重跑就正常了
1 | ./gradlew :streams:test |

本地测试
测试环境信息
1 | # work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test [10:59:43] |
1 | # work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test/exec_env/b3 [11:02:13] |
问题复现
turn1
我们需要修改 0 、 1 的端口,以模拟实现迁移的场景,最后将 2 stop ,观察 3 的日志
0: 8100 -> 8200
1: 8110 -> 8210
第一轮居然没有复现,3 依旧正常,我已经确认我执行的是没有问题的。排查发现,流程大概是这样的
初始化的时候,leader 为 1 ,3 中 0、1、2 的信息都是正确的,3 连接 1
在 0 切换 port 后,leader 依旧为 1 ,3 中只有 1、2 的信息是正确的,3 连接 1
在 1 切换 port 后,leader 切换为 0,3 中只有 2 的信息是正确的,此时 3 断开了和 1 的连接,但是由于依旧保留 2 正确的信息,所以可以发现新的 0
在 2 stop 后,leader 依旧为 0 ,3 和 0 保持连接,所以 3 依旧是正常的。即使 2 切换 port ,3 依旧连这 0,所以现象是正常的
综上,上一次调研的时候应该遇到的是这种场景,实际上只用再把 0 restart ,3 就挂了。
turn2
接下来我们只用进行如下操作,就可以让 3 挂掉了
2: 8120 –> 8220
stop 0
可以看到,此时 3 挂了

1 | [2025-12-19 11:51:15,218] WARN [RaftManager id=3] Connection to node 0 (/10.144.79.15:8101) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient) |
复现了预期的故障场景
新机制测试
当前的 leader 为 0,则将 3 中的 controller.quorum.voters配置里,1 和 2 都是错误的,仅 c0 是正确的。将 debug 日志开启,此时日志中有 voter 列表更新的日志
1 | [2025-12-19 16:48:02,383] DEBUG [RaftManager id=3] Updated bootstrap servers to: [10.144.79.15:8201 (id: 0 rack: null), 10.144.79.15:8221 (id: 2 rack: null), 10.144.79.15:8211 (id: 1 rack: null)] (org.apache.kafka.raft.KafkaRaftClient) |
此时将 0 stop ,3 依旧正常,有请求新 leader 的 fetch 日志,说明其可以感知到其他的 voter
1 | [2025-12-19 16:50:51,531] DEBUG [RaftManager id=3] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-3, correlationId=359, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=0, highWatermark=1425551, lastStableOffset=-1, logStartOffset=1292218, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=1, leaderEpoch=1515), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=72, buffer=java.nio.HeapByteBuffer[pos=0 lim=72 cap=112]))])], nodeEndpoints=[NodeEndpoint(nodeId=1, host='10.144.79.15', port=8211, rack=null)]) (org.apache.kafka.clients.NetworkClient) |
1 | # work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test/exec_env/b3 [16:49:58] |
总结
验证新特性时,不能仅靠测试,因为测试可能会有遗漏,必须结合源码进行分析。也就源码分析优先,实验测试辅助。