Skip to content

Commit f2908f5

Browse files
committed
feat: Add health checks on integrations and service
Signed-off-by: Loïc Saint-Roch <[email protected]>
1 parent ca47745 commit f2908f5

File tree

10 files changed

+173
-25
lines changed

10 files changed

+173
-25
lines changed

integration/clickhouse/integration.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,22 @@ func (conn *connection) Close(ctx context.Context) error {
4444

4545
return nil
4646
}
47+
48+
/*
49+
Status indicates if the integration is able to ping the ClickHouse server or not.
50+
Returns `200` if connection is working, `503` otherwise.
51+
*/
52+
func (conn *connection) Status(ctx context.Context) (int, error) {
53+
stack := errorstack.New("Integration is not in a healthy state", errorstack.WithIntegration(identifier))
54+
55+
err := conn.client.Ping(ctx)
56+
if err != nil {
57+
stack.WithValidations(errorstack.Validation{
58+
Message: err.Error(),
59+
})
60+
61+
return 503, stack
62+
}
63+
64+
return 200, nil
65+
}

integration/integration.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,10 @@ type Integration interface {
2626

2727
// Close closes the connection with the integration, if applicable.
2828
Close(ctx context.Context) error
29+
30+
// Status executes a health check of the integration. It returns an equivalent
31+
// HTTP status code of the health. It should most likely be `200` or `503`.
32+
// If the integration is unhealthy, it may return an error as well depending
33+
// on the underlying client.
34+
Status(ctx context.Context) (int, error)
2935
}

integration/nats/integration.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,16 @@ func (conn *connection) Close(ctx context.Context) error {
3434

3535
return nil
3636
}
37+
38+
/*
39+
Status indicates if the integration is able to connect to the NATS server or not.
40+
Returns `200` if connection is in a proper state, `503` otherwise.
41+
*/
42+
func (conn *connection) Status(ctx context.Context) (int, error) {
43+
var status int = 503
44+
if conn.nats.Status().String() == "CONNECTED" {
45+
status = 200
46+
}
47+
48+
return status, nil
49+
}

integration/postgres/integration.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,22 @@ func (conn *connection) Close(ctx context.Context) error {
4444

4545
return nil
4646
}
47+
48+
/*
49+
Status indicates if the integration is able to ping the PostgreSQL server or not.
50+
Returns `200` if connection is working, `503` otherwise.
51+
*/
52+
func (conn *connection) Status(ctx context.Context) (int, error) {
53+
stack := errorstack.New("Integration is not in a healthy state", errorstack.WithIntegration(identifier))
54+
55+
err := conn.client.Ping(ctx)
56+
if err != nil {
57+
stack.WithValidations(errorstack.Validation{
58+
Message: err.Error(),
59+
})
60+
61+
return 503, stack
62+
}
63+
64+
return 200, nil
65+
}

integration/rest/integration.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,10 @@ func (r *rest) Close(ctx context.Context) error {
8484

8585
return nil
8686
}
87+
88+
/*
89+
Status always returns a `200` status.
90+
*/
91+
func (r *rest) Status(ctx context.Context) (int, error) {
92+
return 200, nil
93+
}

integration/rest/response.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/http"
77

88
"go.nunchi.studio/helix/errorstack"
9+
"go.nunchi.studio/helix/service"
910

1011
"github.com/uptrace/bunrouter"
1112
)
@@ -61,6 +62,8 @@ func (r *rest) handlerHealthcheck(rw http.ResponseWriter, req bunrouter.Request)
6162
var status int = http.StatusOK
6263
if r.config.Healthcheck != nil {
6364
status = r.config.Healthcheck(req.Request)
65+
} else {
66+
status, _ = service.Status(req.Context())
6467
}
6568

6669
res := &Response{

integration/temporal/client.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ type Client interface {
4747
DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enums.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)
4848
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)
4949

50-
CheckHealth(ctx context.Context, request *client.CheckHealthRequest) (*client.CheckHealthResponse, error)
51-
5250
ScheduleClient() ScheduleClient
5351
}
5452

@@ -543,29 +541,6 @@ func (c *iclient) ResetWorkflowExecution(ctx context.Context, request *workflows
543541
return res, err
544542
}
545543

546-
/*
547-
CheckHealth performs a server health check using the gRPC health check API. If the
548-
check fails, an error is returned.
549-
550-
It automatically handles tracing and error recording.
551-
*/
552-
func (c *iclient) CheckHealth(ctx context.Context, request *client.CheckHealthRequest) (*client.CheckHealthResponse, error) {
553-
ctx, span := trace.Start(ctx, trace.SpanKindClient, fmt.Sprintf("%s: CheckHealth", humanized))
554-
defer span.End()
555-
556-
var err error
557-
defer func() {
558-
if err != nil {
559-
span.RecordError("failed to check health", err)
560-
}
561-
}()
562-
563-
res, err := c.client.CheckHealth(ctx, request)
564-
setDefaultAttributes(span, c.config)
565-
566-
return res, err
567-
}
568-
569544
/*
570545
ScheduleClient creates a new schedule client with the same gRPC connection as this
571546
client.

integration/temporal/integration.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"go.nunchi.studio/helix/errorstack"
77
"go.nunchi.studio/helix/integration"
88

9+
"go.temporal.io/sdk/client"
910
"go.temporal.io/sdk/worker"
1011
)
1112

@@ -53,3 +54,22 @@ func (conn *connection) Close(ctx context.Context) error {
5354
conn.client.Close()
5455
return nil
5556
}
57+
58+
/*
59+
Status indicates if the integration is able to connect to the Temporal server or
60+
not. Returns `200` if connection is working, `503` otherwise.
61+
*/
62+
func (conn *connection) Status(ctx context.Context) (int, error) {
63+
stack := errorstack.New("Integration is not in a healthy state", errorstack.WithIntegration(identifier))
64+
65+
_, err := conn.client.CheckHealth(ctx, &client.CheckHealthRequest{})
66+
if err != nil {
67+
stack.WithValidations(errorstack.Validation{
68+
Message: err.Error(),
69+
})
70+
71+
return 503, stack
72+
}
73+
74+
return 200, nil
75+
}

integration/vault/integration.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package vault
33
import (
44
"context"
55

6+
"go.nunchi.studio/helix/errorstack"
67
"go.nunchi.studio/helix/integration"
78
)
89

@@ -32,3 +33,27 @@ Close does nothing since the underlying Vault client doesn't need to be closed.
3233
func (conn *connection) Close(ctx context.Context) error {
3334
return nil
3435
}
36+
37+
/*
38+
Status indicates if the integration is able to connect to the Vault server or not.
39+
Returns `200` if connection is working, `503` if Vault is sealed, is not
40+
initialized, or if an error occured.
41+
*/
42+
func (conn *connection) Status(ctx context.Context) (int, error) {
43+
stack := errorstack.New("Integration is not in a healthy state", errorstack.WithIntegration(identifier))
44+
45+
res, err := conn.client.Sys().Health()
46+
if err != nil {
47+
stack.WithValidations(errorstack.Validation{
48+
Message: err.Error(),
49+
})
50+
51+
return 503, stack
52+
}
53+
54+
if !res.Initialized || res.Sealed {
55+
return 503, nil
56+
}
57+
58+
return 200, nil
59+
}

service/integration.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package service
22

33
import (
4+
"context"
5+
"sync"
6+
47
"go.nunchi.studio/helix/errorstack"
58
"go.nunchi.studio/helix/integration"
69
)
@@ -52,3 +55,61 @@ func Attach(inte integration.Integration) error {
5255
svc.integrations = append(svc.integrations, inte)
5356
return nil
5457
}
58+
59+
/*
60+
Status executes a health check of each integration attached to the service, and
61+
returns the highest HTTP status code returned. This means if all integrations are
62+
healthy (status `200`) but one is temporarily unavailable (status `503`), the
63+
status returned would be `503`.
64+
*/
65+
func Status(ctx context.Context) (int, error) {
66+
67+
// Create a channel that will receive the HTTP status code of the health check
68+
// of each integration.
69+
chStatus := make(chan int, len(svc.integrations))
70+
chError := make(chan error, len(svc.integrations))
71+
72+
// Go through each integration attached to the service, and execute the health
73+
// checks asynchronously. Write the status returned to the channel.
74+
var wg sync.WaitGroup
75+
for _, inte := range svc.integrations {
76+
inte := inte
77+
wg.Add(1)
78+
79+
go func() {
80+
defer wg.Done()
81+
82+
status, err := inte.Status(ctx)
83+
if err != nil {
84+
chError <- err
85+
}
86+
87+
chStatus <- status
88+
}()
89+
}
90+
91+
wg.Wait()
92+
close(chStatus)
93+
close(chError)
94+
95+
// Define the highest status code returned, as it will be used as the main one
96+
// returned by this function.
97+
var max int = 200
98+
for status := range chStatus {
99+
if status > max {
100+
max = status
101+
}
102+
}
103+
104+
// Build a list of returned errors, and returned the error stack if applicable.
105+
stack := errorstack.New("Service is not in a healthy state")
106+
for err := range chError {
107+
stack.WithChildren(err)
108+
}
109+
110+
if stack.HasChildren() {
111+
return max, stack
112+
}
113+
114+
return max, nil
115+
}

0 commit comments

Comments
 (0)