kafka controller 启动失败案例一则

一般在 kafka 集群频繁出现 under replicated partitions 的场景下出现 …

现象

业务方在 kafka-manager 上对 topic AIPluginRetrieveonlineBs 的 partition 进行扩容失败,新扩出来的 partition 的 leader 和 isr 均为空

zk 路径中,/kafka/brokers/topics/AIPluginRetrieveonlineBs 的值包含新扩出来的 32、33、34、35

但是 /kafka/brokers/topics/AIPluginRetrieveonlineBs/partition 中却没有 32、33、34、35 的信息,这和 kafka manager 页面上展示的信息基本可以对得上。


问题排查

首先,大致可以猜到是 controller 的问题,那么就先切 controller,也就是把 zk 中的 /kafka/controller 节点 delete 掉,但是发现没用。

登录 controller 对应的机器,发现 logs/server.log 中有大量的如下日志:

1
2
3
4
5
6
[2024-02-02 19:58:34,153] WARN Session 0x360851b6e6b50000 for server bddwd-ps-beehive-agent187791.bddwd.baidu.com/10.41.105.210:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.io.IOException: Packet len4332163 is out of range!
at org.apache.zookeeper.ClientCnxnSocket.readLength(ClientCnxnSocket.java:113)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:79)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1145)

查阅资料后发现,zk client 给单次通信预留的最大 buffer 为 4 MB,从日志上看基本上可以确定是返回的数据超过了 4 MB,导致 controller 无法正常启动。

网上给的解决方案基本都是调大 zk client 的 buffer 配置,但是不幸的是,出问题的 kafka 集群有 100 个实例,升级成本非常高。

那么就需要找出异常的数据。首先怀疑是某个 zk 节点存储的数据过大,于是写了一个程序筛了一下,为了将可疑 zk 路径全部找出,程序中 zk client 对于单次数据请求预留的 buffer 设置为了 10MB 。结果发现最大的数据也才不到 8 KB。

除了 get 外,可能还存在 ls 操作。 发现 /kafka/isr_change_notification 下有大量的子文件

用程序 ls 一下,并将 buffer 设置为 4MB,果然挂了

1
2
3
-> % ./iterate_zk /kafka/isr_change_notification test.txt
2024/02/02 22:06:22 recv loop terminated: received packet from server with length 4601363, which exceeds max buffer size 4194304
2024/02/02 22:06:22 get node child list /kafka/isr_change_notification err : zk: connection closed

那么基本就定位问题了。


isr_change_notification 是干啥的?

https://juejin.cn/post/6844904021250015239

也就是说,如果 controller 之前就是卡死的话,就会导致这个路径下的文件无法处理,如果堆积的数据超过一定的阈值,在切 controller 之后,新的 controller 会因为 4MB buffer 限制导致无法加载 /kafka/isr_change_notification 下的数据。


故障恢复

故障 kafka 集群有 100 个实例,600 多 topic,并且实例都部署在混布池中,预期 isr 的变化会比较多,所以决定根据 mtime 参数,将时间早于 2024.02.02 的数据全部删掉

在删掉大概 11w 02.02 之前的数据后,剩余的数据 kafka controller 自己就开始处理了


清理程序样例

代码是基于另外一个 kafka 运维脚本临时改的,针对此特殊场景部分代码逻辑有待优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main

import (
"flag"
"fmt"
"log"
"path"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-zookeeper/zk"
)

var (
zkAddr string = "zk-stardust-online.www.hbe.serv:2181"
parallel int = 50
clearISRChangeNotification bool = true
isrChangeNotificationExpireTime int64 = 10
deleteController bool = true
maxZkBufferSize int = 10 // MB
)

func init() {
flag.StringVar(&zkAddr, "zk", "zk-stardust-online.www.hbe.serv:2181", "zk addr")
flag.IntVar(&parallel, "p", 50, "parallel")
flag.BoolVar(&clearISRChangeNotification, "c", true, "clear isr change notification")
flag.Int64Var(&isrChangeNotificationExpireTime, "e", 10, "expire time of isr change notification, default 10 minutes")
flag.BoolVar(&deleteController, "d", true, "delete controller node, in order to trigger controller to re-elect")
flag.IntVar(&maxZkBufferSize, "b", 10, "zk resp buffer size, default is 10mb")
flag.Parse()
}

func main() {
conn, _, err := zk.Connect([]string{zkAddr}, time.Second, zk.WithLogInfo(false), zk.WithMaxBufferSize(1024*1024*maxZkBufferSize))
if err != nil {
panic(err)
}

var (
limitChan = make(chan struct{}, parallel)
rootPath = "/kafka/isr_change_notification"
t = time.Now().Add(-time.Minute * time.Duration(isrChangeNotificationExpireTime)).UnixMilli()
handleCurrentNodeFunc func(currPath string)
wg sync.WaitGroup
cnt int64
)
fmt.Println(t)

handleCurrentNodeFunc = func(currPath string) {
defer wg.Done()

limitChan <- struct{}{}
_, zkStatus, err := conn.Get(currPath)
if err != nil {
log.Printf("get node data %s err : %v\n", currPath, err)
<-limitChan
return
}
if currPath != "/kafka/isr_change_notification" {
mtime := zkStatus.Mtime
if mtime < t {
res := atomic.AddInt64(&cnt, 1)
if err := conn.Delete(currPath, zkStatus.Version); err != nil {
log.Printf("delete node %s err : %v\n", currPath, err)
} else {
if res%1000 == 0 {
log.Printf("[debug] %d done\n", res)
}
}
}
<-limitChan
} else {
childList, _, err := conn.Children(currPath)
if err != nil {
log.Printf("get node child list %s err : %v\n", currPath, err)
<-limitChan
return
}
<-limitChan
for _, child := range childList {
wg.Add(1)
go handleCurrentNodeFunc(path.Join(currPath, child))
}
}

}

if clearISRChangeNotification {
wg.Add(1)
go handleCurrentNodeFunc(rootPath)
wg.Wait()
}
if deleteController {
if err := conn.Delete("/kafka/controller", -1); err != nil {
log.Printf("[err] delete controller failed: %v\n", err)
}
}

fmt.Printf("------ delete done %s ------\n", zkAddr)
}