kafka raft 模式下普通 broker 无法自动更新 voter 列表问题修复

版本为 3.9.1。由于是魔改的 kafka 代码,虽然线下验证没问题,但是还拿不准是否需要更新线上服务使用的 jar 包 …

背景

KIP-853 功能验证 调研时,已经确认了普通的 broker 是可以同步 voter 列表的变更的,但是在排查其他问题时,发现普通的 broker 还是会尝试连接旧的 voter

1
2
[2025-12-18 20:50:06,552] INFO [RaftManager id=157] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient)
[2025-12-18 20:50:06,552] WARN [RaftManager id=157] Connection to node 0 (bddwd-ps-beehive-agent187658.bddwd.baidu.com/10.41.70.215:9093) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)

当时吓出一身冷汗,之前调研的时候,做 broker 感知 voter 变化的测试,可能因为某些操作有问题,导致没有发现这个问题。让模型分析了一下相关代码,给出的结论时,当前 kafka broker 仅使用配置文件中配的 voter 列表来发现与连接 kraft leader 。虽然它同步的 raft log 中包含了最新的 voter 列表信息,但是它并不会使用。

模型相关问答

问题1:broker 如何感知到 voter 的变化

结论:无法感知到配置文件中 controller.quorum.voters 的值的变化。但是可以通过 raft log 中 voter record 来感知。

问题2:再次确认 broker 可以通过 log 来感知 voter 的变更

结论:是可以的

问题3:确认为什么 broker 会去连接旧 voter 节点

结论:在网络连接层,broker 连接的 voter 列表是在初始化的时候就定死的,后续不会变更

问题4:确认这个初始化 voter 列表来自于哪里

结论:来自于配置 controller.quorum.voters

问题5:为什么 voter 在选举的时候可以获取到最新的 voter 信息?

结论:由于 voter 和 observer(普通 broker) 是不同的角色,它们获取 voter 列表的方式走的是不同的机制。对于 voter 来说,它本身就维护者 voter 列表的变更,也就是 KIP-853 提供的特性,所以可以获取到最新的 voter 信息。

结论

虽然 broker 可以通过 raft log 获取最新的 voter 列表,但是在它尝试发现 raft leader 的时候,依旧会使用配置文件中写死的 voter 列表。如果这个 voter 列表中的信息都失效了,那么在 leader 发生切换时,broker 将无法发现新的 leader,从而导致 broker 挂掉。之前验证 KIP-853 的结论存在问题。

模型修复方案

相关 prompt

1
结合源码进行分析,是否有简洁的方式可以支持 broker 动态更新 voter 集合的变化?
1
再次确认其中是否可能会存在并发访问的问题
1
根据你的设计方案,对相关代码进行更新
1
确认一下,在当前的实现中,在更新 bootstrap server 的信息时,是使用 raft log 中最新的 bootstrap server 列表的信息来更新的吗
1
额外确认这次变更中,raft/src/main/java/org/apache/kafka/raft/RequestManager.java 中的变量 bootstrapServers 是否会存在并发访问的问题
1
对代码进行额外修改,新增配置项用于判断是否开启这个特性,默认为开启的
1
为了保险起见,为 RequestManager 的 bootstrapServers 添加上并发访问的场景的下的同步机制

背景

问题描述

在 KRaft 模式下,当通过 AddVoter/RemoveVoter API 动态变更 voter 集合时,普通 broker 虽然能通过 KRaft log 中的 VotersRecord 感知到 voter 集合的变化,但 RequestManager 中的 bootstrapServers 列表不会更新。这导致 broker 在需要重新发现 leader 时,仍然会尝试连接旧的 voter 地址。

现象

broker 日志中出现类似以下警告:

1
[RaftManager id=157] Connection to node 0 (old-host:9093) could not be established. Node may not be available.

根本原因

RequestManager.bootstrapServers 在 broker 启动时从配置文件 controller.quorum.voters 初始化后,不会再更新:

1
2
// 原实现
private final ArrayList<Node> bootstrapServers; // final,不可变

解决方案

设计思路

  1. 新增配置项 controller.quorum.dynamic.bootstrap.servers.enable 控制是否启用此特性(默认启用)

  2. RequestManager.bootstrapServers 改为可变

  3. 添加 updateBootstrapServers() 方法支持动态更新

  4. KafkaRaftClient 中,当 partitionState.updateState() 处理完 KRaft log 后,检查并更新 bootstrap servers

线程安全分析

经过分析,KafkaRaftClient 采用单线程模型

  • 所有操作都在 KafkaRaftClientDriver 的 IO 线程中通过 poll() 方法执行

  • updateBootstrapServers()findReadyBootstrapServer() 不会并发执行

防御性线程安全措施: 为了保险起见,仍然添加了线程安全机制:

  1. bootstrapServers 字段使用 volatile 关键字,确保可见性

  2. 读取方法使用本地变量引用模式,确保一致性读取

1
2
3
4
5
6
// 字段声明
private volatile List<Node> bootstrapServers;

// 读取时使用本地引用
List<Node> servers = this.bootstrapServers; // 本地引用
for (Node node : servers) { ... } // 使用本地引用

这种模式确保即使在未来可能的多线程场景下也能正确工作。


配置项

controller.quorum.dynamic.bootstrap.servers.enable

文件路径: raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java

|属性|值|

|-|-|

|配置名|controller.quorum.dynamic.bootstrap.servers.enable|

|类型|Boolean|

|默认值|true|

|重要性|LOW|

说明: 是否在 voter 集合变更时动态更新 bootstrap servers。启用后,broker 会根据 KRaft log 中最新的 VotersRecord 自动更新 bootstrap servers 列表,确保能够连接到新的 voter 地址。

1
2
3
4
5
6
public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "dynamic.bootstrap.servers.enable";
public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_DOC = "Whether to dynamically update bootstrap servers " +
"when the voter set changes via AddVoter/RemoveVoter. When enabled, the broker will automatically update " +
"its bootstrap servers list based on the latest VotersRecord in the KRaft log, ensuring it can connect to " +
"new voters when discovering the leader.";
public static final boolean DEFAULT_QUORUM_DYNAMIC_BOOTSTRAP_SERVERS = true;

代码变更

1. QuorumConfig.java

文件路径: raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java

变更 1:添加 BOOLEAN import

1
2
3
4
5
6
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;

变更 2:添加配置项定义

1
2
3
4
5
6
public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "dynamic.bootstrap.servers.enable";
public static final String QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_DOC = "Whether to dynamically update bootstrap servers " +
"when the voter set changes via AddVoter/RemoveVoter. When enabled, the broker will automatically update " +
"its bootstrap servers list based on the latest VotersRecord in the KRaft log, ensuring it can connect to " +
"new voters when discovering the leader.";
public static final boolean DEFAULT_QUORUM_DYNAMIC_BOOTSTRAP_SERVERS = true;

变更 3:添加到 CONFIG_DEF

1
2
3
4
5
 public static final ConfigDef CONFIG_DEF =  new ConfigDef()
// ... 其他配置 ...
- .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
+ .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
+ .define(QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG, BOOLEAN, DEFAULT_QUORUM_DYNAMIC_BOOTSTRAP_SERVERS, null, LOW, QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_DOC);

变更 4:添加字段和 getter

1
2
3
4
5
private final boolean dynamicBootstrapServersEnabled;

public boolean dynamicBootstrapServersEnabled() {
return dynamicBootstrapServersEnabled;
}

2. RequestManager.java

文件路径: raft/src/main/java/org/apache/kafka/raft/RequestManager.java

变更 1:添加 List import

1
2
3
4
5
6
7
8
9
 import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;

变更 2:将 bootstrapServers 改为可变并添加 volatile

1
2
3
4
 public class RequestManager {
private final Map<String, ConnectionState> connections = new HashMap<>();
- private final ArrayList<Node> bootstrapServers;
+ private volatile List<Node> bootstrapServers;

变更 3:添加 updateBootstrapServers 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Updates the bootstrap servers list dynamically.
* This method should be called when the voter set changes to ensure
* the client can connect to the new voters.
*
* @param newBootstrapServers the new list of bootstrap servers
*/
public void updateBootstrapServers(Collection<Node> newBootstrapServers) {
if (newBootstrapServers == null || newBootstrapServers.isEmpty()) {
return;
}
this.bootstrapServers = new ArrayList<>(newBootstrapServers);
}

3. KafkaRaftClient.java

文件路径: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

变更 1:添加 maybeUpdateBootstrapServers 方法(带配置检查)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Updates the bootstrap servers in RequestManager based on the current voter set.
* This ensures that when voters change dynamically (via AddVoter/RemoveVoter),
* the broker can connect to the new voters when discovering the leader.
*
* This feature is controlled by the configuration:
* {@link QuorumConfig#QUORUM_DYNAMIC_BOOTSTRAP_SERVERS_CONFIG}
*/
private void maybeUpdateBootstrapServers() {
if (!quorumConfig.dynamicBootstrapServersEnabled()) {
return;
}

if (requestManager == null) {
return;
}

VoterSet currentVoterSet = partitionState.lastVoterSet();
if (currentVoterSet == null || currentVoterSet.voterIds().isEmpty()) {
return;
}

List<Node> newBootstrapNodes = currentVoterSet.voterNodes()
.stream()
.map(voterNode -> {
InetSocketAddress address = voterNode.listeners()
.address(channel.listenerName())
.orElse(null);
if (address == null) {
return null;
}
return new Node(
voterNode.voterKey().id(),
address.getHostString(),
address.getPort()
);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (!newBootstrapNodes.isEmpty()) {
requestManager.updateBootstrapServers(newBootstrapNodes);
logger.info("Updated bootstrap servers to: {}", newBootstrapNodes);
}
}

变更 2-4:在三个位置调用 maybeUpdateBootstrapServers()

  • appendAsFollower() 方法中

  • appendAsLeader() 方法中

  • handleFetchSnapshotResponse() 方法中


数据来源

maybeUpdateBootstrapServers() 使用 KRaft log 中最新的 VoterSet 来更新 bootstrap servers:

1
2
3
4
5
6
7
partitionState.lastVoterSet()

VoterSetHistory.lastValue()

votersHistory.lastEntry().value() // 优先返回 log 中的最新 VotersRecord

如果 log 中没有,则返回 staticVoterSet(配置文件中的值)

交互流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sequenceDiagram
participant Leader as Controller Leader
participant Log as KRaft Log
participant Broker as Broker (Observer)
participant StateMachine as KRaftControlRecordStateMachine
participant RaftClient as KafkaRaftClient
participant ReqMgr as RequestManager

Leader->>Log: appendVotersRecord(newVoterSet)
Log->>Broker: Fetch 复制
Broker->>RaftClient: appendAsFollower(records)
RaftClient->>StateMachine: updateState()
StateMachine->>StateMachine: handleBatch() - 解析 KRAFT_VOTERS
StateMachine->>StateMachine: voterSetHistory.addAt(offset, voters)
RaftClient->>RaftClient: maybeUpdateBootstrapServers()
alt dynamicBootstrapServersEnabled = true
RaftClient->>StateMachine: lastVoterSet()
StateMachine-->>RaftClient: 返回最新 VoterSet
RaftClient->>ReqMgr: updateBootstrapServers(newNodes)
ReqMgr->>ReqMgr: bootstrapServers = newList
Note over ReqMgr: 下次 findReadyBootstrapServer()<br/>将使用新的 voter 地址
else dynamicBootstrapServersEnabled = false
Note over RaftClient: 跳过更新
end

触发时机

Bootstrap servers 会在以下时机更新(当配置启用时):

|场景|方法|说明|

|-|-|-|

|Follower 追加日志|appendAsFollower()|复制 leader 的日志时|

|Leader 追加日志|appendAsLeader()|leader 写入新记录时|

|Snapshot 下载完成|handleFetchSnapshotResponse()|从 leader 下载 snapshot 后|


使用方式

启用(默认)

无需配置,默认启用。

禁用

server.properties 中添加:

1
controller.quorum.dynamic.bootstrap.servers.enable=false

测试验证

1
2
3
4
5
6
7
8
# 编译
./gradlew :raft:compileJava --no-daemon -q

# 运行相关测试
./gradlew :raft:test --tests "org.apache.kafka.raft.QuorumConfigTest" \
--tests "org.apache.kafka.raft.RequestManagerTest" \
--tests "org.apache.kafka.raft.KafkaRaftClientTest" \
-x checkstyleMain -x checkstyleTest

所有测试通过。


总结

改动范围

  • QuorumConfig.java:新增配置项定义、字段和 getter

  • RequestManager.java:5 处改动

    • 1 个 import(List

    • 1 个字段修改(volatile List<Node> bootstrapServers

    • 1 个新方法(updateBootstrapServers()

    • 2 处本地变量引用(findReadyBootstrapServer()backoffBeforeAvailableBootstrapServer()

  • KafkaRaftClient.java:4 处改动(1 个新方法含配置检查,3 个调用点)

效果

当 voter 通过 AddVoter/RemoveVoter API 变更后,broker 会自动更新 bootstrap servers 列表,确保在需要重新发现 leader 时能够连接到新的 voter 地址。

兼容性

  • 向后兼容:不改变现有 API,默认启用

  • 可配置:通过 controller.quorum.dynamic.bootstrap.servers.enable 控制

  • 无性能影响:只在日志追加时检查更新

  • 线程安全:单线程模型,无并发问题

kafka 自带测试

1
./gradlew test

其中有一个测试存在问题,是和 streams 相关的,相关功能和这次的变更无关,且重跑就正常了

1
./gradlew :streams:test

本地测试

测试环境信息

1
2
3
4
5
6
7
# work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test [10:59:43] 
$ kaf nodes -b $(hostname -i):8140
ID ADDRESS CONTROLLER
0 10.144.79.15:8100 false
1 10.144.79.15:8110 false
2 10.144.79.15:8120 true
3 10.144.79.15:8140 false
1
2
3
4
5
6
7
8
9
10
# work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test/exec_env/b3 [11:02:13] 
$ ./bin/kafka-metadata-quorum.sh --bootstrap-server $(hostname -i):8140 describe --status
ClusterId: SBYtyuhkTNGeZco8wQg5hw
LeaderId: 0
LeaderEpoch: 1461
HighWatermark: 1382966
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "nBF16kyeLHulGTLHHObZIQ", "endpoints": ["CONTROLLER://10.144.79.15:8101"]}, {"id": 1, "directoryId": "V1zZfv_t96eXfS30k-09FQ", "endpoints": ["CONTROLLER://10.144.79.15:8111"]}, {"id": 2, "directoryId": "8YTCnijW9E49vJRze4z5HA", "endpoints": ["CONTROLLER://10.144.79.15:8121"]}]
CurrentObservers: [{"id": 3, "directoryId": "zgKDwvKNIdbbNR-fxF44tw"}]

问题复现

turn1

我们需要修改 0 、 1 的端口,以模拟实现迁移的场景,最后将 2 stop ,观察 3 的日志

  • 0: 8100 -> 8200

  • 1: 8110 -> 8210

第一轮居然没有复现,3 依旧正常,我已经确认我执行的是没有问题的。排查发现,流程大概是这样的

  1. 初始化的时候,leader 为 1 ,3 中 0、1、2 的信息都是正确的,3 连接 1

  2. 在 0 切换 port 后,leader 依旧为 1 ,3 中只有 1、2 的信息是正确的,3 连接 1

  3. 在 1 切换 port 后,leader 切换为 0,3 中只有 2 的信息是正确的,此时 3 断开了和 1 的连接,但是由于依旧保留 2 正确的信息,所以可以发现新的 0

  4. 在 2 stop 后,leader 依旧为 0 ,3 和 0 保持连接,所以 3 依旧是正常的。即使 2 切换 port ,3 依旧连这 0,所以现象是正常的

综上,上一次调研的时候应该遇到的是这种场景,实际上只用再把 0 restart ,3 就挂了。

turn2

接下来我们只用进行如下操作,就可以让 3 挂掉了

  • 2: 8120 –> 8220

  • stop 0

可以看到,此时 3 挂了

1
2
3
[2025-12-19 11:51:15,218] WARN [RaftManager id=3] Connection to node 0 (/10.144.79.15:8101) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-12-19 11:51:15,218] WARN [RaftManager id=3] Connection to node 1 (/10.144.79.15:8111) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-12-19 11:51:15,238] WARN [RaftManager id=3] Connection to node 2 (/10.144.79.15:8121) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient)

复现了预期的故障场景

新机制测试

当前的 leader 为 0,则将 3 中的 controller.quorum.voters配置里,1 和 2 都是错误的,仅 c0 是正确的。将 debug 日志开启,此时日志中有 voter 列表更新的日志

1
[2025-12-19 16:48:02,383] DEBUG [RaftManager id=3] Updated bootstrap servers to: [10.144.79.15:8201 (id: 0 rack: null), 10.144.79.15:8221 (id: 2 rack: null), 10.144.79.15:8211 (id: 1 rack: null)] (org.apache.kafka.raft.KafkaRaftClient)

此时将 0 stop ,3 依旧正常,有请求新 leader 的 fetch 日志,说明其可以感知到其他的 voter

1
[2025-12-19 16:50:51,531] DEBUG [RaftManager id=3] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-3, correlationId=359, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[PartitionData(partitionIndex=0, errorCode=0, highWatermark=1425551, lastStableOffset=-1, logStartOffset=1292218, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=1, leaderEpoch=1515), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[], preferredReadReplica=-1, records=MemoryRecords(size=72, buffer=java.nio.HeapByteBuffer[pos=0 lim=72 cap=112]))])], nodeEndpoints=[NodeEndpoint(nodeId=1, host='10.144.79.15', port=8211, rack=null)]) (org.apache.kafka.clients.NetworkClient)
1
2
3
4
5
6
# work @ bjyz-201605-m32-wise044 in /home/disk0/nizhenhyang/kafka-kraft-voter-fix-test/exec_env/b3 [16:49:58] 
$ kaf nodes -b $(hostname -i):8140
ID ADDRESS CONTROLLER
1 10.144.79.15:8210 true
2 10.144.79.15:8220 false
3 10.144.79.15:8140 false

总结

验证新特性时,不能仅靠测试,因为测试可能会有遗漏,必须结合源码进行分析。也就源码分析优先,实验测试辅助。