Sample code for fetching basic Kafka topic & consumergroup information with sarama

The main pieces of information are:

  • the full topic list

  • basic information about a single topic

  • the full consumergroup list

  • basic information about a single consumergroup

The approach is borrowed from github.com/danielqsj/kafka_exporter


Client initialization

func NewClient(kafkaAddr string) (kafkaClient sarama.Client, err error) {
    config := sarama.NewConfig()
    config.ClientID = fmt.Sprintf("%d-nzy-client", time.Now().UnixNano())
    kafkaClient, err = sarama.NewClient([]string{kafkaAddr}, config)
    return
}
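
A small usage note, not taken from the exporter itself: sarama.Client keeps TCP connections to the brokers, so it is worth closing when done, and pinning config.Version can matter when newer request versions are needed. A minimal sketch of a variant constructor; NewClientWithVersion is my own illustration and sarama.V2_0_0_0 is only an example value, adjust it to the actual broker version.

// NewClientWithVersion is a hypothetical variant of NewClient that pins the
// Kafka protocol version; the version passed in is an assumption, not a requirement.
func NewClientWithVersion(kafkaAddr string, version sarama.KafkaVersion) (sarama.Client, error) {
    config := sarama.NewConfig()
    config.ClientID = fmt.Sprintf("%d-nzy-client", time.Now().UnixNano())
    config.Version = version
    return sarama.NewClient([]string{kafkaAddr}, config)
}

func main() {
    kafkaClient, err := NewClientWithVersion(kafkaAddr, sarama.V2_0_0_0)
    if err != nil {
        panic(err)
    }
    // the client holds broker connections, release them when no longer needed
    defer kafkaClient.Close()
}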

Fetching topic information

// GetTopics fetches the full topic list
func GetTopics(kafkaClient sarama.Client) {
    fmt.Println("get topic list ...")
    if err := kafkaClient.RefreshMetadata(); err != nil {
        fmt.Printf("refresh metadata error: %v\n", err)
    }
    topics, err := kafkaClient.Topics()
    if err != nil {
        fmt.Println("get topics error", err)
        return
    }
    for _, topic := range topics {
        fmt.Println(topic)
    }
}

func main() {
    kafkaClient, err := NewClient(kafkaAddr)
    if err != nil {
        panic(err)
    }

    // the topic list can in fact be read directly from the client's cached metadata
    GetTopics(kafkaClient)
    // output
    // alahousing_build_to_dp
    // sitemapec2ec
    // check2router_low
    // ...
}
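
kafka_exporter additionally lets you narrow the topic list with a regular expression (its topic.filter option). A minimal sketch of the same idea; FilterTopics and the pattern argument are my own illustration, not part of the original code.

// FilterTopics returns only the topics whose names match pattern
// (the standard library "regexp" package is assumed to be imported).
func FilterTopics(kafkaClient sarama.Client, pattern string) ([]string, error) {
    re, err := regexp.Compile(pattern)
    if err != nil {
        return nil, err
    }
    topics, err := kafkaClient.Topics()
    if err != nil {
        return nil, err
    }
    var matched []string
    for _, topic := range topics {
        if re.MatchString(topic) {
            matched = append(matched, topic)
        }
    }
    return matched, nil
}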

Fetching information for a specific topic

// GetTopicInfo fetches information about the given topic
func GetTopicInfo(kafkaClient sarama.Client, topicName string) {
    var (
        partitions        []int32          // partition list of the topic
        partitionToBroker []*sarama.Broker // broker hosting the leader of each partition
        currentOffsets    []int64          // newest offset of each partition
        err               error
    )
    // Replicas, InSyncReplicas, etc. are also available (see the sketch after this code block)
    fmt.Printf("get topic %s info ...\n", topicName)

    if err = kafkaClient.RefreshMetadata(topicName); err != nil {
        fmt.Printf("refresh metadata error, topic=%s err=%v\n", topicName, err)
        return
    }

    if partitions, err = kafkaClient.Partitions(topicName); err != nil {
        fmt.Printf("get topic partitions error, topic=%s err=%v\n", topicName, err)
        return
    }

    for _, partitionID := range partitions {
        broker, err := kafkaClient.Leader(topicName, partitionID)
        if err != nil {
            fmt.Printf("get topic leader error, topic=%s partitionID=%d err=%v\n", topicName, partitionID, err)
            return
        }
        partitionToBroker = append(partitionToBroker, broker)

        currentOffset, err := kafkaClient.GetOffset(topicName, partitionID, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("get topic offset error, topic=%s partitionID=%d err=%v\n", topicName, partitionID, err)
            return
        }
        currentOffsets = append(currentOffsets, currentOffset)
    }

    for idx := range partitions {
        fmt.Printf("topic=%s partitionID=%d leader=%d offset=%d\n", topicName, partitions[idx], partitionToBroker[idx].ID(), currentOffsets[idx])
    }
}

func main() {
    kafkaClient, err := NewClient(kafkaAddr)
    if err != nil {
        panic(err)
    }

    GetTopicInfo(kafkaClient, "parser_to_dp")
    // output
    // topic=parser_to_dp partitionID=0 leader=18 offset=13154959731
    // topic=parser_to_dp partitionID=1 leader=19 offset=14816760821
    // topic=parser_to_dp partitionID=2 leader=20 offset=14877105177
    // ...
}
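
As the comment inside GetTopicInfo notes, the client also exposes the replica set and the in-sync replica set per partition. A minimal sketch, my own addition, using sarama's Replicas and InSyncReplicas methods:

// PrintTopicReplicaInfo prints the replica set and the in-sync replica set
// for every partition of the topic.
func PrintTopicReplicaInfo(kafkaClient sarama.Client, topicName string) {
    partitions, err := kafkaClient.Partitions(topicName)
    if err != nil {
        fmt.Printf("get topic partitions error, topic=%s err=%v\n", topicName, err)
        return
    }
    for _, partitionID := range partitions {
        replicas, err := kafkaClient.Replicas(topicName, partitionID)
        if err != nil {
            fmt.Printf("get replicas error, topic=%s partitionID=%d err=%v\n", topicName, partitionID, err)
            continue
        }
        isr, err := kafkaClient.InSyncReplicas(topicName, partitionID)
        if err != nil {
            fmt.Printf("get in-sync replicas error, topic=%s partitionID=%d err=%v\n", topicName, partitionID, err)
            continue
        }
        fmt.Printf("topic=%s partitionID=%d replicas=%v isr=%v\n", topicName, partitionID, replicas, isr)
    }
}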

Fetching the full consumergroup list

// GetConsumerGroups fetches the full consumergroup list
func GetConsumerGroups(kafkaClient sarama.Client) {
    fmt.Println("get all consumergroups ...")
    for _, broker := range kafkaClient.Brokers() {
        // wrap in a closure so that defer broker.Close() runs once per broker
        func(broker *sarama.Broker) {
            // connect to the broker (the client may already hold a connection)
            if err := broker.Open(kafkaClient.Config()); err != nil && err != sarama.ErrAlreadyConnected {
                fmt.Printf("connect broker error: broker=%d err=%v\n", broker.ID(), err)
                return
            }
            defer broker.Close()

            groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
            if err != nil {
                fmt.Printf("get consumergroups error: broker=%d err=%v\n", broker.ID(), err)
                return
            }

            // Groups is a map keyed by group ID
            for groupID := range groups.Groups {
                fmt.Printf("consumergroup: broker=%d groupID=%s\n", broker.ID(), groupID)
            }
        }(broker)
    }
}

func main() {
    kafkaClient, err := NewClient(kafkaAddr)
    if err != nil {
        panic(err)
    }

    GetConsumerGroups(kafkaClient)
    // output
    // consumergroup: broker=27 groupID=BatchCardsGroup
    // consumergroup: broker=27 groupID=data_proxy_stardust_nj_finance
    // consumergroup: broker=27 groupID=saas_TravelBotRouteSearch_to_build_online
    // consumergroup: broker=2 groupID=receiver_weaver_yq
    // ....
}

A side note on why we have to connect to each broker here: Kafka is a distributed messaging system, and consumer group metadata (offsets, member information, etc.) is spread across multiple brokers in the cluster. No single broker holds the complete information for all consumer groups, so to collect the full picture the client may need to query several brokers. (from 文心一言)
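
Because each broker only reports the groups it coordinates, getting a single complete list means merging the per-broker results. A minimal sketch of that aggregation, my own addition, deduplicating in case a group shows up more than once (e.g. after a coordinator change):

// ListAllConsumerGroups merges the group lists reported by every broker into one slice.
func ListAllConsumerGroups(kafkaClient sarama.Client) []string {
    seen := make(map[string]struct{})
    for _, broker := range kafkaClient.Brokers() {
        if err := broker.Open(kafkaClient.Config()); err != nil && err != sarama.ErrAlreadyConnected {
            fmt.Printf("connect broker error: broker=%d err=%v\n", broker.ID(), err)
            continue
        }
        groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
        if err != nil {
            fmt.Printf("list groups error: broker=%d err=%v\n", broker.ID(), err)
        } else {
            for groupID := range groups.Groups {
                seen[groupID] = struct{}{}
            }
        }
        _ = broker.Close()
    }
    all := make([]string, 0, len(seen))
    for groupID := range seen {
        all = append(all, groupID)
    }
    return all
}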

Fetching information for a specific consumergroup

// DescribeConsumerGroup fetches information about the given consumergroup
func DescribeConsumerGroup(kafkaClient sarama.Client, groupID string) {
    fmt.Printf("get consumergroup info: groupID=%s ...\n", groupID)

    var (
        err               error
        brokerConn        *sarama.Broker
        groupDescriptions *sarama.DescribeGroupsResponse
        descGroupsReq     = sarama.DescribeGroupsRequest{Groups: []string{groupID}}
        offsetFetchReq    = sarama.OffsetFetchRequest{ConsumerGroup: groupID, Version: 1} // fetches the group's consumption progress per topic/partition
        offsetFetchResp   *sarama.OffsetFetchResponse
    )
    // When building the requests above it is better to use the constructors the library provides;
    // note that if Version is not set on OffsetFetchRequest, no offset information is returned.

    // find the coordinator broker of the consumergroup
    if brokerConn, err = kafkaClient.Coordinator(groupID); err != nil {
        fmt.Printf("get consumergroup info error: groupID=%s err=%v\n", groupID, err)
        return
    }
    if err := brokerConn.Open(kafkaClient.Config()); err != nil && err != sarama.ErrAlreadyConnected {
        fmt.Printf("connect consumergroup coordinator error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err)
        return
    }
    defer brokerConn.Close()

    // describe the current consumergroup
    if groupDescriptions, err = brokerConn.DescribeGroups(&descGroupsReq); err != nil {
        fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err)
        return
    }
    groupDescription := groupDescriptions.Groups[0]

    for _, memberDesc := range groupDescription.Members {
        memberAssign, err := memberDesc.GetMemberAssignment()
        if err != nil {
            fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err)
            return
        }
        for topicName, partitions := range memberAssign.Topics {
            for _, partition := range partitions {
                fmt.Printf("memberID=%s clientID=%s clientHost=%s topic=%s partitionID=%d\n",
                    memberDesc.MemberId, memberDesc.ClientId, memberDesc.ClientHost, topicName, partition)
                offsetFetchReq.AddPartition(topicName, partition)
            }
        }
    }

    // fetch the group's committed offset for every topic/partition
    if offsetFetchResp, err = brokerConn.FetchOffset(&offsetFetchReq); err != nil {
        fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err)
        return
    }
    for topic, partitions := range offsetFetchResp.Blocks {
        for partition, block := range partitions {
            fmt.Printf("topic=%s partitionID=%d offset=%d\n", topic, partition, block.Offset)
        }
    }
}

func main() {
    kafkaClient, err := NewClient(kafkaAddr)
    if err != nil {
        panic(err)
    }

    DescribeConsumerGroup(kafkaClient, "receiver_weaver_yq")
    // output
    // memberID=rdkafka-ed710fa4-a663-49c1-97b1-93028eac8438 clientID=rdkafka clientHost=/10.68.236.167 topic=weaver_normal partitionID=7
    // memberID=rdkafka-ed710fa4-a663-49c1-97b1-93028eac8438 clientID=rdkafka clientHost=/10.68.236.167 topic=weaver_normal partitionID=8
    // memberID=rdkafka-ed710fa4-a663-49c1-97b1-93028eac8438 clientID=rdkafka clientHost=/10.68.236.167 topic=weaver_normal partitionID=9
    // memberID=rdkafka-dda29bfa-bfbf-4eb5-9256-b7d22a0de6eb clientID=rdkafka clientHost=/10.35.149.144 topic=weaver_normal partitionID=4
    // memberID=rdkafka-dda29bfa-bfbf-4eb5-9256-b7d22a0de6eb clientID=rdkafka clientHost=/10.35.149.144 topic=weaver_normal partitionID=5
    // memberID=rdkafka-dda29bfa-bfbf-4eb5-9256-b7d22a0de6eb clientID=rdkafka clientHost=/10.35.149.144 topic=weaver_normal partitionID=6
    // memberID=rdkafka-8cda9181-2f3e-40ca-8ea1-918a08f67dbd clientID=rdkafka clientHost=/10.68.219.245 topic=weaver_normal partitionID=0
    // memberID=rdkafka-8cda9181-2f3e-40ca-8ea1-918a08f67dbd clientID=rdkafka clientHost=/10.68.219.245 topic=weaver_normal partitionID=1
    // memberID=rdkafka-8cda9181-2f3e-40ca-8ea1-918a08f67dbd clientID=rdkafka clientHost=/10.68.219.245 topic=weaver_normal partitionID=2
    // memberID=rdkafka-8cda9181-2f3e-40ca-8ea1-918a08f67dbd clientID=rdkafka clientHost=/10.68.219.245 topic=weaver_normal partitionID=3
    // topic=weaver_normal partitionID=2 offset=5742653
    // topic=weaver_normal partitionID=4 offset=5739048
    // topic=weaver_normal partitionID=5 offset=5748532
    // topic=weaver_normal partitionID=7 offset=5738679
    // topic=weaver_normal partitionID=1 offset=5746542
    // topic=weaver_normal partitionID=3 offset=5745192
    // topic=weaver_normal partitionID=6 offset=5736528
    // topic=weaver_normal partitionID=8 offset=5741136
    // topic=weaver_normal partitionID=9 offset=5743792
    // topic=weaver_normal partitionID=0 offset=5745314
}
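
A natural follow-up is to combine the committed offsets fetched above with the newest partition offsets to estimate consumer lag. A minimal sketch, my own addition; groupOffsets is assumed to be a map built from offsetFetchResp.Blocks for a single topic:

// PrintConsumerLag estimates lag per partition as newest broker offset minus
// the offset the group has committed.
func PrintConsumerLag(kafkaClient sarama.Client, topicName string, groupOffsets map[int32]int64) {
    for partitionID, consumed := range groupOffsets {
        newest, err := kafkaClient.GetOffset(topicName, partitionID, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("get newest offset error: topic=%s partitionID=%d err=%v\n", topicName, partitionID, err)
            continue
        }
        fmt.Printf("topic=%s partitionID=%d lag=%d\n", topicName, partitionID, newest-consumed)
    }
}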