./bin/kafka-metadata-quorum.sh describe --status 中 CurrentObservers 为什么有时候会包含已经下线的 broker 信息?
背景
今天有同事咨询了一个问题,他发现在本地测试 kraft 集群时,当一个 broker 下线后,执行 ./bin/kafka-metadata-quorum.sh describe --status 的输出结果中,在 CurrentObservers 里依旧有这个 broker 的信息。我其实也挺好奇的,所以就探索了一下 CurrentObservers 的更新机制。kafka 的版本为 3.9.1
注:源码分析基于 claude code + glm4.6 和 trae ,kafka 版本为 3.9.1
是什么
执行命令 kafka-metadata-quorum.sh describe --status 的输出样例如下:
1 | ./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status |
CurrentObservers 位于最后一行,它记录了所有非 voter 的成员信息。在 kraft 的模式下,observer 不参与 raft 投票决策及元数据同步,也即是普通的 broker 节点。
如何定义
它的核心数据结构定义在 LeaderState.java:81 中,是一个纯内存的数据结构,如下:
1 | private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>(); |
其中:
- Key:
ReplicaKey- 包含节点ID和目录ID的唯一标识 - Value:
ReplicaState- 包含节点的状态信息,如日志偏移量、最后拉取时间等
如何新增
新 Observer 节点首次连接
触发场景: 节点准备加入集群但尚未成为正式Voter时
工作流程:
- 新节点启动后向Leader发送FetchRequest
- 由于节点不在Voter集合中,被自动识别为Observer
- 节点开始同步数据,为后续成为Voter做准备
1 | // LeaderState |
Voter 节点降级为 Observer
触发条件:
- 执行集群重配置操作
- 某个Voter从VoterSet中被移除
- 该Voter的状态被保留并转移到Observer列表
工作流程:
- 接收新的VoterSet配置
- 识别出被移除的Voter节点
- 清除该节点的监听器信息
- 将其状态转移到
observerStates中
1 | private void updateVoterAndObserverStates(VoterSet lastVoterSet) { |
Observer 发送 Fetch 请求
触发条件:
- 节点向Leader发起FetchRequest请求 (
pollFollowerAsObserver) - 该节点不在当前Voter集合中(
voterStates.get(replicaKey.id()) == null) - 或者节点的ReplicaKey与现有记录不匹配(
!state.matchesKey(replicaKey))
工作流程:
- Leader接收到非Voter节点的 FetchRequest
- 调用
updateReplicaState方法 getOrCreateReplicaState方法判断该节点不是Voter- 自动将该节点添加到
observerStates中
1 | private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) { |
如何删除
Observer 升级为 Voter
触发条件:
- 执行AddVoter操作
- Observer节点被正式添加到Voter集合中
- 新的VoterSet配置生效
工作流程:
- 接收AddVoter请求
- 验证Observer节点状态(是否追上Leader等)
- 更新VoterSet配置
- 从
observerStates中移除该节点 - 在
voterStates中添加该节点
1 | private void updateVoterAndObserverStates(VoterSet lastVoterSet) { |
集群重新配置
触发时机 :当 VoterSet 发生变化时,会重新计算 observer 状态
1 | public boolean updateLocalState( |
Observer 会话超时
触发条件:
- Observer超过5分钟(300秒)未向Leader发送FetchRequest
- 当前时间与最后拉取时间的差值超过
OBSERVER_SESSION_TIMEOUT_MS - 该Observer不是Leader节点本身
触发时机:
- 仅在处理DESCRIBE_QUORUM请求时触发:当外部工具或客户端调用DescribeQuorum API时
observerStates(currentTimeMs)方法被调用的唯一位置是在handleDescribeQuorumRequest中
1 | private void clearInactiveObservers(final long currentTimeMs) { |
如何检测会话超时
也即 clearInactiveObservers 调用的时机是什么?经过分析,目前发现是由 DESCRIBE_QUORUM 请求触发执行,但是 kafka 集群本身不会执行这个请求,有如下几处调用:
kafka-metadata-quorum.sh,底层会调用AdminClientAdminClient:AdminClient.create(properties).describeMetadataQuorum().quorumInfo().get()
所以它有什么用?
目前看来,它主要是用于为提升到 voter 做准备。当一个实例准备提升 voter 时,它是在线的,所以在 observer 中的信息必然是最新的。而对于 observer 列表中的过期数据,并不会对 kraft 机制造成什么实际的影响:不参与选举、不影响集群元信息 ,且只是主动从 leader 除拉数据。不过假如长时间不清理的话,会导致其中存了大量过期的 observer 数据。
同事的问题的原因是什么?
在回到最初的问题,为什么 broker 下线了,但是在查询 Observers 的时候 broker 的信息未删除?查询的时候必定会执行 DESCRIBE_QUORUM 请求,所以 clearInactiveObservers 必定会触发。那么原因只可能是同事在下线 broker 后立刻就进行了查询,未到 5 min (OBSERVER_SESSION_TIMEOUT_MS)过期阈值。