Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b40c300
feat(plugin): add prototype NATS.io plugin for sender subsystem
muhammad-asghar-ali Oct 4, 2025
1d29663
refactor(nats.io): remove unused method from NATS plugin interface
muhammad-asghar-ali Oct 4, 2025
2ab6202
fix(nats.io): prevent panic on closed channel in NATS.io mock client …
muhammad-asghar-ali Oct 4, 2025
d187fa7
refactor(nats.io): create dedicated NATS.io connection per worker for…
muhammad-asghar-ali Oct 9, 2025
38e2978
feat(NATS.io): use nats:///subject format to remove misleading host info
muhammad-asghar-ali Oct 16, 2025
eea2f9b
fix(): improve code coverage
muhammad-asghar-ali Oct 16, 2025
578cdfa
fix(): golangci-lint issues
muhammad-asghar-ali Oct 16, 2025
c545554
Merge branch 'main' into nats-plugin
muhammad-asghar-ali Oct 31, 2025
8a73138
refactor(): convert to use base plugin pattern
muhammad-asghar-ali Oct 31, 2025
d4015c1
fix(): pointer addr
muhammad-asghar-ali Oct 31, 2025
d53e392
Merge branch 'resonatehq:main' into main
muhammad-asghar-ali Nov 1, 2025
551f672
Merge branch 'main' into nats-plugin
muhammad-asghar-ali Nov 1, 2025
9dbd616
chore(): add head in nats plugin process func
muhammad-asghar-ali Nov 1, 2025
2152d44
fix(): pass correct args to process
muhammad-asghar-ali Nov 1, 2025
3107adc
Merge branch 'main' into nats-plugin
muhammad-asghar-ali Nov 4, 2025
9e672df
Merge branch 'main' into nats-plugin
muhammad-asghar-ali Nov 9, 2025
b983d37
sync with main
muhammad-asghar-ali Dec 23, 2025
32df92c
chore(): go mod tidy
muhammad-asghar-ali Dec 23, 2025
6484dd5
feat(): add NATS subsystem and plugin implementation with sender support
muhammad-asghar-ali Dec 25, 2025
2c65de2
chore(nats): remove the nats from api to remove the complexity
muhammad-asghar-ali Dec 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/api"
httpPlugin "github.com/resonatehq/resonate/internal/app/plugins/http"
"github.com/resonatehq/resonate/internal/app/plugins/nats"
"github.com/resonatehq/resonate/internal/app/plugins/poll"
"github.com/resonatehq/resonate/internal/app/plugins/sqs"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/echo"
Expand Down Expand Up @@ -216,6 +217,14 @@ func (c *Config) AIOPlugins(a aio.AIO, metrics *metrics.Metrics) ([]aio.Plugin,

plugins = append(plugins, plugin)
}
if c.AIO.Subsystems.Sender.Config.Plugins.NATS.Enabled {
plugin, err := nats.New(a, metrics, &c.AIO.Subsystems.Sender.Config.Plugins.NATS.Config)
if err != nil {
return nil, "", err
}

plugins = append(plugins, plugin)
}

return plugins, pollAddr, nil
}
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.32
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.46.1
github.com/oapi-codegen/runtime v1.1.2
github.com/prometheus/client_golang v1.23.2
github.com/resonatehq/gocoro v0.0.0-20240928015848-78539a59dab0
Expand Down Expand Up @@ -56,12 +57,15 @@ require (
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI=
github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
Expand Down
201 changes: 201 additions & 0 deletions internal/app/plugins/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package nats

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strconv"
"time"

natsgo "github.com/nats-io/nats.go"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/metrics"
)

type Config struct {
Size int `flag:"size" desc:"submission buffered channel size" default:"1000"`
Workers int `flag:"workers" desc:"number of workers" default:"4"`
Timeout time.Duration `flag:"timeout" desc:"nats request timeout" default:"30s"`
TimeToRetry time.Duration `flag:"ttr" desc:"time to wait before resending" default:"15s"`
TimeToClaim time.Duration `flag:"ttc" desc:"time to wait for claim before resending" default:"1m"`
}

type Client interface {
Publish(subject string, data []byte) error
Close()
}

type Worker struct {
id int
sq <-chan *aio.Message
timeout time.Duration
aio aio.AIO
metrics *metrics.Metrics
config *Config
client Client
}

type NATS struct {
sq chan *aio.Message
workers []*Worker
}

type Addr struct {
URL string `json:"url"`
Subject string `json:"subject"`
}

func New(a aio.AIO, metrics *metrics.Metrics, config *Config) (*NATS, error) {
return NewWithClient(a, metrics, config, nil)
}

func NewWithClient(a aio.AIO, metrics *metrics.Metrics, config *Config, client Client) (*NATS, error) {
sq := make(chan *aio.Message, config.Size)
workers := make([]*Worker, config.Workers)

for i := 0; i < config.Workers; i++ {
workers[i] = &Worker{
id: i,
sq: sq,
timeout: config.Timeout,
aio: a,
metrics: metrics,
config: config,
client: client,
}
}

return &NATS{
sq: sq,
workers: workers,
}, nil
}

func (p *NATS) String() string {
return fmt.Sprintf("%s:nats", t_aio.Sender.String())
}

func (p *NATS) Type() string {
return "nats"
}

func (p *NATS) Start(chan<- error) error {
for _, worker := range p.workers {
go worker.Start()
}

return nil
}

func (p *NATS) Stop() error {
if p.sq != nil {
close(p.sq)
}
return nil
}

func (p *NATS) Enqueue(msg *aio.Message) bool {
if p.sq == nil || msg == nil {
return false
}

select {
case p.sq <- msg:
return true
default:
return false
}
}

func (w *Worker) String() string {
return fmt.Sprintf("%s:nats", t_aio.Sender.String())
}

func (w *Worker) Start() {
counter := w.metrics.AioWorkerInFlight.WithLabelValues(w.String(), strconv.Itoa(w.id))
w.metrics.AioWorker.WithLabelValues(w.String()).Inc()
defer w.metrics.AioWorker.WithLabelValues(w.String()).Dec()

for {
msg, ok := <-w.sq
if !ok {
return
}

counter.Inc()
success, err := w.Process(msg.Body, msg.Addr)
if err != nil {
slog.Warn("failed to send task", "err", err)
}

msg.Done(&t_aio.SenderCompletion{
Success: success,
TimeToRetry: w.config.TimeToRetry.Milliseconds(),
TimeToClaim: w.config.TimeToClaim.Milliseconds(),
})
counter.Dec()
}
}

func (w *Worker) Process(body []byte, data []byte) (bool, error) {
var addr Addr
if err := json.Unmarshal(data, &addr); err != nil {
return false, err
}

// TODO - need to find the best approach to handle this
if addr.URL == "" {
return false, fmt.Errorf("missing URL")
}
if addr.Subject == "" {
return false, fmt.Errorf("missing subject")
}

client := w.client
if client == nil {
opts := []natsgo.Option{
natsgo.Timeout(w.timeout),
natsgo.RetryOnFailedConnect(false), // Disable retry for timeout testing
natsgo.PingInterval(20 * time.Second),
natsgo.MaxPingsOutstanding(2),
}

nc, err := natsgo.Connect(addr.URL, opts...)
if err != nil {
return false, err
}
client = nc
defer nc.Close()
}

// If we have a client (including mock clients), use it directly
if w.client != nil {
err := client.Publish(addr.Subject, body)
if err != nil {
return false, err
}
return true, nil
}

// Only use timeout logic for real NATS connections
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()

done := make(chan error, 1)
go func() {
done <- client.Publish(addr.Subject, body)
}()

select {
case err := <-done:
if err != nil {
return false, err
}
return true, nil
case <-ctx.Done():
return false, ctx.Err()
}
}
Loading