Skip to content

Commit 3a7ac77

Browse files
committed
Merge branch 'topic_polling' into NOISSUE-kafka-support
Signed-off-by: rodneyosodo <[email protected]>
2 parents 6f78484 + 77535c0 commit 3a7ac77

File tree

262 files changed

+65123
-25233
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

262 files changed

+65123
-25233
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ go 1.19
55
require (
66
github.com/caarlos0/env/v7 v7.0.0
77
github.com/cenkalti/backoff/v4 v4.1.3
8-
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
98
github.com/docker/docker v20.10.21+incompatible
109
github.com/eclipse/paho.mqtt.golang v1.4.2
1110
github.com/fatih/color v1.13.0
@@ -39,6 +38,7 @@ require (
3938
github.com/prometheus/client_golang v1.13.0
4039
github.com/rabbitmq/amqp091-go v1.5.0
4140
github.com/rubenv/sql-migrate v1.2.0
41+
github.com/segmentio/kafka-go v0.4.38
4242
github.com/spf13/cobra v1.6.1
4343
github.com/spf13/viper v1.13.0
4444
github.com/stretchr/testify v1.8.1
@@ -102,6 +102,7 @@ require (
102102
github.com/jackc/pgio v1.0.0 // indirect
103103
github.com/jackc/pgpassfile v1.0.0 // indirect
104104
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
105+
github.com/jhump/protoreflect v1.12.0 // indirect
105106
github.com/klauspost/compress v1.15.11 // indirect
106107
github.com/magiconair/properties v1.8.6 // indirect
107108
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -121,6 +122,7 @@ require (
121122
github.com/opencontainers/runc v1.1.4 // indirect
122123
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
123124
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
125+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
124126
github.com/pion/dtls/v2 v2.1.5 // indirect
125127
github.com/pion/logging v0.2.2 // indirect
126128
github.com/pion/transport v0.13.1 // indirect

go.sum

Lines changed: 10 additions & 49 deletions
Large diffs are not rendered by default.

pkg/messaging/kafka/publisher.go

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,41 @@
44
package kafka
55

66
import (
7+
"context"
78
"fmt"
9+
"sync"
10+
"time"
811

9-
kf "github.com/confluentinc/confluent-kafka-go/v2/kafka"
1012
"github.com/mainflux/mainflux/pkg/messaging"
13+
"github.com/segmentio/kafka-go"
1114
"google.golang.org/protobuf/proto"
1215
)
1316

1417
var _ messaging.Publisher = (*publisher)(nil)
1518

19+
var (
20+
numPartitions = 1
21+
replicationFactor = 1
22+
batchTimeout = time.Microsecond
23+
)
24+
1625
type publisher struct {
17-
prod *kf.Producer
26+
url string
27+
conn *kafka.Conn
28+
mu sync.Mutex
29+
topics map[string]*kafka.Writer
1830
}
1931

2032
// NewPublisher returns Kafka message Publisher.
2133
func NewPublisher(url string) (messaging.Publisher, error) {
22-
prod, err := kf.NewProducer(&kf.ConfigMap{"bootstrap.servers": url})
34+
conn, err := kafka.Dial("tcp", url)
2335
if err != nil {
2436
return &publisher{}, err
2537
}
2638
ret := &publisher{
27-
prod: prod,
39+
url: url,
40+
conn: conn,
41+
topics: make(map[string]*kafka.Writer),
2842
}
2943
return ret, nil
3044

@@ -43,20 +57,66 @@ func (pub *publisher) Publish(topic string, msg *messaging.Message) error {
4357
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
4458
}
4559

46-
kafkaMsg := kf.Message{
47-
TopicPartition: kf.TopicPartition{Topic: &subject},
48-
Value: data,
60+
kafkaMsg := kafka.Message{
61+
Value: data,
4962
}
5063

51-
if err := pub.prod.Produce(&kafkaMsg, nil); err != nil {
52-
return err
64+
writer, ok := pub.topics[subject]
65+
if ok {
66+
if err := writer.WriteMessages(context.Background(), kafkaMsg); err != nil {
67+
return err
68+
}
69+
return nil
5370
}
5471

55-
pub.prod.Flush(1 * 1000)
72+
topicConfigs := []kafka.TopicConfig{
73+
{
74+
Topic: subject,
75+
NumPartitions: numPartitions,
76+
ReplicationFactor: replicationFactor,
77+
},
78+
}
79+
if err := pub.conn.CreateTopics(topicConfigs...); err != nil {
80+
return err
81+
}
82+
writer = &kafka.Writer{
83+
Addr: kafka.TCP(pub.url),
84+
Topic: subject,
85+
RequiredAcks: kafka.RequireAll,
86+
Balancer: &kafka.LeastBytes{},
87+
BatchTimeout: batchTimeout,
88+
AllowAutoTopicCreation: true,
89+
}
90+
if err := writer.WriteMessages(context.Background(), kafkaMsg); err != nil {
91+
return err
92+
}
93+
pub.mu.Lock()
94+
defer pub.mu.Unlock()
95+
pub.topics[subject] = writer
5696
return nil
5797
}
5898

5999
func (pub *publisher) Close() error {
60-
pub.prod.Close()
100+
defer pub.conn.Close()
101+
102+
pub.mu.Lock()
103+
defer pub.mu.Unlock()
104+
105+
topics := make([]string, 0, len(pub.topics))
106+
for topic := range pub.topics {
107+
topics = append(topics, topic)
108+
pub.topics[topic].Close()
109+
}
110+
111+
req := &kafka.DeleteTopicsRequest{
112+
Addr: kafka.TCP(pub.url),
113+
Topics: topics,
114+
}
115+
client := kafka.Client{
116+
Addr: kafka.TCP(pub.url),
117+
}
118+
if _, err := client.DeleteTopics(context.Background(), req); err != nil {
119+
return err
120+
}
61121
return nil
62122
}

pkg/messaging/kafka/pubsub.go

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,26 @@
44
package kafka
55

66
import (
7+
"context"
78
"errors"
89
"fmt"
9-
"strings"
10+
"regexp"
1011
"sync"
12+
"time"
1113

12-
kf "github.com/confluentinc/confluent-kafka-go/v2/kafka"
1314
log "github.com/mainflux/mainflux/logger"
15+
1416
"github.com/mainflux/mainflux/pkg/messaging"
17+
kafka "github.com/segmentio/kafka-go"
18+
ktopics "github.com/segmentio/kafka-go/topics"
1519
"google.golang.org/protobuf/proto"
1620
)
1721

1822
const (
19-
chansPrefix = "channels"
23+
chansPrefix = "channels"
24+
SubjectAllChannels = "channels.*"
25+
offset = kafka.LastOffset
26+
defaultScanningIntervalMS = 500
2027
)
2128

2229
var (
@@ -29,31 +36,33 @@ var (
2936
var _ messaging.PubSub = (*pubsub)(nil)
3037

3138
type subscription struct {
32-
*kf.Consumer
39+
*kafka.Reader
3340
cancel func() error
3441
}
3542
type pubsub struct {
3643
publisher
37-
url string
38-
queue string
44+
client *kafka.Client
3945
logger log.Logger
4046
mu sync.Mutex
4147
subscriptions map[string]map[string]subscription
4248
}
4349

4450
// NewPubSub returns Kafka message publisher/subscriber.
45-
func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) {
46-
prod, err := kf.NewProducer(&kf.ConfigMap{"bootstrap.servers": url})
51+
func NewPubSub(url, _ string, logger log.Logger) (messaging.PubSub, error) {
52+
conn, err := kafka.Dial("tcp", url)
4753
if err != nil {
4854
return &pubsub{}, err
4955
}
50-
56+
client := &kafka.Client{
57+
Addr: conn.LocalAddr(),
58+
}
5159
ret := &pubsub{
5260
publisher: publisher{
53-
prod: prod,
61+
url: url,
62+
conn: conn,
63+
topics: make(map[string]*kafka.Writer),
5464
},
55-
url: url,
56-
queue: queue,
65+
client: client,
5766
subscriptions: make(map[string]map[string]subscription),
5867
logger: logger,
5968
}
@@ -70,49 +79,26 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
7079
ps.mu.Lock()
7180
defer ps.mu.Unlock()
7281

73-
// Check topic
74-
s, ok := ps.subscriptions[topic]
75-
switch ok {
76-
case true:
77-
// Check topic ID
78-
if _, ok := s[id]; ok {
79-
return ErrAlreadySubscribed
80-
}
81-
default:
82-
s = make(map[string]subscription)
83-
ps.subscriptions[topic] = s
84-
}
85-
86-
consumer, err := kf.NewConsumer(&kf.ConfigMap{
87-
"bootstrap.servers": ps.url,
88-
"broker.address.family": "v4",
89-
"group.id": "mainflux",
90-
"auto.offset.reset": "latest",
91-
"metadata.max.age.ms": 1,
92-
"allow.auto.create.topics": "true",
93-
})
82+
s, err := ps.checkTopic(topic, id, ErrAlreadySubscribed)
9483
if err != nil {
9584
return err
9685
}
97-
if err = consumer.SubscribeTopics([]string{formatTopic(topic)}, nil); err != nil {
98-
return err
99-
}
100-
101-
go func() {
102-
for {
103-
message, err := consumer.ReadMessage(-1)
104-
ps.handle(message, handler)
105-
if err == nil {
106-
ps.handle(message, handler)
107-
} else if !err.(kf.Error).IsTimeout() {
108-
ps.logger.Error(err.Error())
109-
return
86+
ps.configReader(id, topic, s, handler)
87+
88+
// Subscribe to all topics by periodically scanning for new topics and consuming them
89+
if topic == SubjectAllChannels {
90+
go func() {
91+
for {
92+
topics, _ := ps.listTopic()
93+
for _, t := range topics {
94+
s, err := ps.checkTopic(t, id, ErrAlreadySubscribed)
95+
if err == nil {
96+
ps.configReader(id, t, s, handler)
97+
}
98+
}
99+
time.Sleep(defaultScanningIntervalMS * time.Millisecond)
110100
}
111-
}
112-
}()
113-
s[id] = subscription{
114-
Consumer: consumer,
115-
cancel: handler.Cancel,
101+
}()
116102
}
117103
return nil
118104
}
@@ -157,12 +143,12 @@ func (ps *pubsub) Close() error {
157143
return nil
158144
}
159145

160-
func (ps *pubsub) handle(message *kf.Message, h messaging.MessageHandler) {
161-
var msg = &messaging.Message{}
162-
if err := proto.Unmarshal(message.Value, msg); err != nil {
146+
func (ps *pubsub) handle(message kafka.Message, h messaging.MessageHandler) {
147+
var msg messaging.Message
148+
if err := proto.Unmarshal(message.Value, &msg); err != nil {
163149
ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err))
164150
}
165-
if err := h.Handle(msg); err != nil {
151+
if err := h.Handle(&msg); err != nil {
166152
ps.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err))
167153
}
168154
}
@@ -176,9 +162,53 @@ func (s subscription) close() error {
176162
return s.Close()
177163
}
178164

179-
func formatTopic(topic string) string {
180-
if strings.Contains(topic, "*") {
181-
return fmt.Sprintf("^%s", topic)
165+
func (ps *pubsub) listTopic() ([]string, error) {
166+
allRegex := regexp.MustCompile(SubjectAllChannels)
167+
allTopics, err := ktopics.ListRe(context.Background(), ps.client, allRegex)
168+
if err != nil {
169+
return []string{}, err
170+
}
171+
var topics []string
172+
for _, t := range allTopics {
173+
topics = append(topics, t.Name)
174+
}
175+
return topics, nil
176+
}
177+
178+
func (ps *pubsub) checkTopic(topic, id string, err error) (map[string]subscription, error) {
179+
// Check topic
180+
s, ok := ps.subscriptions[topic]
181+
switch ok {
182+
case true:
183+
// Check topic ID
184+
if _, ok := s[id]; ok {
185+
return map[string]subscription{}, err
186+
}
187+
default:
188+
s = make(map[string]subscription)
189+
ps.subscriptions[topic] = s
190+
}
191+
return s, nil
192+
}
193+
194+
func (ps *pubsub) configReader(id, topic string, s map[string]subscription, handler messaging.MessageHandler) {
195+
reader := kafka.NewReader(kafka.ReaderConfig{
196+
Brokers: []string{ps.url},
197+
GroupID: id,
198+
Topic: topic,
199+
StartOffset: offset,
200+
})
201+
go func() {
202+
for {
203+
message, err := reader.ReadMessage(context.Background())
204+
if err != nil {
205+
break
206+
}
207+
ps.handle(message, handler)
208+
}
209+
}()
210+
s[id] = subscription{
211+
Reader: reader,
212+
cancel: handler.Cancel,
182213
}
183-
return topic
184214
}

0 commit comments

Comments
 (0)