为什么不推荐并发处理单 partition 的数据
背景 最近在研究同事写的计算框架,主要功能是从 kafka 中消费数据后,根据用户配置的拓扑和算子信息,调用用户算子对数据进行处理。框架由 c++ 编写,所以很自然地,将数据用 std::shared_ptr
声明,把数据的 commit 写在了 shared_ptr 的析构 hook 函数中。在数据被处理完成后自动 commit 对应的 offset。
计算框架运行时,每个算子进行了单例初始化,算子之间通过 blocking queue 进行单向通讯,上游算子将数据写入 queue 中,由下游算子消费。每个算子内部启动 n 个 thread 并发从 queue 中消费和处理数据。如果希望做到 key 粒度的保序,则每个算子会创建 n 个 queue,每个线程消费指定的 queue,而 key 会被 hash 到指定的 queue 上。
问题 考虑如下场景,假设对于一个实例,并发处理 offset 为 n 和 n +1 的数据(对应上面的两个 thread),n + 1 的数据先于 n 的数据被处理完毕,n + 1 被提交。此时这个实例异常退出,n 还为处理完/提交。在 rebalance 后有新的实例接管了这个 partition 的数据处理,那它是会从 n 开始消费,还是 n + 2 开始消费呢?如果是后者,那么 n 数据就被丢了。
复现 最简单的方案就是写个程序模拟如上场景做一个实验,程序代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package mainimport ( "context" "log" "github.com/IBM/sarama" ) func main () { config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.AutoCommit.Enable = false brokers := []string {"kafka-cosmos-diaoyan.www.yq01.serv:9092" } group := "nzy_test_cg" topic := "nzy_test_consume_20241026" client, err := sarama.NewConsumerGroup(brokers, group, config) if err != nil { log.Panicf("Error creating consumer group client: %v" , err) } consumer := Consumer{} for { err := client.Consume(context.Background(), []string {topic}, &consumer) if err != nil { log.Panicf("Error from consumer: %v" , err) } } } type Consumer struct {}var _ sarama.ConsumerGroupHandler = &Consumer{}func (consumer *Consumer) Setup (sarama.ConsumerGroupSession) error { return nil }func (consumer *Consumer) Cleanup (sess sarama.ConsumerGroupSession) error { return nil }func (consumer *Consumer) ConsumeClaim (sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { if msg.Offset%5 == 0 && msg.Offset != 0 { log.Printf("Message [%v] lost" , msg.Offset) continue } log.Printf("Message [%v] : %v" , msg.Offset, string (msg.Value)) sess.MarkMessage(msg, "" ) sess.Commit() } return nil }
向这个 topic 写入数据,输出如下:
1 2 3 4 5 6 7 8 nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/bad_consumer $ ./bad_consumer 2024/10/26 16:24:13 Message [30] lost 2024/10/26 16:24:16 Message [31] : 32 ^C nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/bad_consumer $ ./bad_consumer 2024/10/26 16:24:29 Message [32] : 33
可以看出,offset 为 30 的数据被丢了。实际上从 kafka 的源码 也可以看出,broker 端并没有很复杂的逻辑,比如说如果 n 未被提交,那么 n + 1 的提交不生效。代码中仅会判断当前提交的 offset 的值是否大于存量的值,如果是,则会进行会更新。
1 2 3 4 5 6 7 8 9 10 11 12 def onOffsetCommitAppend (topicIdPartition: TopicIdPartition , offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset ): Unit = { val topicPartition = topicIdPartition.topicPartition if (pendingOffsetCommits.contains(topicPartition)) { if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata)) offsets.put(topicPartition, offsetWithCommitRecordMetadata) } }
根据上面这段代码逻辑,也不会存在如下场景,n +1 先于 n 提交 offset,n 提交 offset 时将 n + 1 的提交覆盖(新消费者重复消费 n + 1 的数据),验证代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package mainimport ( "context" "log" "time" "github.com/IBM/sarama" ) func main () { config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.AutoCommit.Enable = false brokers := []string {"kafka-cosmos-diaoyan.www.yq01.serv:9092" } group := "nzy_test_cg" topic := "nzy_test_consume_20241026" client, err := sarama.NewConsumerGroup(brokers, group, config) if err != nil { log.Panicf("Error creating consumer group client: %v" , err) } consumer := Consumer{} for { err := client.Consume(context.Background(), []string {topic}, &consumer) if err != nil { log.Panicf("Error from consumer: %v" , err) } } } type Consumer struct {}var _ sarama.ConsumerGroupHandler = &Consumer{}func (consumer *Consumer) Setup (sarama.ConsumerGroupSession) error { return nil }func (consumer *Consumer) Cleanup (sess sarama.ConsumerGroupSession) error { return nil }func (consumer *Consumer) ConsumeClaim (sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { go func (msg *sarama.ConsumerMessage) { if msg.Offset%5 == 0 && msg.Offset != 0 { log.Printf("Message [%v] process slow" , msg.Offset) time.Sleep(5 * time.Second) log.Printf("Message [%v] process done" , msg.Offset) } log.Printf("Message [%v] : %v" , msg.Offset, string (msg.Value)) sess.MarkMessage(msg, "" ) sess.Commit() }(msg) } return nil }
向这个 topic 写入数据,输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/latency_consumer $ ./latency_consumer 2024/10/26 16:29:49 Message [33] : 34 2024/10/26 16:29:51 Message [34] : 35 2024/10/26 16:29:52 Message [35] process slow 2024/10/26 16:29:54 Message [36] : 37 2024/10/26 16:29:57 Message [35] process done 2024/10/26 16:29:57 Message [35] : 36 ^C nzy@yq01-build-rd2.yq01.baidu.com /home/disk2/nizhenyang/dev/kafka-pilot-util/cmd/demo/latency_consumer $ ./latency_consumer 2024/10/26 16:30:08 Message [37] : 38
结论 并发处理一个 partition 中的数据,如果 offset 管理不慎,极端情况下可能会出现小概率数据丢失的情况。线上没出现问题的原因之一,可能是框架默认配置中,每个算子仅会创建一个线程,大部分实例运行时均使用的是默认配置。