kafka server 端数据压缩特性

也即:kafka topic 配置了数据压缩但是 producer 未压缩时,broker 会对数据进行压缩吗?

背景

目前发现线上部分 topic 虽然设置了 compression.typegzip,但是 producer 在写入时,并没有压缩的相关配置,也即写入 topic 的数据是未压缩的。那么,在这种情况下,broker 会对数据进行压缩吗?

分别问了 deepseek 和 claude,给的回答分别是不会和会,这就让我有些懵逼了。随后在网上搜了一下,confluent 有个文章给的结论和 claude 一致,即会由 broker 进行压缩。

源码分析

对 2.1.1 和 3.9.1 的源码进行分析。前者为当前线上使用的版本,后者为后续新集群部署时使用的版本。

3.9.1

在 3.9.1 版本中,管理 Kafka 数据的关键类为 UnifiedLog.scala(core/src/main/scala/kafka/log/UnifiedLog.scala),其中数据落盘使用的是 append 方法,删去了大量无用的逻辑,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private def append(records: MemoryRecords,
// .....
ignoreRecordSize: Boolean): LogAppendInfo = {
// ....
// they are valid, insert them in the log
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
localLog.checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
appendInfo.setFirstOffset(offset.value)
val validateAndOffsetAssignResult = try {
val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
val validator = new LogValidator(validRecords,
// ....
appendInfo.sourceCompression,
targetCompression,
// .....
)
validator.validateMessagesAndAssignOffsets(offset,
validatorMetricsRecorder,
requestLocal.getOrElse(throw new IllegalArgumentException(
"requestLocal should be defined if assignOffsets is true")
).bufferSupplier
)
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
// ....
}
}
}

可以看到,首先会通过 BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression()) 获取 topic 的数据压缩类型对应的处理对象,然后通过 LogValidatorvalidateMessagesAndAssignOffsets 来进行数据校验和写入。这里的 targetCompressionorg.apache.kafka.common.compress.Compression 接口类型,Kafka 支持的压缩算法都实现了这个接口:

深入 validateMessagesAndAssignOffsets 方法,大致的调用顺序如下:

1
2
3
LogValidator::validateMessagesAndAssignOffsets
LogValidator::validateMessagesAndAssignOffsetsCompressed
LogValidator::buildRecordsAndAssignOffsets

在方法 buildRecordsAndAssignOffsets 的关键代码如下,使用了 MemoryRecordsBuilder 对数据进行实际的写入,并且可以看到,在这之前,它根据压缩的类型,对预期写入的数据大小进行了估算,并且以此为基准申请了 buffer,所以实际上已经很明确了,对于 producer 未对数据进行压缩的场景,broker 会代为进行数据压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private ValidationResult buildRecordsAndAssignOffsets(LongRef offsetCounter,
long logAppendTime,
RecordBatch firstBatch,
List<Record> validatedRecords,
int uncompressedSizeInBytes) {
long startNanos = time.nanoseconds();
int estimatedSize = AbstractRecords.estimateSizeInBytes(toMagic, offsetCounter.value, targetCompression.type(),
validatedRecords);
// The current implementation of BufferSupplier is naive and works best when the buffer size
// cardinality is low, so don't use it here
ByteBuffer buffer = ByteBuffer.allocate(estimatedSize);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, toMagic, targetCompression,
timestampType, offsetCounter.value, logAppendTime, firstBatch.producerId(),
firstBatch.producerEpoch(), firstBatch.baseSequence(), firstBatch.isTransactional(),
partitionLeaderEpoch);

for (Record record : validatedRecords)
builder.appendWithOffset(offsetCounter.value++, record);

MemoryRecords records = builder.build();
// ...
}

MemoryRecordsBuilder 中,最后实现数据写入的函数为 appendDefaultRecord,其中调用了 DefaultRecord.writeTo 将数据写入至 appendStream 中,这个 appendStream 就是在上初始化这个类时,基于传入的 targetCompression 生成的流式数据写入对象,targetCompression 实际对应的压缩算子实现,会对收到的数据进行流式压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
Compression compression,
// .....
) {
// ...
this.appendStream = new DataOutputStream(compression.wrapForOutput(this.bufferStream, magic));
// ...
}

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}

所以,对于 3.9.1 版本,结论为如果 topic 配置了压缩,但是收到的数据未压缩,那么 broker 会对数据进行压缩。

2.1.1

在 2.1.1 版本中,管理 Kafka 数据的关键类为 Log.scala (core/src/main/scala/kafka/log/Log.scala),数据落盘的方法为 append ,实现和 3.9.1 类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
// ....
// they are valid, insert them in the log
lock synchronized {
checkIfMemoryMappedBufferClosed()
if (assignOffsets) {
// assign offsets to the message set
// ...
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
// ....
)
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
// ....

同样也是通过 LogValidatorvalidateMessagesAndAssignOffsets 进行数据的写入,老版本中 LogValidator 使用 scala ,而非 java ,实现,实际的逻辑不变,最终都是使用了 MemoryRecordsBuilder 对数据进行实际的写入,这段逻辑在 3.9.1 中分析过,这里不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 private def buildRecordsAndAssignOffsets(magic: Byte,
offsetCounter: LongRef,
time: Time,
timestampType: TimestampType,
compressionType: CompressionType,
// ...
uncompresssedSizeInBytes: Int): ValidationAndOffsetAssignResult = {
val startNanos = time.nanoseconds
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch)

validatedRecords.foreach { record =>
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
}

val records = builder.build()
// ...
}

源码分析结论

2.1.1 和 3.9.1 版本中,如果 topic 配置了压缩,但是收到的数据未压缩,那么 broker 会对数据进行压缩。

实际读取验证

正好当前需要验证新业务场景下 gzip 对于二进制数据的压缩效率,该业务场景需要使用 3.9.1 版本的集群。正好借着这个机会验证一下 producer 端未配置压缩的场景下,broker 端对于数据的处理:

可以看到,在写入相同数据的情况下,broker 端会对数据进行压缩操作。为了验证确实走到了 buildRecordsAndAssignOffsets 中,在函数中添加了相关的日志:

在实际执行时,确实有日志输出:

附录:deepseek 嘴硬记录