One of the more important features of KIP-853: KRaft Controller Membership Changes is the ability to add and remove controller members dynamically using external scripts.
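Background
A new business requirement recently came up that calls for a dedicated Kafka cluster. Because the cluster is being built from scratch with no legacy baggage, we wanted to go straight to the ZooKeeper-free version. When KRaft replaces ZooKeeper, the controller.quorum.voters list has to be set in the configuration file, for example: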
```properties
controller.quorum.voters=0@10.62.173.11:17001,1@10.62.173.11:17011,2@10.62.173.11:17021
```
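To make the static format concrete, here is a minimal Java sketch that splits a controller.quorum.voters value into its id@host:port entries. QuorumVotersParser and its Voter record are hypothetical names for illustration, not Kafka's own parser; the point is only that every voter's host and port is baked into a string that each node carries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the static "id@host:port,..." voters format (not Kafka's parser).
final class QuorumVotersParser {

    /** One entry of controller.quorum.voters. */
    record Voter(int id, String host, int port) { }

    static List<Voter> parse(String value) {
        List<Voter> voters = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing comma
            }
            int at = trimmed.indexOf('@');
            int colon = trimmed.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Malformed voter entry: " + trimmed);
            }
            int id = Integer.parseInt(trimmed.substring(0, at));
            String host = trimmed.substring(at + 1, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            voters.add(new Voter(id, host, port));
        }
        return voters;
    }
}
```

Parsing the value above yields three voters, the second being id 1 at 10.62.173.11:17011; moving any of them means changing this string on every node.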
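The problem is that if any of the voters is migrated, the list in the configuration file has to be updated and the services restarted. External automation could handle this, but as the cluster grows it is hard to justify restarting every other instance just because a few instances moved.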
How I came across KIP-853
While reading the Kafka source code, I found logic for dynamically updating voters in the KafkaRaftClient class (java/org/apache/kafka/raft/KafkaRaftClient.java), roughly as follows:
```java
// Constructor arguments omitted in this excerpt
this.addVoterHandler = new AddVoterHandler(/* ... */);
this.removeVoterHandler = new RemoveVoterHandler(/* ... */);
this.updateVoterHandler = new UpdateVoterHandler(/* ... */);
```
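I added some logging to the corresponding handlers to check whether they run when a voter comes online or goes offline, and found that they did not. The code looks like this: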
```java
private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
    RaftRequest.Inbound requestMetadata,
    long currentTimeMs
) {
    AddRaftVoterRequestData data = (AddRaftVoterRequestData) requestMetadata.data();
    logger.info("[debug] --> add voter 1");
    // ... (request validation and construction of newVoter / newVoterEndpoints omitted)
    logger.info("[debug] --> add voter 2, info:{} , endpoint:{}", newVoter.get(), newVoterEndpoints);
    return addVoterHandler.handleAddVoterRequest(
        quorum.leaderStateOrThrow(),
        newVoter.get(),
        newVoterEndpoints,
        currentTimeMs
    );
}
```
Still, if the code exists, it must be used somewhere. A second search showed that Kafka exposes APIs for these dynamic voter operations (java/org/apache/kafka/common/message/ApiMessageType.java):
```java
public enum ApiMessageType {
    // ...
    ADD_RAFT_VOTER("AddRaftVoter", (short) 80,
        AddRaftVoterRequestData.SCHEMAS, AddRaftVoterResponseData.SCHEMAS,
        (short) 0, (short) 0, (short) 0, (short) -1,
        EnumSet.of(ListenerType.CONTROLLER, ListenerType.BROKER), false),
    REMOVE_RAFT_VOTER("RemoveRaftVoter", (short) 81,
        RemoveRaftVoterRequestData.SCHEMAS, RemoveRaftVoterResponseData.SCHEMAS,
        (short) 0, (short) 0, (short) 0, (short) -1,
        EnumSet.of(ListenerType.CONTROLLER, ListenerType.BROKER), false),
    UPDATE_RAFT_VOTER("UpdateRaftVoter", (short) 82,
        UpdateRaftVoterRequestData.SCHEMAS, UpdateRaftVoterResponseData.SCHEMAS,
        (short) 0, (short) 0, (short) 0, (short) -1,
        EnumSet.of(ListenerType.CONTROLLER), false),
    // ...
}
```
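My first idea was that, since these APIs exist, I could write a client to call them myself. However, the Go SDK does not support these APIs yet, so Java would have been the only option. Not being very familiar with Java, I decided to look at how the official tools, scripts such as ./bin/kafka-topics.sh ..., are implemented. While searching, I was surprised to find that kafka-metadata-quorum.sh already supports calling the voter APIs: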
```bash
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.MetadataQuorumCommand "$@"
```
```java
public class MetadataQuorumCommand {
    // ...
    if (command.equals("describe")) {
        // ...
    } else if (command.equals("add-controller")) {
        if (optionalCommandConfig == null) {
            throw new TerseException("You must supply the configuration file of the controller you are " +
                "adding when using add-controller.");
        }
        handleAddController(admin, namespace.getBoolean("dry_run"), props);
    } else if (command.equals("remove-controller")) {
        handleRemoveController(admin,
            namespace.getInt("controller_id"),
            namespace.getString("controller_directory_id"),
            namespace.getBoolean("dry_run"));
    } else {
        throw new IllegalStateException(format("Unknown command: %s", command));
    }
    // ...
}
```
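I had run this script's --help before when querying controller status, but had not noticed these two commands. Searching online for the relevant class names and configuration keywords finally led me to the proposal behind the feature: KIP-853: KRaft Controller Membership Changes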
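Local verification
I used Kafka 3.9.1, which belongs to the 3.9 release line where this feature first became available. For the local verification, three questions had to be answered first: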
1. How is the initial controller cluster built?
2. How are controller instances switched over?
3. Can brokers detect changes to the controllers?
In what follows, cx denotes the instance whose broker id is x, and the cluster id is pTAG7hqbRBGQwKg5Hujn8g (generated with bin/kafka-storage.sh random-uuid).
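The cluster id, and the directoryId values that appear in the status output below, are 22-character URL-safe Base64 strings encoding 16 bytes. The hypothetical ClusterIdSketch class below shows, under that assumption, how 16 random bytes from java.util.UUID map to such a string. It reproduces the textual shape only and is not Kafka's Uuid implementation, so real clusters should keep using kafka-storage.sh random-uuid.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical sketch: mimics the textual shape of Kafka ids, not Kafka's Uuid class.
final class ClusterIdSketch {

    /** 16 random bytes encoded as unpadded URL-safe Base64 (22 characters). */
    static String randomId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    /** True if the string is 22 URL-safe Base64 characters that decode to exactly 16 bytes. */
    static boolean looksLikeId(String s) {
        try {
            return s.length() == 22 && Base64.getUrlDecoder().decode(s).length == 16;
        } catch (IllegalArgumentException e) {
            return false; // contains characters outside the URL-safe alphabet
        }
    }
}
```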
Controller cluster initialization
c0 is the starting node (--standalone); its broker port is 15000 and its controller port is 15001:
```bash
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --standalone --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
```
c1 (--no-initial-controllers): broker port 15010, controller port 15011:
```bash
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
```
c2 (--no-initial-controllers): broker port 15020, controller port 15021:
```bash
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
```
At this point, the controller status is:
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 1036
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}]
CurrentObservers: [{"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA"}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg"}]
```
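As shown, the controller quorum currently contains only broker 0; next, 1 and 2 need to be promoted to controllers. Note that listeners must explicitly specify the host (IP or hostname) in the configuration; otherwise localhost is used and the endpoint cannot be reached from other machines: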
✅ listeners=PLAINTEXT://yq01-build-rd2.yq01.baidu.com:17020,CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021
❌ listeners=PLAINTEXT://:17020,CONTROLLER://:17021
```java
static Set<RaftVoterEndpoint> getControllerAdvertisedListeners(
    Properties props
) throws Exception {
    LinkedHashSet<RaftVoterEndpoint> results = new LinkedHashSet<>();
    for (String listenerName : props.getProperty(
            // ... (rest of the loop header and endpoint lookup omitted)
        results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
            endpoint.host() == null ? "localhost" : endpoint.host(),
            endpoint.port()));
    }
    // ...
}
```
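The localhost fallback is easy to reproduce in isolation. The sketch below is a simplified, hypothetical reimplementation (ListenerHostSketch is not a Kafka class) that handles only plain NAME://host:port entries. It shows how the ❌ form ends up advertising localhost while the ✅ form keeps the real hostname.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch mirroring the "host == null ? localhost" fallback; plain NAME://host:port only.
final class ListenerHostSketch {

    /** Maps each listener name to the "host:port" that would be advertised. */
    static Map<String, String> advertised(String listeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf("://");
            int colon = trimmed.lastIndexOf(':');
            if (sep <= 0 || colon < sep + 3) {
                throw new IllegalArgumentException("Malformed listener: " + trimmed);
            }
            String name = trimmed.substring(0, sep);
            String host = trimmed.substring(sep + 3, colon);
            String port = trimmed.substring(colon + 1);
            // An empty host falls back to "localhost", as in the snippet above.
            result.put(name, (host.isEmpty() ? "localhost" : host) + ":" + port);
        }
        return result;
    }
}
```

With "PLAINTEXT://:17020,CONTROLLER://:17021" the CONTROLLER entry becomes localhost:17021, which other hosts cannot use to reach the new voter.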
Next, promote c1 and c2 by running add-controller on each node against c0:
```bash
# c1
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 --command-config config/kraft/server.properties.gen.properties add-controller
# c2
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 --command-config config/kraft/server.properties.gen.properties add-controller
```
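Because the bootstrap steps are identical for every node except the first, an external script could generate them. The following Java sketch is hypothetical (QuorumBootstrapPlan is an invented name) and only assembles the command strings used above. It assumes each command runs inside that node's own installation directory, since --command-config must point at the joining node's configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical automation helper: builds the bootstrap commands shown in this post.
final class QuorumBootstrapPlan {

    static final String CONFIG = "config/kraft/server.properties.gen.properties";

    /** Commands for the node at `index`; node 0 bootstraps the quorum with --standalone. */
    static List<String> commandsFor(int index, String clusterId, String bootstrapController) {
        List<String> cmds = new ArrayList<>();
        String mode = index == 0 ? "--standalone" : "--no-initial-controllers";
        cmds.add("./bin/kafka-storage.sh format --cluster-id " + clusterId + " " + mode + " --config " + CONFIG);
        cmds.add("./bin/kafka-server-start.sh -daemon " + CONFIG);
        if (index > 0) {
            // Joining nodes start as observers and are then promoted with add-controller.
            cmds.add("./bin/kafka-metadata-quorum.sh --bootstrap-controller " + bootstrapController
                    + " --command-config " + CONFIG + " add-controller");
        }
        return cmds;
    }
}
```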
Now both 1 and 2 have become controllers:
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 2174
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: []
```
c3 joins as an observer, plus read/write verification
Besides controllers, a cluster usually also contains ordinary brokers. Here broker 3 joins the cluster as an ordinary node on port 15030:
```bash
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
```
c3 now shows up as an observer:
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 2572
MaxFollowerLag: 1
MaxFollowerLagTimeMs: 487
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
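External automation will need to read this status programmatically. As a rough sketch, assuming exactly the field layout shown above, the hypothetical QuorumStatusParser below extracts the id and directoryId pairs from CurrentVoters or CurrentObservers with a regular expression. Scraping CLI text is brittle; the sketch only illustrates which fields an automation would care about.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scrapes `describe --status` text, assuming the layout shown in this post.
final class QuorumStatusParser {

    // Matches the start of an entry such as {"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"
    private static final Pattern ENTRY =
            Pattern.compile("\\{\"id\": (\\d+), \"directoryId\": \"([A-Za-z0-9_-]+)\"");

    /** Returns id -> directoryId for the array after `label:` (e.g. "CurrentVoters"). */
    static Map<Integer, String> members(String status, String label) {
        Map<Integer, String> result = new LinkedHashMap<>();
        int start = status.indexOf(label + ":");
        if (start < 0) {
            return result; // label not present
        }
        int open = status.indexOf('[', start);
        if (open < 0) {
            throw new IllegalArgumentException("No array after " + label);
        }
        int close = closingBracket(status, open);
        Matcher m = ENTRY.matcher(status.substring(open, close + 1));
        while (m.find()) {
            result.put(Integer.parseInt(m.group(1)), m.group(2));
        }
        return result;
    }

    // Voter entries contain nested "endpoints": [...] arrays, so track bracket depth.
    private static int closingBracket(String s, int open) {
        int depth = 0;
        for (int i = open; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '[') {
                depth++;
            } else if (c == ']' && --depth == 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("Unbalanced brackets");
    }
}
```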
Reads and writes through c3 both work normally:
```bash
./bin/kafka-topics.sh --create --topic hello_world_topic --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 --replication-factor 3 --partitions 2
kaf topics -b yq01-build-rd2.yq01.baidu.com:15030
kaf -b yq01-build-rd2.yq01.baidu.com:15030 produce hello_world_topic
kaf -b yq01-build-rd2.yq01.baidu.com:15030 consume hello_world_topic -g nzy_test
```
A controller instance exits unexpectedly
At the end of the previous section the leader was 0. What happens if we kill the leader now?
```bash
ps aux | grep kafka | grep c0 | awk '{print $2}' | xargs kill
```
Leadership moves to 1, but broker 0 remains in the voter list:
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 3456
MaxFollowerLag: 3458
MaxFollowerLagTimeMs: -1
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
After removing broker 0 with the script, it no longer appears in the voter list:
```bash
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 remove-controller --controller-id 0 --controller-directory-id 3rDv-etwR2e90gwUcBllDg
```
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 3614
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
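Killing c0 and removing it affect fault tolerance differently. KRaft follows Raft's majority rule, so a quorum of n voters needs floor(n/2) + 1 of them alive. The small sketch below (QuorumMath is a hypothetical helper) makes the arithmetic explicit: with c0 killed, 2 of 3 voters still form a majority; after remove-controller the voter set has 2 members, whose majority is also 2, so it tolerates no further controller failure until a third voter is added back.

```java
// Hypothetical helper: Raft-style majority arithmetic for a voter set of size n.
final class QuorumMath {

    /** Smallest number of voters that forms a strict majority. */
    static int majority(int voters) {
        if (voters <= 0) {
            throw new IllegalArgumentException("voters must be positive");
        }
        return voters / 2 + 1;
    }

    /** How many voters can be unavailable while a majority remains. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }
}
```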
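The controller instance's IP changes
To simulate instance migration, the ports of 0, 1 and 2 are changed one at a time, to check whether node 3's view of the quorum is updated in real time.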
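Change c0's ports (15xxx -> 17xxx) and start it. For the original starting node 0, two things need care: whether to format it with --standalone or --no-initial-controllers (the quorum already exists, so it is re-formatted with --no-initial-controllers rather than bootstrapping a new single-voter quorum with --standalone), and which live controller to pass to --bootstrap-controller: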
```
# c0
ps aux | grep kafka | grep c0 | awk '{print $2}' | xargs kill
rm -rf data/*
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15011 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 2
HighWatermark: 4402
MaxFollowerLag: 1
MaxFollowerLagTimeMs: 435
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
Change c1's ports (15xxx -> 17xxx) and start it:
```
ps aux | grep kafka | grep c1 | awk '{print $2}' | xargs kill
rm -rf data/*
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 remove-controller --controller-id 1 --controller-directory-id 9dk70WRte2_p__mZ_O0Bhg
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 4823
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
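The c0 and c1 runs differ only in whether a stale voter entry still has to be removed. A hypothetical automation step could capture that as below. ControllerMigrationPlan is an invented name, the commands are the ones used in this post, and the sketch assumes the old process has already been stopped (the ps ... | xargs kill step). Because the data directory is wiped, the node comes back with a new directoryId, so the stale entry is identified by its old directory id when it is removed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the migration steps above; assumes the old process is already stopped.
final class ControllerMigrationPlan {

    static final String CONFIG = "config/kraft/server.properties.gen.properties";

    /**
     * @param oldDirectoryId directory id of the stale voter entry, or null if it
     *                       was already removed (as happened for c0)
     */
    static List<String> commands(String clusterId, int controllerId, String oldDirectoryId,
                                 String bootstrapController) {
        List<String> cmds = new ArrayList<>();
        // Fresh data directory: the node will get a new directoryId when formatted.
        cmds.add("rm -rf data/*");
        cmds.add("./bin/kafka-storage.sh format --cluster-id " + clusterId
                + " --no-initial-controllers --config " + CONFIG);
        cmds.add("./bin/kafka-server-start.sh -daemon " + CONFIG);
        if (oldDirectoryId != null) {
            cmds.add("./bin/kafka-metadata-quorum.sh --bootstrap-controller " + bootstrapController
                    + " remove-controller --controller-id " + controllerId
                    + " --controller-directory-id " + oldDirectoryId);
        }
        cmds.add("./bin/kafka-metadata-quorum.sh --bootstrap-controller " + bootstrapController
                + " --command-config " + CONFIG + " add-controller");
        return cmds;
    }
}
```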
Change c2's ports (15xxx -> 17xxx) and start it:
```
ps aux | grep kafka | grep c2 | awk '{print $2}' | xargs kill
rm -rf data/*
./bin/kafka-storage.sh format --cluster-id pTAG7hqbRBGQwKg5Hujn8g --no-initial-controllers --config config/kraft/server.properties.gen.properties
./bin/kafka-server-start.sh -daemon config/kraft/server.properties.gen.properties
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 remove-controller --controller-id 2 --controller-directory-id JCX-hbVdtxCJl7geeqjNmA
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-controller yq01-build-rd2.yq01.baidu.com:17021 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 5860
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
At this point, reads and writes through broker 3 still work normally:
```
kaf topics -b yq01-build-rd2.yq01.baidu.com:15030   # runs normally
NAME                 PARTITIONS   REPLICAS
__consumer_offsets   50           1
hello_world_topic    2            3
```
Querying cluster metadata through it also works:
```
./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 3
HighWatermark: 6305
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
The debug logging I added to the Kafka source, mentioned in an earlier section, now shows up in the log file as well:
```
[2025-08-11 16:20:35,073] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=localhost/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:20:35,178] INFO [RaftManager id=0] [debug] --> update voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpointEndpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:22:53,375] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:25:57,616] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:28:51,424] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:15021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:49:20,944] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=1, directoryId=Optional[9dk70WRte2_p__mZ_O0Bhg]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:49:43,102] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=1, directoryId=Optional[wQAnhBv-4eltR9-Pfn5Ylw]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17011}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:52:34,300] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:52:51,297] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:54:51,243] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:55:43,202] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[JCX-hbVdtxCJl7geeqjNmA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:05,005] INFO [RaftManager id=0] [debug] --> remove voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:17,098] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
[2025-08-11 16:56:39,354] INFO [RaftManager id=0] [debug] --> add voter, info:ReplicaKey(id=2, directoryId=Optional[V5qRTKnQDpuMeRkLpkdOFA]) , endpoint:Endpoints(endpoints={ListenerName(CONTROLLER)=yq01-build-rd2.yq01.baidu.com/<unresolved>:17021}) (org.apache.kafka.raft.KafkaRaftClient)
```
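When auditing membership changes, it can help to turn these lines into structured events. The sketch below (VoterEventLog is a hypothetical name) matches only the ad-hoc "[debug] --> ... voter" messages added to KafkaRaftClient in this post; Kafka does not emit this format by itself, so the pattern would need adjusting for any other log statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: parses only the custom "[debug] --> add/remove/update voter" lines above.
final class VoterEventLog {

    record Event(String timestamp, String action, int voterId, String directoryId) { }

    private static final Pattern LINE = Pattern.compile(
            "\\[(\\d{4}-\\d{2}-\\d{2} [\\d:,]+)\\] INFO .*?\\[debug\\] --> (add|remove|update) voter, "
            + "info:ReplicaKey\\(id=(\\d+), directoryId=Optional\\[([A-Za-z0-9_-]+)\\]\\)");

    static List<Event> parse(String log) {
        List<Event> events = new ArrayList<>();
        Matcher m = LINE.matcher(log);
        while (m.find()) {
            events.add(new Event(m.group(1), m.group(2), Integer.parseInt(m.group(3)), m.group(4)));
        }
        return events;
    }
}
```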
The controller leader is removed
What happens if the leader is removed while it is still running?
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:17001 remove-controller --controller-id 0 --controller-directory-id t6JHGm7q3zwlOzX-4u0wHg

./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 4
HighWatermark: 8710
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}, {"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg"}]
```
As shown, node 0 was demoted to an observer and leadership moved to 1 (LeaderEpoch 4). Next, promote it back to a voter:
```
./bin/kafka-metadata-quorum.sh --bootstrap-controller yq01-build-rd2.yq01.baidu.com:17011 --command-config config/kraft/server.properties.gen.properties add-controller

./bin/kafka-metadata-quorum.sh --bootstrap-server yq01-build-rd2.yq01.baidu.com:15030 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 1
LeaderEpoch: 4
HighWatermark: 8936
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 117
CurrentVoters: [{"id": 0, "directoryId": "t6JHGm7q3zwlOzX-4u0wHg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17001"]}, {"id": 1, "directoryId": "wQAnhBv-4eltR9-Pfn5Ylw", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17011"]}, {"id": 2, "directoryId": "V5qRTKnQDpuMeRkLpkdOFA", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:17021"]}]
CurrentObservers: [{"id": 3, "directoryId": "b_ZpCQSJ0LBWf4RS5-zuqg"}]
```
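Conclusion
Based on this local verification, dynamically adding and removing controller members works. Combined with suitable external logic (still being designed …), controller membership can be updated dynamically without restarting every instance in the cluster.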