Skip to content

Commit 9290bb2

Browse files
authored
fix: shutdown workers on SIGTERM or SIGINT (#92)
1 parent 73bcbce commit 9290bb2

File tree

13 files changed

+225
-211
lines changed

13 files changed

+225
-211
lines changed

.github/workflows/test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ jobs:
1717
- name: Setup Go
1818
uses: actions/setup-go@v2
1919
with:
20-
go-version: 1.21
20+
go-version: 1.23
2121
- name: Check License Headers
2222
run: make license-header-check
2323
- name: Run Tests

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,5 +220,4 @@ require (
220220
go-simpler.org/musttag v0.12.2 // indirect
221221
go-simpler.org/sloglint v0.7.2 // indirect
222222
go.uber.org/automaxprocs v1.5.3 // indirect
223-
go.uber.org/goleak v1.3.0 // indirect
224223
)

go.sum

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4
147147
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
148148
github.com/firefart/nonamedreturns v1.0.5 h1:tM+Me2ZaXs8tfdDw3X6DOX++wMCOqzYUho6tUTYIdRA=
149149
github.com/firefart/nonamedreturns v1.0.5/go.mod h1:gHJjDqhGM4WyPt639SOZs+G89Ko7QKH5R5BhnO6xJhw=
150-
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
151-
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
150+
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
151+
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
152152
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
153153
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
154154
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
@@ -424,8 +424,6 @@ github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhK
424424
github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs=
425425
github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk=
426426
github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c=
427-
github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba h1:ZIPl9waqhbqw3xB2+zpUI2T1kEHyMkOnZZMt6tdrEUM=
428-
github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba/go.mod h1:4LQH9hea9rX+0A+8G47NRk5nZuXCDqiwci1aZsHAkU8=
429427
github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b h1:ImQFk66gRM3v9A6qmPImOiV3HJMDAX93X5rplMKn6ok=
430428
github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b/go.mod h1:9bQnYPqLzq8CcPk5MHT3phg19CWJhDlFOfdIv27lwwM=
431429
github.com/nitrictech/protoutils v0.0.0-20220321044654-02667a814cdf h1:8MB8W8ylM8sCM2COGfiO39/tB6BTdiawLszaUGCNL5w=
@@ -481,8 +479,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
481479
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
482480
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
483481
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
484-
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
485-
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
482+
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
486483
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
487484
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
488485
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
@@ -643,8 +640,7 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
643640
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
644641
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
645642
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
646-
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
647-
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
643+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
648644
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
649645
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
650646
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=

licenses.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package main
2020
import (
2121
_ "github.com/nitrictech/go-sdk/nitric"
2222
_ "github.com/nitrictech/go-sdk/nitric/apis"
23+
_ "github.com/nitrictech/go-sdk/nitric/batch"
2324
_ "github.com/nitrictech/go-sdk/nitric/errors"
2425
_ "github.com/nitrictech/go-sdk/nitric/keyvalue"
2526
_ "github.com/nitrictech/go-sdk/nitric/queues"

nitric/apis/workers.go renamed to nitric/apis/worker.go

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package apis
1717
import (
1818
"context"
1919
errorsstd "errors"
20-
"io"
2120

2221
grpcx "github.com/nitrictech/go-sdk/internal/grpc"
2322
"github.com/nitrictech/go-sdk/internal/handlers"
@@ -40,54 +39,48 @@ type apiWorkerOpts struct {
4039

4140
var _ workers.StreamWorker = (*apiWorker)(nil)
4241

43-
// Start implements Worker.
42+
// Start runs the API worker, creating a stream to the Nitric server
4443
func (a *apiWorker) Start(ctx context.Context) error {
4544
initReq := &v1.ClientMessage{
4645
Content: &v1.ClientMessage_RegistrationRequest{
4746
RegistrationRequest: a.registrationRequest,
4847
},
4948
}
5049

51-
stream, err := a.client.Serve(ctx)
52-
if err != nil {
53-
return err
54-
}
55-
56-
err = stream.Send(initReq)
57-
if err != nil {
58-
return err
50+
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
51+
return a.client.Serve(ctx)
5952
}
6053

61-
for {
62-
var ctx *Ctx
63-
64-
resp, err := stream.Recv()
65-
66-
if errorsstd.Is(err, io.EOF) {
67-
err = stream.CloseSend()
68-
if err != nil {
69-
return err
70-
}
54+
handlerSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
55+
if msg.GetRegistrationResponse() != nil {
56+
// No need to respond to the registration response
57+
return nil, nil
58+
}
7159

72-
return nil
73-
} else if err == nil && resp.GetRegistrationResponse() != nil {
74-
// There is no need to respond to the registration response
75-
} else if err == nil && resp.GetHttpRequest() != nil {
76-
ctx = NewCtx(resp)
60+
if msg.GetHttpRequest() != nil {
61+
handlerCtx := NewCtx(msg)
7762

78-
err = a.Handler(ctx)
63+
err := a.Handler(handlerCtx)
7964
if err != nil {
80-
ctx.WithError(err)
65+
handlerCtx.WithError(err)
8166
}
8267

83-
err = stream.Send(ctx.ToClientMessage())
84-
if err != nil {
85-
return err
86-
}
87-
} else {
88-
return err
68+
return handlerCtx.ToClientMessage(), nil
8969
}
70+
71+
return nil, errors.NewWithCause(
72+
codes.Internal,
73+
"ApiWorker: Unhandled server message",
74+
errorsstd.New("unhandled server message"),
75+
)
9076
}
77+
78+
return workers.HandleStream(
79+
ctx,
80+
createStream,
81+
initReq,
82+
handlerSrvMsg,
83+
)
9184
}
9285

9386
func newApiWorker(opts *apiWorkerOpts) *apiWorker {

nitric/batch/batch_workers.go renamed to nitric/batch/worker.go

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package batch
1616

1717
import (
1818
"context"
19-
"io"
2019

2120
"google.golang.org/grpc"
2221

@@ -25,6 +24,7 @@ import (
2524
"github.com/nitrictech/go-sdk/constants"
2625
"github.com/nitrictech/go-sdk/nitric/errors"
2726
"github.com/nitrictech/go-sdk/nitric/errors/codes"
27+
"github.com/nitrictech/go-sdk/nitric/workers"
2828
v1 "github.com/nitrictech/nitric/core/pkg/proto/batch/v1"
2929
)
3030

@@ -38,53 +38,38 @@ type jobWorkerOpts struct {
3838
Handler Handler
3939
}
4040

41-
// Start implements Worker.
41+
// Start runs the Job worker, creating a stream to the Nitric server
4242
func (s *jobWorker) Start(ctx context.Context) error {
4343
initReq := &v1.ClientMessage{
4444
Content: &v1.ClientMessage_RegistrationRequest{
4545
RegistrationRequest: s.registrationRequest,
4646
},
4747
}
4848

49-
// Create the request stream and send the initial request
50-
stream, err := s.client.HandleJob(ctx)
51-
if err != nil {
52-
return err
53-
}
54-
55-
err = stream.Send(initReq)
56-
if err != nil {
57-
return err
49+
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
50+
return s.client.HandleJob(ctx)
5851
}
59-
for {
60-
var ctx *Ctx
6152

62-
resp, err := stream.Recv()
53+
handleSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
54+
if msg.GetJobRequest() != nil {
55+
handlerCtx := NewCtx(msg)
6356

64-
if errorsstd.Is(err, io.EOF) {
65-
err = stream.CloseSend()
57+
err := s.handler(handlerCtx)
6658
if err != nil {
67-
return err
59+
handlerCtx.WithError(err)
6860
}
6961

70-
return nil
71-
} else if err == nil && resp.GetRegistrationResponse() != nil {
72-
// Do nothing
73-
} else if err == nil && resp.GetJobRequest() != nil {
74-
ctx = NewCtx(resp)
75-
err = s.handler(ctx)
76-
if err != nil {
77-
ctx.WithError(err)
78-
}
79-
80-
err = stream.Send(ctx.ToClientMessage())
81-
if err != nil {
82-
return err
83-
}
84-
} else {
85-
return err
62+
return handlerCtx.ToClientMessage(), nil
8663
}
64+
65+
return nil, errors.NewWithCause(
66+
codes.Internal,
67+
"JobWorker: Unhandled server message",
68+
errorsstd.New("unhandled server message"),
69+
)
8770
}
71+
72+
return workers.HandleStream(ctx, createStream, initReq, handleSrvMsg)
8873
}
8974

9075
func newJobWorker(opts *jobWorkerOpts) *jobWorker {

nitric/nitric.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
package nitric
1616

1717
import (
18+
"context"
19+
"fmt"
20+
"os"
21+
"os/signal"
22+
"syscall"
23+
1824
"github.com/nitrictech/go-sdk/nitric/apis"
1925
"github.com/nitrictech/go-sdk/nitric/batch"
2026
"github.com/nitrictech/go-sdk/nitric/keyvalue"
@@ -42,7 +48,19 @@ var (
4248
)
4349

4450
func Run() {
45-
err := workers.GetDefaultManager().Run()
51+
ctx, cancel := context.WithCancel(context.Background())
52+
defer cancel()
53+
54+
sigChan := make(chan os.Signal, 1)
55+
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
56+
57+
go func() {
58+
<-sigChan
59+
fmt.Printf("Received signal, shutting down...\n")
60+
cancel()
61+
}()
62+
63+
err := workers.GetDefaultManager().Run(ctx)
4664
if err != nil {
4765
panic(err)
4866
}

nitric/schedules/schedule_workers.go renamed to nitric/schedules/worker.go

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ package schedules
1717
import (
1818
"context"
1919
errorsstd "errors"
20-
"io"
2120

2221
grpcx "github.com/nitrictech/go-sdk/internal/grpc"
2322
"github.com/nitrictech/go-sdk/internal/handlers"
2423
"github.com/nitrictech/go-sdk/nitric/errors"
2524
"github.com/nitrictech/go-sdk/nitric/errors/codes"
25+
"github.com/nitrictech/go-sdk/nitric/workers"
2626
v1 "github.com/nitrictech/nitric/core/pkg/proto/schedules/v1"
2727
)
2828

@@ -36,53 +36,42 @@ type scheduleWorkerOpts struct {
3636
Handler handlers.Handler[Ctx]
3737
}
3838

39-
// Start implements Worker.
39+
// Start runs the Schedule worker, creating a stream to the Nitric server
4040
func (i *scheduleWorker) Start(ctx context.Context) error {
4141
initReq := &v1.ClientMessage{
4242
Content: &v1.ClientMessage_RegistrationRequest{
4343
RegistrationRequest: i.registrationRequest,
4444
},
4545
}
4646

47-
// Create the request stream and send the initial request
48-
stream, err := i.client.Schedule(ctx)
49-
if err != nil {
50-
return err
47+
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
48+
return i.client.Schedule(ctx)
5149
}
5250

53-
err = stream.Send(initReq)
54-
if err != nil {
55-
return err
56-
}
57-
for {
58-
var ctx *Ctx
59-
60-
resp, err := stream.Recv()
61-
62-
if errorsstd.Is(err, io.EOF) {
63-
err = stream.CloseSend()
64-
if err != nil {
65-
return err
66-
}
51+
handlerSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
52+
if msg.GetIntervalRequest() != nil {
53+
handlerCtx := NewCtx(msg)
6754

68-
return nil
69-
} else if err == nil && resp.GetRegistrationResponse() != nil {
70-
// There is no need to respond to the registration response
71-
} else if err == nil && resp.GetIntervalRequest() != nil {
72-
ctx = NewCtx(resp)
73-
err = i.handler(ctx)
55+
err := i.handler(handlerCtx)
7456
if err != nil {
75-
ctx.WithError(err)
57+
handlerCtx.WithError(err)
7658
}
7759

78-
err = stream.Send(ctx.ToClientMessage())
79-
if err != nil {
80-
return err
81-
}
82-
} else {
83-
return err
60+
return handlerCtx.ToClientMessage(), nil
8461
}
62+
return nil, errors.NewWithCause(
63+
codes.Internal,
64+
"ScheduleWorker: Unhandled server message",
65+
errorsstd.New("unhandled server message"),
66+
)
8567
}
68+
69+
return workers.HandleStream(
70+
ctx,
71+
createStream,
72+
initReq,
73+
handlerSrvMsg,
74+
)
8675
}
8776

8877
func newScheduleWorker(opts *scheduleWorkerOpts) *scheduleWorker {

0 commit comments

Comments
 (0)