A Brief Look at Metadata Updates in Kafka KRaft Mode

How is the broker-side metadata cache updated in KRaft mode?

Background

After Kafka retired ZooKeeper, the class that maintains each broker's cluster metadata cache changed from ZkMetadataCache to KRaftMetadataCache. Unlike ZkMetadataCache, whose MetadataSnapshot relied on UpdateMetadataRequest from the controller to refresh metadata, KRaftMetadataCache stores its cache as a MetadataImage, and an update is simply a call to setImage that overwrites the variable with a new value. The questions, then, are who calls setImage, and how its argument is produced.

Note: the source analysis was done with claude code + glm4.6 and trae; the Kafka version is 3.9.1.

Related class definitions

KRaftMetadataCache

class KRaftMetadataCache(...) extends MetadataCache with Logging with ConfigRepository {
  // This is the cache state. Every MetadataImage instance is immutable, and updates
  // replace this value with a completely new one. This means reads (which are not under
  // any lock) need to grab the value of this variable once, and retain that read copy for
  // the duration of their operation. Multiple reads of this value risk getting different
  // image values.
  @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY

  def setImage(newImage: MetadataImage): Unit = {
    _currentImage = newImage
  }
}
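
The comment in this class is worth taking seriously: because setImage swaps the whole reference, a reader must grab the volatile field once and keep working on that copy. Below is a minimal Java sketch of this lock-free read pattern (ImageHolder is a hypothetical class for illustration, not Kafka source):

import org.apache.kafka.image.MetadataImage;

// Hypothetical holder mirroring KRaftMetadataCache's update/read discipline.
public class ImageHolder {
    private volatile MetadataImage currentImage = MetadataImage.EMPTY;

    public void setImage(MetadataImage newImage) {
        currentImage = newImage; // a plain reference swap; no lock required
    }

    public int brokerCount() {
        MetadataImage image = currentImage; // read the volatile exactly once
        // Every subsequent access goes through the local copy, which is immutable,
        // so a concurrent setImage() cannot be observed halfway through.
        return image.cluster().brokers().size();
    }
}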

MetadataImage

MetadataImage is defined as follows. It is immutable and carries most of the cluster's information:

public final class MetadataImage {
    private final MetadataProvenance provenance;          // provenance of the metadata
    private final FeaturesImage features;                 // feature information
    private final ClusterImage cluster;                   // cluster broker information
    private final TopicsImage topics;                     // topic and partition information
    private final ConfigurationsImage configs;            // configuration information
    private final ClientQuotasImage clientQuotas;         // client quota information
    private final ProducerIdsImage producerIds;           // producer ID information
    private final AclsImage acls;                         // ACL information
    private final ScramImage scram;                       // SCRAM authentication information
    private final DelegationTokenImage delegationTokens;  // delegation token information
}

MetadataDelta

Represents an incremental change to the metadata and is used to derive a new MetadataImage from an existing one:

  • It is created on top of an existing MetadataImage
  • Raft records are applied to it via the replay() method
  • A new MetadataImage is produced from it via the apply() method
public final class MetadataDelta {
    public static class Builder {
        // ...
    }

    public MetadataDelta(MetadataImage image) {
        this.image = image;
    }

    private ClusterDelta clusterDelta = null;
    // ...

    public void replay(ApiMessage record) {
        MetadataRecordType type = MetadataRecordType.fromId(record.apiKey());
        switch (type) {
            case REGISTER_BROKER_RECORD:
                replay((RegisterBrokerRecord) record);
                break;
            // ...
        }
    }

    public MetadataImage apply(MetadataProvenance provenance) {
        ClusterImage newCluster;
        if (clusterDelta == null) {
            newCluster = image.cluster();
        } else {
            newCluster = clusterDelta.apply();
        }
        // ...
        return new MetadataImage(
            newCluster,
            // ...
        );
    }
}
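
To make the lifecycle above concrete, here is a minimal, self-contained sketch of the three steps (the broker id, epoch, and the use of MetadataProvenance.EMPTY are illustrative choices, not taken from Kafka's own call sites):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;

public class DeltaLifecycleSketch {
    public static void main(String[] args) {
        MetadataImage current = MetadataImage.EMPTY;

        // 1. Create a delta on top of the existing image.
        MetadataDelta delta = new MetadataDelta.Builder().setImage(current).build();

        // 2. Replay a Raft record; this goes through the switch in replay().
        delta.replay(new RegisterBrokerRecord()
            .setBrokerId(1)
            .setBrokerEpoch(100L)
            .setIncarnationId(Uuid.randomUuid()));

        // 3. Apply the delta to produce a brand-new immutable image.
        MetadataImage next = delta.apply(MetadataProvenance.EMPTY);
        System.out.println(next.cluster().brokers().keySet()); // prints [1]
    }
}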

Who calls setImage?

KRaftMetadataCachePublisher exists solely to update the KRaftMetadataCache:

// core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala:35
override def onMetadataUpdate(
  delta: MetadataDelta, newImage: MetadataImage, manifest: LoaderManifest): Unit = {
  metadataCache.setImage(newImage)
}

BrokerMetadataPublisher updates the metadata cache and also initializes the other manager components:

// core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:116
// Publish the new metadata image to the metadata cache.
override def onMetadataUpdate(delta: MetadataDelta,
                              newImage: MetadataImage, manifest: LoaderManifest): Unit = {
  // ...
  metadataCache.setImage(newImage)
}

Both callers implement the MetadataPublisher interface:

/**
 * Publishes metadata deltas which we have loaded from the log and snapshots.
 *
 * Publishers receive a stream of callbacks from the metadata loader which keeps them notified
 * of the latest cluster metadata. This interface abstracts away some of the complications of
 * following the cluster metadata. For example, if the loader needs to read a snapshot, it will
 * present the contents of the snapshot in the form of a delta from the previous state.
 */
public interface MetadataPublisher extends AutoCloseable {
    // ...

    /**
     * Publish a new cluster metadata snapshot that we loaded.
     *
     * @param delta    The delta between the previous state and the new one.
     * @param newImage The complete new state.
     * @param manifest A manifest which describes the contents of what was published.
     *                 If we loaded a snapshot, this will be a SnapshotManifest.
     *                 If we loaded a log delta, this will be a LogDeltaManifest.
     */
    void onMetadataUpdate(
        MetadataDelta delta,
        MetadataImage newImage,
        LoaderManifest manifest
    );
}
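
As an illustration, a bare-bones publisher could look like the sketch below (LoggingMetadataPublisher is a hypothetical name; besides onMetadataUpdate, only name() must be implemented, since the interface's other callbacks have default implementations):

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;

public class LoggingMetadataPublisher implements MetadataPublisher {
    @Override
    public String name() {
        return "LoggingMetadataPublisher";
    }

    @Override
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
        // Only react to what actually changed: a null sub-delta means "no change".
        if (delta.clusterDelta() != null) {
            System.out.println("brokers in new image: " + newImage.cluster().brokers().keySet());
        }
    }
}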

Call chain overview: KRaftMetadataCachePublisher

Event registration

At server startup, a KRaftMetadataCachePublisher is created and installed into the MetadataLoader:

class SharedServer(...) extends Logging {
  @volatile var loader: MetadataLoader = _
}

class ControllerServer(...) extends Logging {
  @volatile var metadataCachePublisher: KRaftMetadataCachePublisher = _
  val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
  // ...
  def startup(): Unit = {
    // ...
    // Set up the metadata cache publisher.
    metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)
    metadataPublishers.add(metadataCachePublisher)

    // ...
    sharedServer.loader.installPublishers(metadataPublishers)
  }
}

The key parts of MetadataLoader are shown below; the MetadataBatchLoader it contains is equally important.

As the code shows, when a Raft commit event fires, MetadataLoader runs handleCommit to process the committed data, delegating the actual work to MetadataBatchLoader. MetadataLoader's maybePublishMetadata defines how the Raft image data is published: it invokes the onMetadataUpdate of every registered MetadataPublisher, including KRaftMetadataCachePublisher. The timing of that call, however, is left to MetadataBatchLoader.

When a Raft loadSnapshot event fires, MetadataLoader itself generates the MetadataImage and invokes maybePublishMetadata directly.

public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
    // ...
    private final MetadataBatchLoader batchLoader;

    // Stores the publishers registered via installPublishers above.
    private final LinkedHashMap<String, MetadataPublisher> publishers;

    private MetadataLoader(...) {
        this.batchLoader = new MetadataBatchLoader(
            logContext,
            time,
            faultHandler,
            this::maybePublishMetadata); // <-- the last argument is the callback
    }

    // Listens for Raft commit events and runs the corresponding logic.
    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        eventQueue.append(() -> {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                batchLoader.loadBatch(batch, currentLeaderAndEpoch);
            }
            batchLoader.maybeFlushBatches(currentLeaderAndEpoch);
            reader.close();
        });
    }

    // Handles snapshot load events.
    @Override
    public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        eventQueue.append(() -> {
            String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
            MetadataDelta delta = new MetadataDelta.Builder().
                setImage(image).
                build();
            SnapshotManifest manifest = loadSnapshot(delta, reader);
            MetadataImage image = delta.apply(manifest.provenance());
            batchLoader.resetToImage(image);
            maybePublishMetadata(delta, image, manifest);
        });
    }

    // Invokes every registered publisher.
    private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, LoaderManifest manifest) {
        this.image = image;
        for (MetadataPublisher publisher : publishers.values()) {
            publisher.onMetadataUpdate(delta, image, manifest);
        }
    }
}

Replaying changes into the MetadataDelta

From the analysis above, the key logic lives in MetadataBatchLoader.loadBatch. Setting aside the transaction-related logic for now, the core is shown below: it calls MetadataDelta.replay on each record, and as we saw earlier that method is one large switch over the various record types.

public class MetadataBatchLoader {
    // incremental changes to the metadata
    private MetadataDelta delta;
    // the metadata snapshot
    private MetadataImage image;

    public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
        for (ApiMessageAndVersion record : batch.records()) {
            replay(record);
        }
        // ... (offset bookkeeping and return value elided)
    }

    private void replay(ApiMessageAndVersion record) {
        delta.replay(record.message());
    }
}

Generating the MetadataImage

Next, the incremental changes must be applied to the existing image. As you might guess, MetadataBatchLoader.maybeFlushBatches does this. Again ignoring the transaction-related logic, the core is shown below: it calls MetadataDelta.apply to produce the new image, then invokes MetadataLoader.maybePublishMetadata to publish it.


public class MetadataBatchLoader {

    public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch) {
        applyDeltaAndUpdate(delta, ...);
    }

    private void applyDeltaAndUpdate(MetadataDelta delta, LogDeltaManifest manifest) {
        // Generate the new image.
        image = delta.apply(manifest.provenance());

        // Invoke MetadataLoader.maybePublishMetadata to publish the change.
        callback.update(delta, image, manifest);

        // Reset the internal state to the new image.
        resetToImage(image);
    }
}
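
Putting loadBatch and applyDeltaAndUpdate side by side, the whole cycle fits in a few lines. The sketch below is a deliberately stripped-down, hypothetical re-creation (MiniBatchLoader and FlushCallback are my names, not Kafka's) of the accumulate, apply, publish, reset loop:

import java.util.List;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;

public class MiniBatchLoader {
    @FunctionalInterface
    public interface FlushCallback {
        void update(MetadataDelta delta, MetadataImage newImage);
    }

    private final FlushCallback callback;
    private MetadataImage image = MetadataImage.EMPTY;
    private MetadataDelta delta = new MetadataDelta.Builder().setImage(image).build();

    public MiniBatchLoader(FlushCallback callback) {
        this.callback = callback;
    }

    // Corresponds to loadBatch(): replay every record into the pending delta.
    public void loadBatch(List<ApiMessage> records) {
        records.forEach(delta::replay);
    }

    // Corresponds to maybeFlushBatches() -> applyDeltaAndUpdate().
    public void flush(MetadataProvenance provenance) {
        image = delta.apply(provenance);                              // generate the new image
        callback.update(delta, image);                                // publish, like maybePublishMetadata
        delta = new MetadataDelta.Builder().setImage(image).build();  // reset, like resetToImage
    }
}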

Summary

At this point, the whole metadata-update flow comes together:

[Raft layer]
Controller writes a metadata change
  ↓
Raft Log
  ↓
KafkaRaftClient
  ↓ (RaftClient.Listener.handleCommit)

[Metadata loading layer]
MetadataLoader
  ↓ (handleCommit)
MetadataBatchLoader
  ↓ (loadBatch -> maybeFlushBatches)
MetadataLoader
  ↓ (maybePublishMetadata)

[Broker application layer]
xxxMetadataPublisher.onMetadataUpdate()

┌────────────────────────────────┐
│ Branch call paths              │
├────────────────────────────────┤
│ KRaftMetadataCache.setImage()  │
│   ↓                            │
│ update _currentImage           │
├────────────────────────────────┤
│ ReplicaManager.applyDelta()    │
├────────────────────────────────┤
│ GroupCoordinator update        │
├────────────────────────────────┤
│ TransactionCoordinator update  │
├────────────────────────────────┤
│ Other component updates        │
└────────────────────────────────┘