Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ jobs:

strategy:
matrix:
go: [ "1.20", "1.21" ]
go: [ "1.21", "1.22" ]

steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Setup Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go }}

Expand Down
2 changes: 1 addition & 1 deletion api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
JSApiconsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK"
JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES"
)
Expand Down
18 changes: 18 additions & 0 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
JSApiMsgGet = "$JS.API.STREAM.MSG.GET.*"
JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
JSDirectMsgGet = "$JS.API.DIRECT.GET.*"
JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
JSApiStreamSnapshot = "$JS.API.STREAM.SNAPSHOT.*"
JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
Expand Down Expand Up @@ -180,6 +182,22 @@ type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
LastFor string `json:"last_by_subj,omitempty"`
NextFor string `json:"next_by_subj,omitempty"`

// Batch support. Used to request more then one msg at a time.
// Can be used with simple starting seq, but also NextFor with wildcards.
Batch int `json:"batch,omitempty"`
// This will make sure we limit how much data we blast out. If not set we will
// inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
MaxBytes int `json:"max_bytes,omitempty"`
// Return messages as of this start time.
StartTime *time.Time `json:"start_time,omitempty"`

// Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
MultiLastFor []string `json:"multi_last,omitempty"`
// Only return messages up to this sequence. If not set, will be last sequence for the stream.
UpToSeq uint64 `json:"up_to_seq,omitempty"`
// Only return messages up to this time.
UpToTime *time.Time `json:"up_to_time,omitempty"`
}

// io.nats.jetstream.api.v1.stream_snapshot_response
Expand Down
12 changes: 10 additions & 2 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,14 @@ func (c *Consumer) NextSubject() string {
return s
}

func DirectSubject(stream string) (string, error) {
if !IsValidName(stream) {
return "", fmt.Errorf("%q is not a valid stream name", stream)
}

return fmt.Sprintf(api.JSDirectMsgGetT, stream), nil
}

// NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer
func NextSubject(stream string, consumer string) (string, error) {
if !IsValidName(stream) {
Expand Down Expand Up @@ -904,7 +912,7 @@ func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, e
PauseUntil: deadline,
}

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), &req, &resp)
err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiConsumerPauseT, c.StreamName(), c.Name()), &req, &resp)
if err != nil {
return nil, err
}
Expand All @@ -920,7 +928,7 @@ func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, e
func (c *Consumer) Resume() error {
var resp *api.JSApiConsumerPauseResponse

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), nil, &resp)
err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiConsumerPauseT, c.StreamName(), c.Name()), nil, &resp)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/nats-io/jsm.go

go 1.20
go 1.21

require (
github.com/dustin/go-humanize v1.0.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/expr-lang/expr v1.16.1 h1:Na8CUcMdyGbnNpShY7kzcHCU7WqxuL+hnxgHZ4vaz/A=
Expand All @@ -25,7 +26,9 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
Expand Down
5 changes: 5 additions & 0 deletions natscontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (c *Context) certStoreNatsOptions() ([]nats.Option, error) {
return nil, err
}

if tlsc.ClientCAs != nil {
tlsc.RootCAs = tlsc.ClientCAs
tlsc.ClientCAs = nil
}

// if no ca match was given but we have CA as a file lets pull in that file here
if len(c.config.WinCertStoreCaMatch) == 0 && c.config.CA != "" {
rootCAs, _ := x509.SystemCertPool()
Expand Down
27 changes: 27 additions & 0 deletions schema_source/jetstream/api/v1/stream_msg_get_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,33 @@
"next_by_subj": {
"type": "string",
"description": "Combined with sequence gets the next message for a subject with the given sequence or higher"
},
"batch": {
"type": "integer",
"description": "Request a number of messages to be delivered"
},
"max_bytes": {
"type": "integer",
"description": "Restrict batch get to a certain maximum cumulative bytes, defaults to server MAX_PENDING_SIZE"
},
"start_time": {
"description": "Start the batch at a certain point in time rather than sequence",
"$ref": "definitions.json#/definitions/golang_time"
},
"multi_last": {
"description": "Get the last messages from the supplied subjects",
"type": "array",
"items": {
"type": "string"
}
},
"up_to_seq": {
"description": "Returns messages up to this sequence otherwise last sequence for the stream",
"type": "integer"
},
"up_to_time": {
"description": "Only return messages up to a point in time",
"$ref": "definitions.json#/definitions/golang_time"
}
}
}
5 changes: 5 additions & 0 deletions schemas/jetstream/advisory/v1/domain_leader_elected.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
"type": "boolean",
"default": false
},
"observer": {
"description": "Indicates if the server is running as an observer only",
"type": "boolean",
"default": false
},
"active": {
"description": "Nanoseconds since this peer was last seen",
"type": "number"
Expand Down
31 changes: 31 additions & 0 deletions schemas/jetstream/api/v1/stream_msg_get_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,37 @@
"next_by_subj": {
"type": "string",
"description": "Combined with sequence gets the next message for a subject with the given sequence or higher"
},
"batch": {
"type": "integer",
"description": "Request a number of messages to be delivered"
},
"max_bytes": {
"type": "integer",
"description": "Restrict batch get to a certain maximum cumulative bytes, defaults to server MAX_PENDING_SIZE"
},
"start_time": {
"description": "Start the batch at a certain point in time rather than sequence",
"$comment": "A point in time in RFC3339 format including timezone, though typically in UTC",
"type": "string",
"format": "date-time"
},
"multi_last": {
"description": "Get the last messages from the supplied subjects",
"type": "array",
"items": {
"type": "string"
}
},
"up_to_seq": {
"description": "Returns messages up to this sequence otherwise last sequence for the stream",
"type": "integer"
},
"up_to_time": {
"description": "Only return messages up to a point in time",
"$comment": "A point in time in RFC3339 format including timezone, though typically in UTC",
"type": "string",
"format": "date-time"
}
}
}
105 changes: 105 additions & 0 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ package jsm

import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -742,6 +745,108 @@ func (s *Stream) LeaderStepDown() error {
return nil
}

// DirectGet performs a direct get against the stream, supports Batch and Multi Subject behaviors
func (s *Stream) DirectGet(ctx context.Context, req api.JSApiMsgGetRequest, handler func(msg *nats.Msg)) (numPending uint64, lastSeq uint64, upToSeq uint64, err error) {
if !s.DirectAllowed() {
return 0, 0, 0, fmt.Errorf("direct gets are not enabled for %s", s.Name())
}
if req.Batch == 0 {
return 0, 0, 0, fmt.Errorf("batch size is required")
}
if req.Seq == 0 {
req.Seq = 1
}

rj, err := json.Marshal(req)
if err != nil {
return 0, 0, 0, err
}

nc := s.mgr.nc

to, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

timer := time.AfterFunc(s.mgr.timeout, func() {
cancel(fmt.Errorf("timeout waiting for messages"))
})
defer timer.Stop()

sub, err := nc.Subscribe(nc.NewRespInbox(), func(m *nats.Msg) {
var err error

// move the timeout forward by 1 x timeout after getting any messages
timer.Reset(s.mgr.timeout)

switch m.Header.Get("Status") {
case "204": // end of batch
ls := m.Header.Get("Nats-Last-Sequence")
upTo := m.Header.Get("Nats-UpTo-Sequence")
if ls != "" {
lastSeq, err = strconv.ParseUint(ls, 10, 64)
if err != nil {
cancel(fmt.Errorf("invalid last sequence: %w", err))
}
}

if upTo != "" {
upToSeq, err = strconv.ParseUint(upTo, 10, 64)
if err != nil {
cancel(fmt.Errorf("invalid up-to sequence: %w", err))
}
}

cancel(nil)
return

case "404": // not found
cancel(fmt.Errorf("no messages found matching request"))
return

case "408": // invalid requests
cancel(fmt.Errorf("invalid request"))
return

case "413": // too many subjects
cancel(fmt.Errorf("too many subjects requested"))
return
}

np := m.Header.Get("Nats-Num-Pending")
if np == "" {
cancel(fmt.Errorf("server does not support batch requests"))
return
}

handler(m)
})
if err != nil {
return 0, 0, 0, err
}
defer sub.Unsubscribe()

msg := nats.NewMsg(s.DirectSubject())
msg.Data = rj
msg.Reply = sub.Subject

err = nc.PublishMsg(msg)

<-to.Done()

// if we got canceled without a error its just normal, like on EOB
err = context.Cause(to)
if errors.Is(err, context.Canceled) {
err = nil
}

return numPending, lastSeq, upToSeq, err
}

// DirectSubject is the subject to perform direct gets against
func (s *Stream) DirectSubject() string {
return fmt.Sprintf(api.JSDirectMsgGetT, s.Name())
}

// DetectGaps detects interior deletes in a stream, reports progress through the stream and each found gap.
//
// It uses the extended stream info to get the sequences and use that to detect gaps. The Deleted information
Expand Down
55 changes: 55 additions & 0 deletions streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,61 @@ func TestStream_DetectGaps(t *testing.T) {
}
}

func TestStream_DirectGet(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

s, err := mgr.NewStream("m1", jsm.Subjects("test.*"), jsm.AllowDirect())
checkErr(t, err, "create failed")

for i := 1; i <= 1000; i++ {
_, err = nc.Request(fmt.Sprintf("test.%d", i%5), []byte(fmt.Sprintf("%d", i)), time.Second)
checkErr(t, err, "publish failed")
}

check := func(t *testing.T, req api.JSApiMsgGetRequest, expectMatched, expectNumPending, expectLastSeq, expectUpToSeq uint64) {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

matched := uint64(0)
numPending, lastSeq, upToSeq, err := s.DirectGet(ctx, req, func(m *nats.Msg) {
matched++
})
checkErr(t, err, "Request failed")

if matched != expectMatched {
t.Fatalf("expected %d messages, got %d", expectMatched, matched)
}
if numPending != expectNumPending {
t.Fatalf("expected numPending %d got %d", expectNumPending, numPending)
}
if lastSeq != expectLastSeq {
t.Fatalf("expected lastSeq %d got %d", expectLastSeq, lastSeq)
}
if upToSeq != expectUpToSeq {
t.Fatalf("expected upToSeq %d got %d", expectUpToSeq, upToSeq)
}
}

cases := []struct {
name string
request api.JSApiMsgGetRequest
matched, numPending, lastSeq, upToSeq uint64
}{
{"Batch", api.JSApiMsgGetRequest{Batch: 100, Seq: 1}, 100, 0, 100, 0},
{"Multi Batch", api.JSApiMsgGetRequest{Batch: 100, Seq: 1, MultiLastFor: []string{">"}}, 5, 0, 1000, 1000},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
check(t, tc.request, tc.matched, tc.numPending, tc.lastSeq, tc.upToSeq)
})
}
}

func TestStreamRepublish(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
Expand Down