kafka client 并发处理单 partition 异常场景分析

为什么不推荐并发处理单 partition 的数据

背景

最近在研究同事写的计算框架,主要功能是从 kafka 中消费数据后,根据用户配置的拓扑和算子信息,调用用户算子对数据进行处理。框架由 c++ 编写,所以很自然地,将数据用 std::shared_ptr 声明,把数据的 commit 写在了 shared_ptr 的析构 hook 函数中。在数据被处理完成后自动 commit 对应的 offset。

计算框架运行时,每个算子进行了单例初始化,算子之间通过 blocking queue 进行单向通讯,上游算子将数据写入 queue 中,由下游算子消费。每个算子内部启动 n 个 thread 并发从 queue 中消费和处理数据。如果希望做到 key 粒度的保序,则每个算子会创建 n 个 queue,每个线程消费指定的 queue,而 key 会被 hash 到指定的 queue 上。

问题

考虑如下场景,假设对于一个实例,并发处理 offset 为 n 和 n +1 的数据(对应上面的两个 thread),n + 1 的数据先于 n 的数据被处理完毕,n + 1 被提交。此时这个实例异常退出,n 还为处理完/提交。在 rebalance 后有新的实例接管了这个 partition 的数据处理,那它是会从 n 开始消费,还是 n + 2 开始消费呢?如果是后者,那么 n 数据就被丢了。

复现

最简单的方案就是写个程序模拟如上场景做一个实验,程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"context"
"log"

"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0 // 使用你的Kafka集群版本
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false // 关闭自动提交offset

brokers := []string{"kafka-cosmos-diaoyan.www.yq01.serv:9092"} // 你的Kafka集群地址
group := "nzy_test_cg" // 你的Consumer Group
topic := "nzy_test_consume_20241026" // 你的Kafka Topic, 单partition

client, err := sarama.NewConsumerGroup(brokers, group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}

consumer := Consumer{}
// 暂时不考虑优雅退出
for {
err := client.Consume(context.Background(), []string{topic}, &consumer)
if err != nil {
log.Panicf("Error from consumer: %v", err)
}
}
}

type Consumer struct{}

var _ sarama.ConsumerGroupHandler = &Consumer{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) Cleanup(sess sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
if msg.Offset%5 == 0 && msg.Offset != 0 {
// 模拟数据丢失
log.Printf("Message [%v] lost", msg.Offset)
continue
}
log.Printf("Message [%v] : %v", msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "") // 标记消息,准备提交offset, 注意这里是异步提交
sess.Commit() // 显示声明提交offset
}
return nil
}

向这个 topic 写入数据,输出如下:

1
2
3
4
5
6
7
8
nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/bad_consumer
$ ./bad_consumer
2024/10/26 16:24:13 Message [30] lost
2024/10/26 16:24:16 Message [31] : 32
^C
nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/bad_consumer
$ ./bad_consumer
2024/10/26 16:24:29 Message [32] : 33

可以看出,offset 为 30 的数据被丢了。实际上从 kafka 的源码 也可以看出,broker 端并没有很复杂的逻辑,比如说如果 n 未被提交,那么 n + 1 的提交不生效。代码中仅会判断当前提交的 offset 的值是否大于存量的值,如果是,则会进行会更新。

1
2
3
4
5
6
7
8
9
10
11
12
 def onOffsetCommitAppend(topicIdPartition: TopicIdPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
val topicPartition = topicIdPartition.topicPartition
if (pendingOffsetCommits.contains(topicPartition)) {
// ...
// offsets 字段中没有该分区位移提交数据,或者
// offsets 字段中该分区对应的提交位移消息在位移主题中的位移值小于待写入的位移值
if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
// 将该分区对应的提交位移消息添加到 offsets 中
offsets.put(topicPartition, offsetWithCommitRecordMetadata)
}
// ....
}

根据上面这段代码逻辑,也不会存在如下场景,n +1 先于 n 提交 offset,n 提交 offset 时将 n + 1 的提交覆盖(新消费者重复消费 n + 1 的数据),验证代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"context"
"log"
"time"

"github.com/IBM/sarama"
)

func main() {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0 // 使用你的Kafka集群版本
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false // 关闭自动提交offset

brokers := []string{"kafka-cosmos-diaoyan.www.yq01.serv:9092"} // 你的Kafka集群地址
group := "nzy_test_cg" // 你的Consumer Group
topic := "nzy_test_consume_20241026" // 你的Kafka Topic

client, err := sarama.NewConsumerGroup(brokers, group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}

consumer := Consumer{}
// 暂时不考虑优雅退出
for {
err := client.Consume(context.Background(), []string{topic}, &consumer)
if err != nil {
log.Panicf("Error from consumer: %v", err)
}
}
}

type Consumer struct{}

var _ sarama.ConsumerGroupHandler = &Consumer{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) Cleanup(sess sarama.ConsumerGroupSession) error { return nil }

func (consumer *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
go func(msg *sarama.ConsumerMessage) {
if msg.Offset%5 == 0 && msg.Offset != 0 {
// 模拟数据处理延迟
log.Printf("Message [%v] process slow", msg.Offset)
time.Sleep(5 * time.Second)
log.Printf("Message [%v] process done", msg.Offset)
}
log.Printf("Message [%v] : %v", msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "") // 标记消息,准备提交offset, 注意这里是异步提交
sess.Commit() // 显示声明提交offset
}(msg)
}
return nil
}

向这个 topic 写入数据,输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/latency_consumer
$ ./latency_consumer
2024/10/26 16:29:49 Message [33] : 34
2024/10/26 16:29:51 Message [34] : 35
2024/10/26 16:29:52 Message [35] process slow
2024/10/26 16:29:54 Message [36] : 37
2024/10/26 16:29:57 Message [35] process done
2024/10/26 16:29:57 Message [35] : 36
^C
nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/latency_consumer
$ ./latency_consumer
2024/10/26 16:30:08 Message [37] : 38

结论

并发处理一个 partition 中的数据,如果 offset 管理不慎,极端情况下可能会出现小概率数据丢失的情况。线上没出现问题的原因之一,可能是框架默认配置中,每个算子仅会创建一个线程,大部分实例运行时均使用的是默认配置。