KIP-853 feature verification: dynamically changing the Kafka controller voter list in KRaft mode

KIP-853: KRaft Controller Membership Changes. One of its more important features is the ability to add and remove controller members dynamically with external scripts.

Background

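I recently had a new business requirement that called for a dedicated Kafka cluster. Because the cluster is brand new and has no legacy baggage, I wanted to go straight to ZooKeeper-free mode. When KRaft replaces ZooKeeper, the controller.quorum.voters list has to be set in the config file, for example: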

# The connect string for the controller quorum
# Format: brokerid@ip:port
controller.quorum.voters=0@10.62.173.11:17001,1@10.62.173.11:17011,2@10.62.173.11:17021
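To make the static format concrete, here is a minimal Java sketch that splits a controller.quorum.voters value into (id, host, port) entries. QuorumVotersParser and VoterEntry are hypothetical names for illustration, not Kafka's own parser. The point is that every node carries its own copy of this string, so moving a single voter means editing the string on every node.

```java
import java.util.ArrayList;
import java.util.List;

/** One entry of controller.quorum.voters, written as {id}@{host}:{port}. */
record VoterEntry(int id, String host, int port) {}

/** Hypothetical helper, not Kafka's parser: illustrates the static voter format only. */
final class QuorumVotersParser {
    private QuorumVotersParser() {}

    /** Parses "0@h:17001,1@h:17011" into entries; throws IllegalArgumentException on malformed input. */
    static List<VoterEntry> parse(String value) {
        List<VoterEntry> result = new ArrayList<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate a trailing comma
            }
            int at = entry.indexOf('@');
            int colon = entry.lastIndexOf(':'); // last ':' separates host and port
            if (at <= 0 || colon <= at + 1 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Malformed voter entry: " + entry);
            }
            int id = Integer.parseInt(entry.substring(0, at));
            String host = entry.substring(at + 1, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            result.add(new VoterEntry(id, host, port));
        }
        return result;
    }
}
```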

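The problem is that if an instance in the voter list is migrated, the list must be updated in every config file and the services restarted. External automation can handle this, but as the cluster grows it is unreasonable to restart every other instance just because a few instances moved.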

How I came across KIP-853

While reading the Kafka source, I found logic for dynamically updating voters in the KafkaRaftClient class (java/org/apache/kafka/raft/KafkaRaftClient.java), roughly as follows:

// Specialized add voter handler
this.addVoterHandler = new AddVoterHandler(
// ...
);

// Specialized remove voter handler
this.removeVoterHandler = new RemoveVoterHandler(
// ...
);

// Specialized update voter handler
this.updateVoterHandler = new UpdateVoterHandler(
// ...
);

I added some logging to the corresponding handler to check whether it ran when a voter came online or went offline. It never ran. The instrumented code:

private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
    RaftRequest.Inbound requestMetadata,
    long currentTimeMs
) {
    AddRaftVoterRequestData data = (AddRaftVoterRequestData) requestMetadata.data();
    // TODO: just for debugging
    logger.info("[debug] --> add voter 1");

    // ...

    // TODO: just for debugging
    logger.info("[debug] --> add voter 2, info:{} , endpoint:{}", newVoter.get(), newVoterEndpoints);

    return addVoterHandler.handleAddVoterRequest(
        quorum.leaderStateOrThrow(),
        newVoter.get(),
        newVoterEndpoints,
        currentTimeMs
    );
}

Still, since the implementation exists, it must be used somewhere. Searching again, I found that Kafka exposes APIs for dynamic voter operations (java/org/apache/kafka/common/message/ApiMessageType.java):

public enum ApiMessageType {
// ....
ADD_RAFT_VOTER("AddRaftVoter", (short) 80, AddRaftVoterRequestData.SCHEMAS, AddRaftVoterResponseData.SCHEMAS, (short) 0, (short) 0, (short) 0, (short) -1, EnumSet.of(ListenerType.CONTROLLER, ListenerType.BROKER), false),
REMOVE_RAFT_VOTER("RemoveRaftVoter", (short) 81, RemoveRaftVoterRequestData.SCHEMAS, RemoveRaftVoterResponseData.SCHEMAS, (short) 0, (short) 0, (short) 0, (short) -1, EnumSet.of(ListenerType.CONTROLLER, ListenerType.BROKER), false),
UPDATE_RAFT_VOTER("UpdateRaftVoter", (short) 82, UpdateRaftVoterRequestData.SCHEMAS, UpdateRaftVoterResponseData.SCHEMAS, (short) 0, (short) 0, (short) 0, (short) -1, EnumSet.of(ListenerType.CONTROLLER), false),
// ...
}
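The three entries can be read as a small lookup table. The sketch below is a simplified restatement of only the API key and listener-type columns quoted above. VoterApi and ListenerKind are illustrative names, not Kafka types. Per the enum, AddRaftVoter and RemoveRaftVoter are accepted on both CONTROLLER and BROKER listeners, while UpdateRaftVoter is accepted only on CONTROLLER listeners.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative stand-in for Kafka's listener types. */
enum ListenerKind { CONTROLLER, BROKER }

/** Hypothetical restatement of the ApiMessageType entries quoted above (Kafka 3.9.1). */
enum VoterApi {
    ADD_RAFT_VOTER((short) 80, EnumSet.of(ListenerKind.CONTROLLER, ListenerKind.BROKER)),
    REMOVE_RAFT_VOTER((short) 81, EnumSet.of(ListenerKind.CONTROLLER, ListenerKind.BROKER)),
    UPDATE_RAFT_VOTER((short) 82, EnumSet.of(ListenerKind.CONTROLLER));

    final short apiKey;
    final Set<ListenerKind> listeners;

    VoterApi(short apiKey, Set<ListenerKind> listeners) {
        this.apiKey = apiKey;
        this.listeners = listeners;
    }

    /** Looks up the API by its numeric key; throws for keys outside these three. */
    static VoterApi fromKey(short apiKey) {
        for (VoterApi api : values()) {
            if (api.apiKey == apiKey) {
                return api;
            }
        }
        throw new IllegalArgumentException("Not a voter API key: " + apiKey);
    }

    /** True if a listener of the given kind accepts this request type. */
    boolean acceptedOn(ListenerKind kind) {
        return listeners.contains(kind);
    }
}
```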

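Given these APIs, my first idea was to write my own client to call them. However, the Go SDK does not yet support these APIs, so the client would have to be written in Java. I am not very familiar with Java, so I looked at how the official tools (scripts such as ./bin/kafka-topics.sh ...) are implemented. While searching, I found that kafka-metadata-quorum.sh already supports calling the voter APIs. The script simply delegates to MetadataQuorumCommand: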

exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.MetadataQuorumCommand "$@"

public class MetadataQuorumCommand {
    // .....
    if (command.equals("describe")) {
        // ....
    } else if (command.equals("add-controller")) {
        if (optionalCommandConfig == null) {
            throw new TerseException("You must supply the configuration file of the controller you are " +
                "adding when using add-controller.");
        }
        handleAddController(admin,
            namespace.getBoolean("dry_run"),
            props);
    } else if (command.equals("remove-controller")) {
        handleRemoveController(admin,
            namespace.getInt("controller_id"),
            namespace.getString("controller_directory_id"),
            namespace.getBoolean("dry_run"));
    } else {
        throw new IllegalStateException(format("Unknown command: %s", command));
    }

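I had run this script's --help before when checking controller status, but had not noticed these two subcommands. Finally, searching online for the related class and config keywords led me to the proposal behind this feature: KIP-853: KRaft Controller Membership Changes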

Local verification

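I used Kafka 3.9.1, a release in the 3.9 line, which is where this feature first became available. For the local verification, three questions had top priority:

  1. How the initial controller cluster is built

  2. How the instance behind a controller is switched over

  3. Whether brokers notice when the controllers change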

Below, cx denotes the instance with broker id x. The cluster id is pTAG7hqbRBGQwKg5Hujn8g, generated with bin/kafka-storage.sh random-uuid.
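The cluster id (like the directoryId values shown later) is a 22-character string. As far as I know, Kafka renders its Uuid type as URL-safe Base64 of the 16 UUID bytes without padding, and random-uuid avoids ids that start with '-'. The sketch below produces ids of the same shape using only java.util. KafkaStyleId is a hypothetical helper that skips Kafka's other checks (such as reserved values), so use kafka-storage.sh random-uuid for real clusters.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

/** Hypothetical helper: generates ids shaped like kafka-storage.sh random-uuid output. */
final class KafkaStyleId {
    private KafkaStyleId() {}

    static String randomId() {
        while (true) {
            UUID uuid = UUID.randomUUID();
            ByteBuffer buf = ByteBuffer.allocate(16);
            buf.putLong(uuid.getMostSignificantBits());
            buf.putLong(uuid.getLeastSignificantBits());
            // 16 bytes -> 22 URL-safe Base64 characters when padding is dropped
            String id = Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
            // Retry ids with a leading '-' so they cannot be mistaken for a CLI flag
            if (!id.startsWith("-")) {
                return id;
            }
        }
    }

    /** Decodes an id back to its raw bytes (16 for a well-formed id). */
    static byte[] decode(String id) {
        return Base64.getUrlDecoder().decode(id);
    }
}
```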

Initializing the controller cluster

c0 is the seed node (--standalone); broker port 15000, controller port 15001:

./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --standalone --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties

c1 (--no-initial-controllers); broker port 15010, controller port 15011:

./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties

c2 (--no-initial-controllers); broker port 15020, controller port 15021:

./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties

At this point the controller status is:

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 1036
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}]
CurrentObservers: [{"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA"}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg"}]

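Only broker 0 is a controller at this point; next, 1 and 2 are promoted to controllers. Note that listeners in the config must specify the host explicitly. Otherwise the endpoint falls back to localhost and cannot be reached from other machines (see getControllerAdvertisedListeners below):

  • listeners=PLAINTEXT://yq01-build-rd2.yq01.baidu.com:17020,CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021 (works: host given explicitly)

  • listeners=PLAINTEXT://:17020,CONTROLLER://:17021 (problematic: no host, so the controller endpoint is advertised as localhost)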

static Set<RaftVoterEndpoint> getControllerAdvertisedListeners(
    Properties props
) throws Exception {
    // ...
    LinkedHashSet<RaftVoterEndpoint> results = new LinkedHashSet<>();
    for (String listenerName : props.getProperty(
            // ....
        // No host in the listener (e.g. CONTROLLER://:17021) -> the endpoint is advertised as "localhost"
        results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
                endpoint.host() == null ? "localhost" : endpoint.host(),
                endpoint.port()));
    }
    // ...
}
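The fallback above can be reproduced in isolation. This is a minimal sketch assuming that an empty host in a listener string such as CONTROLLER://:17021 is what reaches the `endpoint.host() == null` branch. ListenerHostFallback and ListenerEndpoint are hypothetical names, and the parsing is deliberately simpler than Kafka's own listener parsing.

```java
/** Host and port resolved for one listener entry such as "CONTROLLER://host:17021". */
record ListenerEndpoint(String name, String host, int port) {}

/** Hypothetical helper mirroring the localhost fallback quoted above; not Kafka code. */
final class ListenerHostFallback {
    private ListenerHostFallback() {}

    static ListenerEndpoint parse(String listener) {
        int sep = listener.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Expected NAME://host:port, got " + listener);
        }
        String name = listener.substring(0, sep);
        String hostPort = listener.substring(sep + 3);
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing port in " + listener);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        // Empty host: treat it like the null-host case and advertise "localhost"
        return new ListenerEndpoint(name, host.isEmpty() ? "localhost" : host, port);
    }

    /** Picks the listener whose name matches the controller listener name. */
    static ListenerEndpoint controllerEndpoint(String listeners, String controllerListenerName) {
        for (String raw : listeners.split(",")) {
            ListenerEndpoint e = parse(raw.trim());
            if (e.name().equals(controllerListenerName)) {
                return e;
            }
        }
        throw new IllegalArgumentException("No listener named " + controllerListenerName);
    }
}
```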

Then promote c1 and c2 by running add-controller on each node with its own config as --command-config, bootstrapping from c0 (controller port 15001):

# c1
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 --command-config config/kraft/server.properties.gen.properties add-controller
# c2
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 --command-config config/kraft/server.properties.gen.properties add-controller

Now both 1 and 2 have become controllers:

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 2174
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: []

c3 joins as an observer; read/write verification

Besides controllers, a cluster usually also has ordinary brokers. Here broker 3 joins the cluster as an ordinary node, on port 15030:

# c3
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties

As the output shows, c3 is an observer:

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 2572
MaxFollowerLag: 1
MaxFollowerLagTimeMs: 487
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]

Reads and writes through c3 both work:

./bin/kafka-topics.sh --create --topic hello_world_topic --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 --replication-factor 3 --partitions 2
kaf topics -b yq01-build-rd2.yq01.baidu.com:15030
kaf -b yq01-build-rd2.yq01.baidu.com:15030 produce hello_world_topic
kaf -b yq01-build-rd2.yq01.baidu.com:15030 consume hello_world_topic -g nzy_test

A controller instance exits abnormally

At the end of the previous section the leader was 0. What happens if we kill the leader?

ps aux |grep kafka | grep c0 | awk '{print $2}' | xargs kill

The leader switches to 1, but broker 0 is still in the voter list:

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 3456
MaxFollowerLag: 3458
MaxFollowerLagTimeMs: -1
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
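Why does the cluster keep working after the leader is killed? KRaft elects a leader and advances the high watermark with a majority of the voters. A small Java sketch of that arithmetic follows; QuorumMath is a hypothetical helper, not a Kafka class.

```java
/** Hypothetical helper: majority arithmetic for a Raft-style voter set. */
final class QuorumMath {
    private QuorumMath() {}

    /** Smallest number of voters that forms a majority of `voters`. */
    static int majority(int voters) {
        if (voters <= 0) {
            throw new IllegalArgumentException("voter count must be positive");
        }
        return voters / 2 + 1;
    }

    /** How many voters may be unavailable while a majority is still reachable. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }

    /** True if `alive` voters out of `voters` can still elect a leader and commit. */
    static boolean canMakeProgress(int voters, int alive) {
        return alive >= majority(voters);
    }
}
```

With c0 dead but still listed, 2 of 3 voters are alive, which is still a majority, so 1 can become leader. After c0 is removed, 2 voters remain and the majority is still 2: the quorum keeps working but can no longer tolerate losing another controller.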

Remove broker 0 with the script; afterwards it is gone from the voter list:

# remove the former leader (0)
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 remove-controller --controller-id 0 --controller-directory-id 3rDv-etwR2e90gwUcBllDg
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 3614
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
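remove-controller takes both --controller-id and --controller-directory-id because a voter is identified by the pair (the debug logs print it as ReplicaKey(id=..., directoryId=...)). The simplified model below illustrates this. VoterKey and VoterSetModel are hypothetical, and two behaviours are assumptions rather than verified Kafka semantics: removal only succeeds when both id and directory id match, and an id that is already a voter cannot be added again.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** A voter's identity: node id plus the directory id of its metadata log dir. */
record VoterKey(int id, String directoryId) {}

/** Hypothetical, simplified voter set; not Kafka's implementation. */
final class VoterSetModel {
    private final Map<Integer, VoterKey> voters = new LinkedHashMap<>();

    /** Assumption: at most one voter per id, so a second key with the same id is rejected. */
    boolean add(VoterKey key) {
        return voters.putIfAbsent(key.id(), key) == null;
    }

    /** Assumption: both id and directory id must match the current voter. */
    boolean remove(VoterKey key) {
        return voters.remove(key.id(), key);
    }

    Optional<VoterKey> voter(int id) {
        return Optional.ofNullable(voters.get(id));
    }

    int size() {
        return voters.size();
    }
}
```

Under these assumptions, a node that has been re-formatted (and therefore has a new directory id) must have its old entry removed before it can be added back, which is the order used for c1 and c2 below.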

Controller instance address change

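To simulate instance migration (here by changing ports rather than IPs), move 0, 1 and 2 one at a time from 15xxx to 17xxx and check whether 3 picks up the new information in real time.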

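Change c0's ports (15xxx -> 17xxx) and start it. For the original seed node 0 you have to decide whether to format it with --standalone or --no-initial-controllers, and which node to pass as --bootstrap-controller. Because the quorum already exists, c0 is formatted with --no-initial-controllers and added through a live controller (c1 at 15011). c0 had already been removed from the voter set in the previous section, so no remove-controller is needed: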

# c0
ps aux |grep kafka | grep c0 | awk '{print $2}' | xargs kill
rm -rf data/*

./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15011 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 4402
MaxFollowerLag: 1
MaxFollowerLagTimeMs: 435
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]

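Change c1's ports (15xxx -> 17xxx) and start it. Unlike c0, c1 is still a voter under its old directory id, so that entry is removed with remove-controller before the re-formatted node is added back: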

ps aux |grep kafka | grep c1 | awk '{print $2}' | xargs kill
rm -rf data/*
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 remove-controller --controller-id 1 --controller-directory-id 9dk70WRte2_p__mZ_O0Bhg
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 4823
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
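The per-node steps above repeat for every migration, and they are what the external orchestration mentioned in the conclusion would need to automate. A minimal sketch, assuming the node has already been stopped and its data directory wiped: ControllerMigrationPlan is a hypothetical helper that only assembles the command lines used in this post. The remove step is skipped when the node is no longer a voter, as was the case for c0.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: builds the command sequence used in this post to move one controller. */
final class ControllerMigrationPlan {
    private ControllerMigrationPlan() {}

    /**
     * Precondition (not checked here): the old process is stopped and its data dir wiped.
     * oldDirectoryId == null means the node is no longer a voter, so no remove is needed.
     */
    static List<String> commands(String clusterId, String config, String bootstrapController,
                                 int controllerId, String oldDirectoryId) {
        List<String> cmds = new ArrayList<>();
        // Re-format as a non-seed controller and start it
        cmds.add("./bin/kafka-storage.sh format --cluster-id " + clusterId
                + " --no-initial-controllers --config " + config);
        cmds.add("./bin/kafka-server-start.sh -daemon " + config);
        // Drop the stale (id, old directory id) voter entry, if any
        if (oldDirectoryId != null) {
            cmds.add("./bin/kafka-metadata-quorum.sh --bootstrap-controller " + bootstrapController
                    + " remove-controller --controller-id " + controllerId
                    + " --controller-directory-id " + oldDirectoryId);
        }
        // Register the re-formatted node, reading its identity from its own config
        cmds.add("./bin/kafka-metadata-quorum.sh --bootstrap-controller " + bootstrapController
                + " --command-config " + config + " add-controller");
        return cmds;
    }
}
```

For example, `commands("pTAG7hqbRBGQwKg5Hujn8g", "config/kraft/server.properties.gen.properties", "0.0.0.0:15021", 1, "9dk70WRte2_p__mZ_O0Bhg")` yields the four kafka commands run for c1 above.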

Change c2's ports (15xxx -> 17xxx) and start it, following the same steps as for c1:

ps aux |grep kafka | grep c2 | awk '{print $2}' | xargs kill
rm -rf data/*
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 remove-controller --controller-id 2 --controller-directory-id JCX-hbVdtxCJl7geeqjNmA
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller yq01-build-rd2.yq01.baidu.com:17021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 5860
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]

Broker 3 can still read and write normally; for example, listing topics through it succeeds:

kaf topics -b yq01-build-rd2.yq01.baidu.com:15030
NAME                 PARTITIONS  REPLICAS
__consumer_offsets   50          1
hello_world_topic    2           3

Querying the quorum metadata through broker 3 (--bootstrap-server instead of --bootstrap-controller) also works:

./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 6305
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]

The debug logging I added to the Kafka source earlier (in the section on how I came across KIP-853) now shows up in the log file as well:

[2025-08-11 16:20:35,073] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:20:35,178] INFO [RaftManager id=0] [debug] --> update voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpointEndpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:22:53,375] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:25:57,616] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:28:51,424] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:49:20,944] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:49:43,102] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[wQAnhBv-4eltR9-Pfn5Ylw]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:52:34,300] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:52:51,297] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:54:51,243] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:55:43,202] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:05,005] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:17,098] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:39,354] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
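Each debug line carries the action and the ReplicaKey, so the sequence of membership changes can be extracted mechanically. The regex below matches only the [debug] format of the lines I added myself. DebugLogParser and VoterChange are hypothetical names. Note that, at least for add, the log statement shown earlier runs before addVoterHandler handles the request, so a logged line does not by itself mean the change was applied.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** One membership-change request as recorded by the custom debug log lines. */
record VoterChange(String action, int id, String directoryId) {}

/** Hypothetical parser for the "[debug] --> ... voter, info:ReplicaKey(...)" lines above. */
final class DebugLogParser {
    private DebugLogParser() {}

    private static final Pattern LINE = Pattern.compile(
        "\\[debug\\] --> (add|remove|update) voter, info:ReplicaKey\\(id=(\\d+), directoryId=Optional\\[([^\\]]*)\\]\\)");

    /** Returns the parsed change, or empty if the line is not one of the debug lines. */
    static Optional<VoterChange> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new VoterChange(m.group(1), Integer.parseInt(m.group(2)), m.group(3)));
    }
}
```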

The controller leader is removed

What happens if the leader is removed while it is still running?

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 remove-controller --controller-id 0 --controller-directory-id t6JHGm7q3zwlOzX-4u0wHg

./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 4
HighWatermark: 8710
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}, {"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg"}]

As shown, it has been demoted to an observer. Now promote it back to a voter:

./bin/kafka-metadata-quorum.sh --bootstrap-controller yq01-build-rd2.yq01.baidu.com:17011 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 4
HighWatermark: 8936
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 117
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]

Conclusion

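Based on these local tests, dynamically adding and removing controller members works. Combined with suitable external orchestration logic (still being designed …), controller membership can be updated dynamically without restarting every instance in the cluster.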