1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| func DescribeConsumerGroup(kafkaClient sarama.Client, groupID string) { fmt.Printf("get consumergroup info: groupID=%s ...\n", groupID)
var ( err error brokerConn *sarama.Broker groupDescriptions *sarama.DescribeGroupsResponse descGroupsReq = sarama.DescribeGroupsRequest{Groups: []string{groupID}} offsetFetchReq = sarama.OffsetFetchRequest{ConsumerGroup: groupID, Version: 1} offsetFetchResp *sarama.OffsetFetchResponse )
if brokerConn, err = kafkaClient.Coordinator(groupID); err != nil { fmt.Printf("get consumergroup info error: groupID=%s err=%v\n", groupID, err) return } if err := brokerConn.Open(kafkaClient.Config()); err != nil && err != sarama.ErrAlreadyConnected { fmt.Printf("connect consumergroup coordinator error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err) return } defer brokerConn.Close()
if groupDescriptions, err = brokerConn.DescribeGroups(&descGroupsReq); err != nil { fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err) return } groupDescription := groupDescriptions.Groups[0]
for _, memberDesc := range groupDescription.Members { memberAssign, err := memberDesc.GetMemberAssignment() if err != nil { fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err) return } for topicName, partitions := range memberAssign.Topics { for _, partition := range partitions { fmt.Printf("memberID=%s clientID=%s clientHost=%s topic=%s partitionID=%d\n", memberDesc.MemberId, memberDesc.ClientId, memberDesc.ClientHost, topicName, partition) offsetFetchReq.AddPartition(topicName, partition) } } }
if offsetFetchResp, err = brokerConn.FetchOffset(&offsetFetchReq); err != nil { fmt.Printf("get consumergroup info error: groupID=%s broker=%d err=%v\n", groupID, brokerConn.ID(), err) return } for topic, partitions := range offsetFetchResp.Blocks { for partition, block := range partitions { fmt.Printf("topic=%s partitionID=%d offset=%d\n", topic, partition, block.Offset) } }
}
func main() { kafkaClient, err := NewClient(kafkaAddr) if err != nil { panic(err) }
DescribeConsumerGroup(kafkaClient, "receiver_weaver_yq") }
|