Note: this source-code analysis was carried out with claude code + glm4.6 and trae; the Kafka version is 3.9.1.
Related class definitions
KRaftMetadataCache
class KRaftMetadataCache(...) extends MetadataCache with Logging with ConfigRepository {
  // This is the cache state. Every MetadataImage instance is immutable, and updates
  // replace this value with a completely new one. This means reads (which are not under
  // any lock) need to grab the value of this variable once, and retain that read copy for
  // the duration of their operation. Multiple reads of this value risk getting different
  // image values.
  @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY

  def setImage(newImage: MetadataImage): Unit = {
    _currentImage = newImage
  }
}
// core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:116
// Publish the new metadata image to the metadata cache.
override def onMetadataUpdate(delta: MetadataDelta, newImage: MetadataImage, manifest: LoaderManifest): Unit = {
  // ...
  metadataCache.setImage(newImage)
}
/**
 * Publishes metadata deltas which we have loaded from the log and snapshots.
 *
 * Publishers receive a stream of callbacks from the metadata loader which keeps them notified
 * of the latest cluster metadata. This interface abstracts away some of the complications of
 * following the cluster metadata. For example, if the loader needs to read a snapshot, it will
 * present the contents of the snapshot in the form of a delta from the previous state.
 */
public interface MetadataPublisher extends AutoCloseable {
    // ...

    /**
     * Publish a new cluster metadata snapshot that we loaded.
     *
     * @param delta    The delta between the previous state and the new one.
     * @param newImage The complete new state.
     * @param manifest A manifest which describes the contents of what was published.
     *                 If we loaded a snapshot, this will be a SnapshotManifest.
     *                 If we loaded a log delta, this will be a LogDeltaManifest.
     */
    void onMetadataUpdate(
        MetadataDelta delta,
        MetadataImage newImage,
        LoaderManifest manifest
    );
}
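The comment in KRaftMetadataCache describes a copy-on-write scheme: the image is immutable, writers swap the whole reference, and lock-free readers must read the volatile field once per operation. The following is a minimal Java sketch of that idea, not Kafka's code. `ImageCacheSketch`, `Image`, and `describeTopic` are hypothetical names invented for illustration, and `Image` is only a stand-in for `MetadataImage`.

```java
import java.util.Map;

public class ImageCacheSketch {
    /** Hypothetical stand-in for MetadataImage: never mutated after construction. */
    public static final class Image {
        public static final Image EMPTY = new Image(-1L, Map.of());

        private final long offset;
        private final Map<String, Integer> partitionCounts;

        public Image(long offset, Map<String, Integer> partitionCounts) {
            this.offset = offset;
            // Defensive, unmodifiable copy so the image cannot change later.
            this.partitionCounts = Map.copyOf(partitionCounts);
        }

        public long offset() { return offset; }
        public Map<String, Integer> partitionCounts() { return partitionCounts; }
    }

    // Writers replace the whole reference; volatile makes the new image visible to readers.
    private volatile Image currentImage = Image.EMPTY;

    public void setImage(Image newImage) {
        currentImage = newImage;
    }

    // Reads the volatile field exactly once, so both values below come from the same image.
    public String describeTopic(String topic) {
        Image image = currentImage;
        Integer partitions = image.partitionCounts().get(topic);
        return topic + "@" + image.offset() + ":" + (partitions == null ? "absent" : partitions);
    }

    public static void main(String[] args) {
        ImageCacheSketch cache = new ImageCacheSketch();
        System.out.println(cache.describeTopic("orders")); // prints orders@-1:absent
        cache.setImage(new Image(42L, Map.of("orders", 3)));
        System.out.println(cache.describeTopic("orders")); // prints orders@42:3
    }
}
```

If `describeTopic` read `currentImage` twice instead of copying it into a local variable, a concurrent `setImage` could make the offset and the partition count come from two different images, which is the risk the original comment warns about.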
Call chain overview: KRaftMetadataCachePublisher
Event registration
When the Kafka server starts, it initializes a KRaftMetadataCachePublisher and adds it to the MetadataLoader.
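To make the registration step concrete, here is a toy Java sketch of the pattern: a loader keeps a list of publishers and hands each one the delta and the new image whenever metadata changes. All names (`PublisherRegistrationSketch`, `Loader`, `Publisher`, `CachePublisher`, `installPublisher`, `handleCommit`) are hypothetical simplifications, not Kafka's API; images are modeled as plain lists of strings, and everything runs synchronously on one thread, which the real loader need not do.

```java
import java.util.ArrayList;
import java.util.List;

public class PublisherRegistrationSketch {
    /** Hypothetical, simplified analogue of MetadataPublisher. */
    public interface Publisher {
        void onMetadataUpdate(String delta, List<String> newImage);
    }

    /** Hypothetical, simplified analogue of MetadataLoader (synchronous, single-threaded). */
    public static final class Loader {
        private final List<Publisher> publishers = new ArrayList<>();
        private List<String> image = List.of();

        public void installPublisher(Publisher publisher) {
            publishers.add(publisher);
        }

        // Applies one delta, builds a new immutable image, and notifies every publisher.
        public void handleCommit(String delta) {
            List<String> next = new ArrayList<>(image);
            next.add(delta);
            image = List.copyOf(next);
            for (Publisher publisher : publishers) {
                publisher.onMetadataUpdate(delta, image);
            }
        }
    }

    /** Hypothetical cache publisher: stores the latest image it was handed. */
    public static final class CachePublisher implements Publisher {
        private volatile List<String> cached = List.of();

        @Override
        public void onMetadataUpdate(String delta, List<String> newImage) {
            cached = newImage;
        }

        public List<String> cached() { return cached; }
    }

    public static void main(String[] args) {
        Loader loader = new Loader();
        CachePublisher cachePublisher = new CachePublisher();
        loader.installPublisher(cachePublisher); // registration at "startup"
        loader.handleCommit("TopicRecord(orders)");
        loader.handleCommit("PartitionRecord(orders-0)");
        // prints [TopicRecord(orders), PartitionRecord(orders-0)]
        System.out.println(cachePublisher.cached());
    }
}
```

The point of the sketch is the direction of control: the publisher never polls the loader. Once registered, it only reacts to callbacks, which is why the cache's `setImage` is driven entirely from `onMetadataUpdate`.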
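As the analysis above shows, the key logic lives in MetadataBatchLoader.loadBatch. Setting aside the transaction-related logic for now, the main logic is shown below. Its core job is to call MetadataDelta.replay to replay each record; that method, analyzed earlier, is one long switch/case that handles each type of record.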
public class MetadataBatchLoader {
    // Incremental changes to the metadata
    private MetadataDelta delta;
    // Snapshot of the metadata
    private MetadataImage image;

    public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
        for (ApiMessageAndVersion record : batch.records()) {
            replay(record);
        }
        // ... (return statement omitted in this excerpt)
    }