kafka-metadata-quorum.sh describe 中的 CurrentObservers 更新不及时

./bin/kafka-metadata-quorum.sh describe --statusCurrentObservers 为什么有时候会包含已经下线的 broker 信息?

背景

今天有同事咨询了一个问题,他发现在本地测试 kraft 集群时,当一个 broker 下线后,执行 ./bin/kafka-metadata-quorum.sh describe --status 的输出结果中,在 CurrentObservers 里依旧有这个 broker 的信息。我其实也挺好奇的,所以就探索了一下 CurrentObservers 的更新机制。kafka 的版本为 3.9.1

注:源码分析基于 claude code + glm4.6 和 trae ,kafka 版本为 3.9.1

是什么

执行命令 kafka-metadata-quorum.sh describe --status 的输出样例如下:

1
2
3
4
5
6
7
8
9
./bin/kafka-metadata-quorum.sh --bootstrap-controller 0.0.0.0:15001 describe --status
ClusterId: pTAG7hqbRBGQwKg5Hujn8g
LeaderId: 0
LeaderEpoch: 1
HighWatermark: 1036
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, "directoryId": "3rDv-etwR2e90gwUcBllDg", "endpoints": ["CONTROLLER://yq01-build-rd2.yq01.baidu.com:15001"]}]
CurrentObservers: [{"id": 2, "directoryId": "JCX-hbVdtxCJl7geeqjNmA"}, {"id": 1, "directoryId": "9dk70WRte2_p__mZ_O0Bhg"}]

CurrentObservers 位于最后一行,它记录了所有非 voter 的成员信息。在 kraft 的模式下,observer 不参与 raft 投票决策及元数据同步,也即是普通的 broker 节点。

如何定义

它的核心数据结构定义在 LeaderState.java:81 中,是一个纯内存的数据结构,如下:

1
private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>();

其中:

  • Key: ReplicaKey - 包含节点ID和目录ID的唯一标识
  • Value: ReplicaState - 包含节点的状态信息,如日志偏移量、最后拉取时间等

如何新增

新 Observer 节点首次连接

触发场景: 节点准备加入集群但尚未成为正式Voter时

工作流程:

  1. 新节点启动后向Leader发送FetchRequest
  2. 由于节点不在Voter集合中,被自动识别为Observer
  3. 节点开始同步数据,为后续成为Voter做准备
1
2
3
4
5
6
7
8
9
// LeaderState
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
return observerStates.get(replicaKey);
}
return state;
}

Voter 节点降级为 Observer

触发条件:

  1. 执行集群重配置操作
  2. 某个Voter从VoterSet中被移除
  3. 该Voter的状态被保留并转移到Observer列表

工作流程:

  1. 接收新的VoterSet配置
  2. 识别出被移除的Voter节点
  3. 清除该节点的监听器信息
  4. 将其状态转移到observerStates
1
2
3
4
5
6
7
8
private void updateVoterAndObserverStates(VoterSet lastVoterSet) {  
// ...
// Move any of the remaining old voters to observerStates
for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
replicaStateEntry.clearListeners();
observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
}
}

Observer 发送 Fetch 请求

触发条件:

  1. 节点向Leader发起FetchRequest请求 (pollFollowerAsObserver
  2. 该节点不在当前Voter集合中(voterStates.get(replicaKey.id()) == null
  3. 或者节点的ReplicaKey与现有记录不匹配(!state.matchesKey(replicaKey)

工作流程:

  1. Leader接收到非Voter节点的 FetchRequest
  2. 调用updateReplicaState方法
  3. getOrCreateReplicaState方法判断该节点不是Voter
  4. 自动将该节点添加到observerStates
1
2
3
4
5
6
7
8
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
return observerStates.get(replicaKey);
}
return state;
}

如何删除

Observer 升级为 Voter

触发条件:

  1. 执行AddVoter操作
  2. Observer节点被正式添加到Voter集合中
  3. 新的VoterSet配置生效

工作流程:

  1. 接收AddVoter请求
  2. 验证Observer节点状态(是否追上Leader等)
  3. 更新VoterSet配置
  4. observerStates中移除该节点
  5. voterStates中添加该节点
1
2
3
4
5
6
7
private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
// ...
for (VoterSet.VoterNode voterNode : lastVoterSet.voterNodes()) {
// ...
observerStates.remove(voterNode.voterKey());
}
}

集群重新配置

触发时机 :当 VoterSet 发生变化时,会重新计算 observer 状态

1
2
3
4
5
6
7
  public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata,
VoterSet lastVoterSet
) {
// ...
updateVoterAndObserverStates(lastVoterSet);
}

Observer 会话超时

触发条件:

  • Observer超过5分钟(300秒)未向Leader发送FetchRequest
  • 当前时间与最后拉取时间的差值超过OBSERVER_SESSION_TIMEOUT_MS
  • 该Observer不是Leader节点本身

触发时机:

  • 仅在处理DESCRIBE_QUORUM请求时触发:当外部工具或客户端调用DescribeQuorum API时
  • observerStates(currentTimeMs)方法被调用的唯一位置是在handleDescribeQuorumRequest
1
2
3
4
5
6
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
!integerReplicaStateEntry.getKey().equals(localReplicaKey)
);
}

如何检测会话超时

也即 clearInactiveObservers 调用的时机是什么?经过分析,目前发现是由 DESCRIBE_QUORUM 请求触发执行,但是 kafka 集群本身不会执行这个请求,有如下几处调用:

  • kafka-metadata-quorum.sh ,底层会调用 AdminClient
  • AdminClientAdminClient.create(properties).describeMetadataQuorum().quorumInfo().get()

所以它有什么用?

目前看来,它主要是用于为提升到 voter 做准备。当一个实例准备提升 voter 时,它是在线的,所以在 observer 中的信息必然是最新的。而对于 observer 列表中的过期数据,并不会对 kraft 机制造成什么实际的影响:不参与选举、不影响集群元信息 ,且只是主动从 leader 除拉数据。不过假如长时间不清理的话,会导致其中存了大量过期的 observer 数据。

同事的问题的原因是什么?

在回到最初的问题,为什么 broker 下线了,但是在查询 Observers 的时候 broker 的信息未删除?查询的时候必定会执行 DESCRIBE_QUORUM 请求,所以 clearInactiveObservers 必定会触发。那么原因只可能是同事在下线 broker 后立刻就进行了查询,未到 5 min (OBSERVER_SESSION_TIMEOUT_MS)过期阈值。