Skip to content

Commit b48b116

Browse files
committed
add list consumer groups under a consumer group command
1 parent 26b03e1 commit b48b116

File tree

7 files changed

+72
-37
lines changed

7 files changed

+72
-37
lines changed

cmd/consumergroup.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package cmd
2+
3+
import (
4+
"github.com/gojek/kat/cmd/list"
5+
"github.com/gojek/kat/logger"
6+
"github.com/spf13/cobra"
7+
)
8+
9+
var consumerGroupCmd = &cobra.Command{
10+
Use: "consumergroup",
11+
Short: "Admin commands on consumergroups",
12+
}
13+
14+
func init() {
15+
consumerGroupCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips")
16+
consumerGroupCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic")
17+
if err := consumerGroupCmd.MarkPersistentFlagRequired("broker-list"); err != nil {
18+
logger.Fatal(err)
19+
}
20+
21+
consumerGroupCmd.AddCommand(list.ListConsumerGroupsCmd)
22+
}

cmd/list/listconsumergroup.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package list
22

33
import (
4+
"sort"
45
"strings"
56

67
"github.com/gojek/kat/cmd/base"
7-
"github.com/gojek/kat/logger"
88
"github.com/gojek/kat/pkg/client"
99
"github.com/spf13/cobra"
1010
)
@@ -28,20 +28,21 @@ var ListConsumerGroupsCmd = &cobra.Command{
2828
},
2929
}
3030

31-
func init() {
32-
ListConsumerGroupsCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips")
33-
ListConsumerGroupsCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic")
34-
if err := ListConsumerGroupsCmd.MarkPersistentFlagRequired("broker-list"); err != nil {
35-
logger.Fatal(err)
36-
}
37-
}
38-
3931
func (c *consumerGroupAdmin) ListGroups(topic string) error {
4032
consumerGroupsMap, err := c.saramaClient.ListConsumerGroups()
4133
if err != nil {
4234
return err
4335
}
4436

37+
consumerGroupList := make([]string, len(consumerGroupsMap))
38+
for group := range consumerGroupsMap {
39+
consumerGroupList = append(consumerGroupList, group)
40+
}
41+
42+
sort.Slice(consumerGroupList, func(i int, j int) bool {
43+
return consumerGroupList[i] < consumerGroupList[j]
44+
})
45+
4546
var consumerGroups []string
4647

4748
for consumerGroupID := range consumerGroupsMap {

cmd/root.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"os"
66

7-
"github.com/gojek/kat/cmd/list"
87
"github.com/gojek/kat/cmd/mirror"
98

109
"github.com/gojek/kat/logger"
@@ -21,7 +20,7 @@ func init() {
2120
cobra.OnInitialize()
2221
cliCmd.AddCommand(topicCmd)
2322
cliCmd.AddCommand(mirror.MirrorCmd)
24-
cliCmd.AddCommand(list.ListConsumerGroupsCmd)
23+
cliCmd.AddCommand(consumerGroupCmd)
2524
}
2625

2726
func Execute() {

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ require (
1111
github.com/mattn/go-runewidth v0.0.5 // indirect
1212
github.com/mitchellh/go-homedir v1.1.0
1313
github.com/olekukonko/tablewriter v0.0.1
14-
github.com/pkg/errors v0.9.1 // indirect
14+
github.com/pkg/errors v0.9.1
1515
github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb
1616
github.com/sirupsen/logrus v1.4.2
1717
github.com/spf13/cobra v0.0.3
1818
github.com/spf13/pflag v1.0.3 // indirect
1919
github.com/stretchr/testify v1.3.0
2020
golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf
2121
golang.org/x/net v0.0.0-20191028085509-fe3aa8a45271 // indirect
22+
gotest.tools v2.2.0+incompatible
2223
)

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8u
4848
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
4949
github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw=
5050
github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
51+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
52+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
5153
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5254
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5355
github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb h1:kaV32NbiIn7ESdHB4PEW2VTKhB0odk9wo4/yW2acmoo=
@@ -95,3 +97,5 @@ gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc440
9597
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
9698
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
9799
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
100+
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
101+
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=

pkg/client/saramaclient.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string)
8080
groupDescription, _ := s.admin.DescribeConsumerGroups([]string{groups[i]})
8181
for _, memberDesc := range groupDescription[0].Members {
8282
ma, _ := memberDesc.GetMemberAssignment()
83+
fmt.Println("----version----")
84+
fmt.Println(ma.Version)
85+
fmt.Println("----version----")
86+
87+
fmt.Println("----userdata----")
88+
fmt.Println(ma.UserData)
89+
fmt.Println("----userdata----")
90+
91+
fmt.Println("----topics----")
92+
fmt.Println(ma.Topics)
93+
fmt.Println("----topics----")
8394
for topicName, _ := range ma.Topics {
8495
if topicName == topic {
8596
consumerGroupsChannel <- groupDescription[0].GroupId

pkg/client/saramaclient_test.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -270,36 +270,33 @@ func TestSaramaClient_CreatePartitionsFailure(t *testing.T) {
270270
assert.Error(t, err)
271271
admin.AssertExpectations(t)
272272
}
273-
274-
type mockGroupDescription interface {
275-
GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error)
276-
}
277-
278-
type mockMemberAssignment struct{}
279-
280-
func (m *mockMemberAssignment) GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error) {
281-
return &sarama.ConsumerGroupMemberAssignment{
282-
Version: 1,
283-
Topics: map[string][]int32{"t1": {0, 1, 2}},
284-
UserData: []byte("ss"),
285-
}, nil
286-
}
287-
288-
type mockGroupDescriptionStruct struct {
289-
GroupId string
290-
Members map[string]mockMemberAssignment
291-
}
292-
293273
func TestSaramaClient_GetConsumerGroupsForTopic(t *testing.T) {
294274
admin := &MockClusterAdmin{}
295275
client := SaramaClient{admin: admin}
296276

297-
admin.On("DescribeConsumerGroups", []string{"c1"}).Return([]*mockGroupDescriptionStruct{{
298-
GroupId: "c1",
299-
Members: map[string]mockMemberAssignment{"k1": {}},
300-
}}, nil)
277+
groupDesciption := []*sarama.GroupDescription{{
278+
GroupId: "test-group-id",
279+
Members: map[string]*sarama.GroupMemberDescription{
280+
"instance-id-0": {
281+
ClientId: "instance-id-0",
282+
MemberAssignment: []byte{0x04, 0x05, 0x06},
283+
},
284+
285+
"instance-id-1": {
286+
ClientId: "instance-id-1",
287+
MemberAssignment: []byte{0x04, 0x05, 0x06},
288+
},
289+
290+
"instance-id-2": {
291+
ClientId: "instance-id-2",
292+
MemberAssignment: []byte{0x04, 0x05, 0x06},
293+
},
294+
},
295+
}}
296+
297+
admin.On("DescribeConsumerGroups", []string{"test-group-id"}).Return(groupDesciption, nil)
301298

302-
consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"c1"}, "t1")
299+
consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"test-group-id"}, "test-topic")
303300

304301
require.NoError(t, err)
305302
assert.Equal(t, 1, len(consumerGroupChannel))

0 commit comments

Comments
 (0)