Skip to content

Commit ce9c1be

Browse files
committed
[receiver/googlecloudpubsub] Turn noisy warn in reset metric (open-telemetry#37571)
1 parent f399f0f commit ce9c1be

13 files changed

+394
-47
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
change_type: enhancement
2+
3+
component: googlecloudpubsubreceiver
4+
5+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
6+
note: Turn noisy `warn` log about Pub/Sub servers into `debug`, and turn the reset count into a metric
7+
8+
issues: [37571]
9+
10+
subtext: |
11+
The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps an open connection. The Pub/Sub servers
12+
recurrently close the connection after a time period to avoid a long-running sticky connection. Previously, the
13+
receiver logged `warn` log lines every time this happened. These log lines are moved to debug so that fleets with
14+
lots of collectors with the receiver don't spam logs at warn level. To keep track of the resets, whenever a
15+
connection reset happens, the `otelcol_receiver_googlecloudpubsub_stream_restarts` metric is incremented by one.
16+
17+
change_logs: [user]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)
2+
3+
# googlecloudpubsub
4+
5+
## Internal Telemetry
6+
7+
The following telemetry is emitted by this component.
8+
9+
### otelcol_receiver.googlecloudpubsub.stream_restarts
10+
11+
Number of times the stream (re)starts because the Pub/Sub servers closed the connection
12+
13+
The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps an open connection. The Pub/Sub servers
14+
recurrently close the connection after a time period to avoid a long-running sticky connection. This metric
15+
counts the number of resets that occurred during the lifetime of the container.
16+
17+
18+
| Unit | Metric Type | Value Type | Monotonic |
19+
| ---- | ----------- | ---------- | --------- |
20+
| 1 | Sum | Int | true |

receiver/googlecloudpubsubreceiver/factory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,24 @@ func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config {
4141
return &Config{}
4242
}
4343

44-
func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.Settings, config component.Config) (*pubsubReceiver, error) {
44+
func (factory *pubsubReceiverFactory) ensureReceiver(settings receiver.Settings, config component.Config) (*pubsubReceiver, error) {
4545
receiver := factory.receivers[config.(*Config)]
4646
if receiver != nil {
4747
return receiver, nil
4848
}
4949
rconfig := config.(*Config)
5050
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
51-
ReceiverID: params.ID,
51+
ReceiverID: settings.ID,
5252
Transport: reportTransport,
53-
ReceiverCreateSettings: params,
53+
ReceiverCreateSettings: settings,
5454
})
5555
if err != nil {
5656
return nil, err
5757
}
5858
receiver = &pubsubReceiver{
59-
logger: params.Logger,
59+
settings: settings,
6060
obsrecv: obsrecv,
61-
userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version),
61+
userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", settings.BuildInfo.Version),
6262
config: rconfig,
6363
}
6464
factory.receivers[config.(*Config)] = receiver

receiver/googlecloudpubsubreceiver/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ require (
2020
go.opentelemetry.io/collector/pdata v1.25.0
2121
go.opentelemetry.io/collector/receiver v0.119.0
2222
go.opentelemetry.io/collector/receiver/receivertest v0.119.0
23+
go.opentelemetry.io/otel v1.34.0
24+
go.opentelemetry.io/otel/metric v1.34.0
25+
go.opentelemetry.io/otel/sdk/metric v1.34.0
26+
go.opentelemetry.io/otel/trace v1.34.0
2327
go.uber.org/goleak v1.3.0
2428
go.uber.org/multierr v1.11.0
2529
go.uber.org/zap v1.27.0
@@ -72,11 +76,7 @@ require (
7276
go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect
7377
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
7478
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
75-
go.opentelemetry.io/otel v1.34.0 // indirect
76-
go.opentelemetry.io/otel/metric v1.34.0 // indirect
7779
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
78-
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
79-
go.opentelemetry.io/otel/trace v1.34.0 // indirect
8080
golang.org/x/crypto v0.32.0 // indirect
8181
golang.org/x/net v0.34.0 // indirect
8282
golang.org/x/oauth2 v0.25.0 // indirect

receiver/googlecloudpubsubreceiver/internal/handler.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@ import (
1313
"time"
1414

1515
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
16+
"go.opentelemetry.io/collector/receiver"
17+
"go.opentelemetry.io/otel/attribute"
18+
"go.opentelemetry.io/otel/metric"
1619
"go.uber.org/zap"
1720
"google.golang.org/grpc/codes"
1821
"google.golang.org/grpc/status"
22+
23+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
1924
)
2025

2126
// Time to wait before restarting, when the stream stopped
@@ -36,7 +41,8 @@ type StreamHandler struct {
3641
streamWaitGroup sync.WaitGroup
3742
// wait group for the handler
3843
handlerWaitGroup sync.WaitGroup
39-
logger *zap.Logger
44+
settings receiver.Settings
45+
telemetryBuilder *metadata.TelemetryBuilder
4046
// time that acknowledge loop waits before acknowledging messages
4147
ackBatchWait time.Duration
4248

@@ -51,19 +57,21 @@ func (handler *StreamHandler) ack(ackID string) {
5157

5258
func NewHandler(
5359
ctx context.Context,
54-
logger *zap.Logger,
60+
settings receiver.Settings,
61+
telemetryBuilder *metadata.TelemetryBuilder,
5562
client SubscriberClient,
5663
clientID string,
5764
subscription string,
5865
callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error,
5966
) (*StreamHandler, error) {
6067
handler := StreamHandler{
61-
logger: logger,
62-
client: client,
63-
clientID: clientID,
64-
subscription: subscription,
65-
pushMessage: callback,
66-
ackBatchWait: 10 * time.Second,
68+
settings: settings,
69+
telemetryBuilder: telemetryBuilder,
70+
client: client,
71+
clientID: clientID,
72+
subscription: subscription,
73+
pushMessage: callback,
74+
ackBatchWait: 10 * time.Second,
6775
}
6876
return &handler, handler.initStream(ctx)
6977
}
@@ -85,6 +93,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error {
8593
_ = handler.stream.CloseSend()
8694
return err
8795
}
96+
handler.telemetryBuilder.ReceiverGooglecloudpubsubStreamRestarts.Add(ctx, 1,
97+
metric.WithAttributes(
98+
attribute.String("otelcol.component.kind", "receiver"),
99+
attribute.String("otelcol.component.id", handler.settings.ID.String()),
100+
))
88101
return nil
89102
}
90103

@@ -102,7 +115,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
102115
var loopCtx context.Context
103116
loopCtx, cancel := context.WithCancel(ctx)
104117

105-
handler.logger.Info("Starting Streaming Pull")
118+
handler.settings.Logger.Debug("Starting Streaming Pull")
106119
handler.streamWaitGroup.Add(2)
107120
go handler.requestStream(loopCtx, cancel)
108121
go handler.responseStream(loopCtx, cancel)
@@ -117,13 +130,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
117130
if handler.isRunning.Load() {
118131
err := handler.initStream(ctx)
119132
if err != nil {
120-
handler.logger.Error("Failed to recovery stream.")
133+
handler.settings.Logger.Error("Failed to recovery stream.")
121134
}
122135
}
123-
handler.logger.Warn("End of recovery loop, restarting.")
136+
handler.settings.Logger.Debug("End of recovery loop, restarting.")
124137
time.Sleep(streamRecoveryBackoffPeriod)
125138
}
126-
handler.logger.Warn("Shutting down recovery loop.")
139+
handler.settings.Logger.Warn("Shutting down recovery loop.")
127140
handler.handlerWaitGroup.Done()
128141
}
129142

@@ -157,15 +170,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.
157170
for {
158171
if err := handler.acknowledgeMessages(); err != nil {
159172
if errors.Is(err, io.EOF) {
160-
handler.logger.Warn("EOF reached")
173+
handler.settings.Logger.Warn("EOF reached")
161174
break
162175
}
163-
handler.logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err))
176+
handler.settings.Logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err))
164177
break
165178
}
166179
select {
167180
case <-ctx.Done():
168-
handler.logger.Warn("requestStream <-ctx.Done()")
181+
handler.settings.Logger.Debug("requestStream <-ctx.Done()")
169182
case <-timer.C:
170183
timer.Reset(handler.ackBatchWait)
171184
}
@@ -176,7 +189,7 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.
176189
}
177190
}
178191
cancel()
179-
handler.logger.Warn("Request Stream loop ended.")
192+
handler.settings.Logger.Debug("Request Stream loop ended.")
180193
_ = handler.stream.CloseSend()
181194
handler.streamWaitGroup.Done()
182195
}
@@ -202,30 +215,30 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context
202215
case errors.Is(err, io.EOF):
203216
activeStreaming = false
204217
case !grpcStatus:
205-
handler.logger.Warn("response stream breaking on error",
218+
handler.settings.Logger.Warn("response stream breaking on error",
206219
zap.Error(err))
207220
activeStreaming = false
208221
case s.Code() == codes.Unavailable:
209-
handler.logger.Info("response stream breaking on gRPC s 'Unavailable'")
222+
handler.settings.Logger.Debug("response stream breaking on gRPC s 'Unavailable'")
210223
activeStreaming = false
211224
case s.Code() == codes.NotFound:
212-
handler.logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream")
225+
handler.settings.Logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream")
213226
time.Sleep(time.Second * 60)
214227
activeStreaming = false
215228
default:
216-
handler.logger.Warn("response stream breaking on gRPC s "+s.Message(),
229+
handler.settings.Logger.Warn("response stream breaking on gRPC s "+s.Message(),
217230
zap.String("s", s.Message()),
218231
zap.Error(err))
219232
activeStreaming = false
220233
}
221234
}
222235
if errors.Is(ctx.Err(), context.Canceled) {
223236
// Canceling the loop, collector is probably stopping
224-
handler.logger.Warn("response stream ctx.Err() == context.Canceled")
237+
handler.settings.Logger.Warn("response stream ctx.Err() == context.Canceled")
225238
break
226239
}
227240
}
228241
cancel()
229-
handler.logger.Warn("Response Stream loop ended.")
242+
handler.settings.Logger.Debug("Response Stream loop ended.")
230243
handler.streamWaitGroup.Done()
231244
}

receiver/googlecloudpubsubreceiver/internal/handler_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
1313
"cloud.google.com/go/pubsub/pstest"
1414
"github.com/stretchr/testify/assert"
15-
"go.uber.org/zap/zaptest"
15+
"go.opentelemetry.io/collector/receiver/receivertest"
1616
"google.golang.org/api/option"
1717
"google.golang.org/grpc"
1818
"google.golang.org/grpc/credentials/insecure"
19+
20+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
1921
)
2022

2123
func TestCancelStream(t *testing.T) {
@@ -41,10 +43,13 @@ func TestCancelStream(t *testing.T) {
4143
})
4244
assert.NoError(t, err)
4345

46+
settings := receivertest.NewNopSettings()
47+
telemetryBuilder, _ := metadata.NewTelemetryBuilder(settings.TelemetrySettings)
48+
4449
client, err := pubsub.NewSubscriberClient(ctx, copts...)
4550
assert.NoError(t, err)
4651

47-
handler, err := NewHandler(ctx, zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp",
52+
handler, err := NewHandler(ctx, settings, telemetryBuilder, client, "client-id", "projects/my-project/subscriptions/otlp",
4853
func(context.Context, *pubsubpb.ReceivedMessage) error {
4954
return nil
5055
})

receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)