wkafka is a wrapper around a Kafka client library that simplifies initializing and using Kafka in microservices.
go get github.com/worldline-go/wkafka
This library uses franz-go.
First set the connection config to create a new kafka client.
Main config struct that contains brokers, security settings and consumer validation.
brokers: # list of brokers, default is empty
  - localhost:9092
security:
  tls:
    enabled: false
    cert_file: ""
    key_file: ""
    ca_file: ""
  sasl: # SASL/SCRAM authentication could be multiple and will be used in order
    - plain:
        enabled: false
        user: ""
        pass: ""
      scram:
        enabled: false
        algorithm: "" # "SCRAM-SHA-256" or "SCRAM-SHA-512"
        user: ""
        pass: ""
consumer: # consumer validation and default values
  prefix_group_id: "" # always add a prefix to group id
  format_dlq_topic: "" # format dead letter topic name, ex: "finops_{{.AppName}}_dlq"
  validation:
    group_id: # validate group id
      enabled: false
      rgx_group_id: "" # regex to validate group id ex: "^finops_.*$"
plugins: # additional configuration for plugins
  plugin_name: # plugin name
    key: value # plugin's key value configuration
To create a consumer, pass an additional consumer config when initializing the client.
topics: [] # list of topics to subscribe
group_id: "" # group id to subscribe, make it as unique as possible per service
# start offset to consume, 0 is the earliest offset, -1 is the latest offset and more than 0 is the offset number
# if the group_id already has a committed offset then this is ignored
start_offset: 0 # -1 to start at the end of the offsets
skip: # this is a programmatic skip, kafka will still consume the message
  # example skip topic and offset
  mytopic: # topic name to skip
    0: # partition number
      offsets: # list of offsets to skip
        - 31
        - 90
      before: 20 # skip all offsets before or equal to this offset
# max records to consume per poll, 0 is the default value, which means no check
# use this option with block_rebalance
max_poll_records: 0
# settings for default decoder (JSON)
decoder:
  skip_invalid: true # skip invalid messages that can't be decoded; default is true
# flag to block rebalance while pulled messages are being processed
# use with block_rebalance_timeout and max_poll_records
block_rebalance: false
# only works with block_rebalance option
# DLQ consumer does not use block rebalance
block_rebalance_timeout: 60s # timeout to block rebalance, default is 60 seconds
# max records to consume per batch to give callback function, default is 100
# with concurrent processing this is the size of each sub group, also check run_size option
batch_count: 100
concurrent:
  enabled: false # enable concurrent processing of messages
  process: 10 # max concurrent processing, default is 10
  min_size: 1 # minimum size of the bucket for merging multiple buckets, default is 1
  run_size: 0 # size of the group to start processing, default is 0 which means same as batch_count
  type: "key" # type of grouping records to process, can be "mix", "partition", "key"; default is "key"
dlq:
  disabled: false # disable dead letter queue
  topic: "" # dead letter topic name, it can be assigned in the kafka config's format_dlq_topic
  retry_interval: "10s" # retry time interval of the message if can't be processed, default is 10s
  retry_max_interval: "15m" # max interval for exponential time duration limit, default is 15m
  start_offset: 0 # -1 to start at the end of the offsets
  skip: # same as skip but only for the dead letter topic, no topic name needed
    # example skip offset
    0:
      offsets:
        - 31
      before: 20
Always set the client information so that it is visible in the headers of published messages and in the Kafka UI.
client, err := wkafka.New(
	ctx, kafkaConfig,
	wkafka.WithConsumer(consumeConfig),
	wkafka.WithClientInfo("testapp", "v0.1.0"),
)
if err != nil {
	return err
}
defer client.Close()
Now run the consumer with a callback function.
There are two options to run the consumer: batch or single (WithCallbackBatch or WithCallback).
The default decoder is JSON, but you can change it with the WithDecode option.
If you use []byte as the data type, the raw data is passed to the callback function; for a batch consumer use the [][]byte type.
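The difference between a typed payload and a raw []byte payload can be illustrated with a small standalone sketch. It uses only the Go standard library; the `decode` helper and the `Data` type are hypothetical names chosen for this illustration and are not wkafka's actual decoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Data is a hypothetical payload type used only for this illustration.
type Data struct {
	Name string `json:"name"`
}

// decode is a hypothetical helper, not part of wkafka. It hands back the raw
// bytes untouched when T is []byte and JSON-decodes into T otherwise.
func decode[T any](raw []byte) (T, error) {
	var out T

	// When T is []byte, &out has the dynamic type *[]byte.
	if p, ok := any(&out).(*[]byte); ok {
		*p = raw
		return out, nil
	}

	if err := json.Unmarshal(raw, &out); err != nil {
		return out, fmt.Errorf("decode: %w", err)
	}

	return out, nil
}

func main() {
	raw := []byte(`{"name":"test"}`)

	// Typed payload: the JSON is decoded into the struct.
	d, err := decode[Data](raw)
	fmt.Println(d.Name, err)

	// Raw payload: the bytes are passed through as they are.
	b, err := decode[[]byte](raw)
	fmt.Println(string(b), err)
}
```

A callback that receives `Data` would see the decoded struct, while one that receives `[]byte` would see the record value as is.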
// example single consumer
if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil {
	return fmt.Errorf("consume: %w", err)
}
To send a record to the dead letter queue, wrap the error with the WrapErrDLQ function.
Check the additional options for custom decode and precheck.
Use the consumer client or create a client without consumer settings; New also tries to connect to the brokers.
client, err := wkafka.New(ctx, kafkaConfig)
if err != nil {
	return err
}
defer client.Close()
Create a producer based on the client and a specific data type.
The WithHook, WithEncoder and WithHeaders options are optional.
Use WithHook to read the metadata of the record and modify the record before it is produced.
producer, err := wkafka.NewProducer[*Data](client, "test", wkafka.WithHook(ProduceHook))
if err != nil {
	return err
}
return producer.Produce(ctx, data)
import (
	"github.com/worldline-go/wkafka/handler"
)
To edit the skip map, use our handler to initialize a server mux.
A Redis-like API with channel (pub/sub) support is needed so that requests reach the other instances.
In the kafka config, add the plugins section with the handler plugin.
plugins:
  handler:
    enabled: true # enable plugins
    addr: ":17070" # address to listen, default is ":17070"
    pubsub:
      prefix: "finops_" # prefix for pubsub channels, default is empty
      redis:
        address: "localhost:6379" # address for redis like API
Then enable the handler in the kafka client initialization.
wkafka.WithPlugin(handler.PluginWithName()),
Or you can do it manually:
// import github.com/worldline-go/wkafka/handler
mux := http.NewServeMux()
mux.Handle(handler.New(ctx, client, handler.Config{}.ToOption()))
By default it is served on the /wkafka/ui path.
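The skip map has the shape shown in the skip section of the consumer config: topic, then partition, then a list of offsets plus a before threshold. The sketch below shows how such a map could be evaluated for a single record. The `SkipMap` and `OffsetSkip` types and the `shouldSkip` method are hypothetical names for this illustration, and treating a before value of 0 as "not set" is an assumption of the sketch, not confirmed library behavior.

```go
package main

import "fmt"

// OffsetSkip is a hypothetical type mirroring one partition entry of the
// skip config: explicit offsets plus a "before or equal" threshold.
type OffsetSkip struct {
	Offsets []int64
	Before  int64
}

// SkipMap is a hypothetical type: topic name -> partition number -> rule.
type SkipMap map[string]map[int32]OffsetSkip

// shouldSkip reports whether the record at topic/partition/offset matches
// a skip rule. Assumption of this sketch: Before == 0 means "not set".
func (s SkipMap) shouldSkip(topic string, partition int32, offset int64) bool {
	// Indexing a missing topic yields a nil inner map, which is safe to read.
	rule, ok := s[topic][partition]
	if !ok {
		return false
	}

	if rule.Before > 0 && offset <= rule.Before {
		return true
	}

	for _, o := range rule.Offsets {
		if o == offset {
			return true
		}
	}

	return false
}

func main() {
	// Same values as the skip example in the consumer config.
	skip := SkipMap{
		"mytopic": {
			0: {Offsets: []int64{31, 90}, Before: 20},
		},
	}

	for _, offset := range []int64{5, 20, 21, 31, 90, 91} {
		fmt.Printf("mytopic/0 offset %d skip=%v\n", offset, skip.shouldSkip("mytopic", 0, offset))
	}
}
```

As the config comment notes, a skipped record is still consumed from Kafka; only its processing is skipped.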
Handler Example
make env
# run the example
EXAMPLE=consumer_single_handler make example
Add messages here to try out skipping: http://localhost:7071
go get github.com/twmb/franz-go/plugin/kotel
Use it when initializing the kafka client.
kafkaTracer := kotel.NewTracer()
kafkaClient, err = wkafka.New(ctx,
	config.Application.KafkaConfig,
	wkafka.WithConsumer(config.Application.KafkaConsumer),
	wkafka.WithClientInfo(config.ServiceName, config.ServiceVersion),
	wkafka.WithKGOOptions(kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(kafkaTracer)).Hooks()...)),
)
It is important to set the span kind to producer.
ctx, spanKafka := otel.Tracer("").Start(ctx, "produce_message", trace.WithSpanKind(trace.SpanKindProducer))
defer spanKafka.End()
if err := h.KafkaProducer.Produce(ctx, product); err != nil {
	spanKafka.SetStatus(codes.Error, err.Error())
	return c.JSON(http.StatusBadRequest, model.Message{
		Message: err.Error(),
	})
}
k.Tracer is the tracer we initialized with the kafka client (kotel.NewTracer()).
func (k *Kafka) Consume(ctx context.Context, product model.Product) error {
	// use tracer's returned ctx for next spans
	ctx, span := k.Tracer.WithProcessSpan(wkafka.CtxRecord(ctx))
	defer span.End()
	span.SetAttributes(attribute.String("product.name", product.Name))
	log.Info().Str("product", product.Name).Str("description", product.Description).Msg("consume message")
	return nil
}
Initialize kafka and redpanda console with docker-compose.
# uses the "docker compose" command; if you use podman then add the compose extension and link docker to the podman binary
make env

| Service | Description |
|---|---|
| localhost:9092 | Kafka broker to connect to |
| http://localhost:7071 | Redpanda console |
| http://localhost:3000 | Grafana |
| http://localhost:8080/wkafka/ui | Handler UI |
Use examples with EXAMPLE env variable:
EXAMPLE=... make example