Skip to content

Commit fce2cfe

Browse files
authored
[extension/healthcheckv2] Add support for streaming Watch RPC to gRPC service (#34049)
**Description:** <Describe what has changed.> The PR is the fifth in a series to decompose #30673 into more manageable pieces for review. This PR builds on #34028 and completes the gRPC service by adding support for the streaming Watch RPC. For reference, the gRPC service is an implementation of [grpc_health_v1 service](https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto). **Link to tracking Issue:** #26661 **Testing:** Units / manual **Documentation:** Comments, etc.
1 parent 12d071d commit fce2cfe

File tree

4 files changed

+962
-1
lines changed

4 files changed

+962
-1
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'healthcheckv2extension'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for streaming Watch RPC to healthcheckv2 gRPC service.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [26661]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/healthcheckv2extension/internal/grpc/grpc.go

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ import (
1616
)
1717

1818
var (
19-
errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.")
19+
errNotFound = grpcstatus.Error(codes.NotFound, "Service not found.")
20+
errShuttingDown = grpcstatus.Error(codes.Canceled, "Server shutting down.")
21+
errStreamSend = grpcstatus.Error(codes.Canceled, "Error sending; stream terminated.")
22+
errStreamEnded = grpcstatus.Error(codes.Canceled, "Stream has ended.")
2023

2124
statusToServingStatusMap = map[component.Status]healthpb.HealthCheckResponse_ServingStatus{
2225
component.StatusNone: healthpb.HealthCheckResponse_NOT_SERVING,
@@ -44,6 +47,79 @@ func (s *Server) Check(
4447
}, nil
4548
}
4649

50+
func (s *Server) Watch(req *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error {
51+
sub, unsub := s.aggregator.Subscribe(status.Scope(req.Service), status.Concise)
52+
defer unsub()
53+
54+
var lastServingStatus healthpb.HealthCheckResponse_ServingStatus = -1
55+
var failureTimer *time.Timer
56+
failureCh := make(chan struct{})
57+
58+
for {
59+
select {
60+
case st, ok := <-sub:
61+
if !ok {
62+
return errShuttingDown
63+
}
64+
var sst healthpb.HealthCheckResponse_ServingStatus
65+
66+
switch {
67+
case st == nil:
68+
sst = healthpb.HealthCheckResponse_SERVICE_UNKNOWN
69+
case s.componentHealthConfig.IncludeRecoverable &&
70+
s.componentHealthConfig.RecoveryDuration > 0 &&
71+
st.Status() == component.StatusRecoverableError:
72+
if failureTimer == nil {
73+
failureTimer = time.AfterFunc(
74+
s.componentHealthConfig.RecoveryDuration,
75+
func() { failureCh <- struct{}{} },
76+
)
77+
}
78+
sst = lastServingStatus
79+
if lastServingStatus == -1 {
80+
sst = healthpb.HealthCheckResponse_SERVING
81+
}
82+
default:
83+
if failureTimer != nil {
84+
if !failureTimer.Stop() {
85+
<-failureTimer.C
86+
}
87+
failureTimer = nil
88+
}
89+
sst = s.toServingStatus(st.Event)
90+
}
91+
92+
if lastServingStatus == sst {
93+
continue
94+
}
95+
96+
lastServingStatus = sst
97+
98+
err := stream.Send(&healthpb.HealthCheckResponse{Status: sst})
99+
if err != nil {
100+
return errStreamSend
101+
}
102+
case <-failureCh:
103+
failureTimer.Stop()
104+
failureTimer = nil
105+
if lastServingStatus == healthpb.HealthCheckResponse_NOT_SERVING {
106+
continue
107+
}
108+
lastServingStatus = healthpb.HealthCheckResponse_NOT_SERVING
109+
err := stream.Send(
110+
&healthpb.HealthCheckResponse{
111+
Status: healthpb.HealthCheckResponse_NOT_SERVING,
112+
},
113+
)
114+
if err != nil {
115+
return errStreamSend
116+
}
117+
case <-stream.Context().Done():
118+
return errStreamEnded
119+
}
120+
}
121+
}
122+
47123
func (s *Server) toServingStatus(
48124
ev status.Event,
49125
) healthpb.HealthCheckResponse_ServingStatus {

0 commit comments

Comments
 (0)