Skip to content

Commit b675b9d

Browse files
alexvanboxelkevinnoel-be
authored andcommitted
[chore][receiver/googlecloudpubsubreceiver] Fix goroutines leak (open-telemetry#37311)
#### Description Fixes goroutines leak by properly closing the underlying gRPC client which is only when we're using an insecure custom endpoint. Enables shutdown tests. #### Link to tracking issue Related to open-telemetry#30438 Co-authored-by: Kevin N. <[email protected]>
1 parent 86eaad7 commit b675b9d

File tree

10 files changed

+304
-47
lines changed

10 files changed

+304
-47
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
change_type: bug_fix
2+
component: googlecloudpubsubreceiver
3+
note: Fix a goroutine leak during shutdown.
4+
issues: [30438]
5+
subtext: |
6+
A goroutine leak was found in the googlecloudpubsubreceiver.
7+
The goroutine leak was caused by the receiver not closing the underlying created gRPC client when using an insecure custom endpoint.
8+
change_logs: []

receiver/googlecloudpubsubreceiver/generated_component_test.go

Lines changed: 53 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/googlecloudpubsubreceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
cloud.google.com/go/logging v1.13.0
77
cloud.google.com/go/pubsub v1.45.3
88
github.com/google/go-cmp v0.6.0
9+
github.com/googleapis/gax-go/v2 v2.14.1
910
github.com/iancoleman/strcase v0.3.0
1011
github.com/json-iterator/go v1.1.12
1112
github.com/stretchr/testify v1.10.0
@@ -46,7 +47,6 @@ require (
4647
github.com/google/s2a-go v0.1.8 // indirect
4748
github.com/google/uuid v1.6.0 // indirect
4849
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
49-
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
5050
github.com/hashicorp/go-version v1.7.0 // indirect
5151
github.com/knadh/koanf/maps v0.1.1 // indirect
5252
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect

receiver/googlecloudpubsubreceiver/internal/handler.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"sync/atomic"
1313
"time"
1414

15-
pubsub "cloud.google.com/go/pubsub/apiv1"
1615
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
1716
"go.uber.org/zap"
1817
"google.golang.org/grpc/codes"
@@ -27,7 +26,7 @@ type StreamHandler struct {
2726
pushMessage func(ctx context.Context, message *pubsubpb.ReceivedMessage) error
2827
acks []string
2928
mutex sync.Mutex
30-
client *pubsub.SubscriberClient
29+
client SubscriberClient
3130

3231
clientID string
3332
subscription string
@@ -53,7 +52,7 @@ func (handler *StreamHandler) ack(ackID string) {
5352
func NewHandler(
5453
ctx context.Context,
5554
logger *zap.Logger,
56-
client *pubsub.SubscriberClient,
55+
client SubscriberClient,
5756
clientID string,
5857
subscription string,
5958
callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error,
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
5+
6+
import (
7+
"context"
8+
9+
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
10+
"github.com/googleapis/gax-go/v2"
11+
)
12+
13+
// subscriberClient subset of `pubsub.SubscriberClient`
14+
type SubscriberClient interface {
15+
Close() error
16+
StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error)
17+
}

receiver/googlecloudpubsubreceiver/metadata.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ tests:
1515
timeout: 20s
1616
subscription: projects/my-project/subscriptions/otlp-subscription
1717
skip_lifecycle: true
18-
skip_shutdown: true
1918
goleak:
20-
skip: false
2119
ignore:
2220
# See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
2321
top: go.opencensus.io/stats/view.(*worker).start

receiver/googlecloudpubsubreceiver/receiver.go

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"sync"
1515
"time"
1616

17-
pubsub "cloud.google.com/go/pubsub/apiv1"
1817
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
1918
"go.opentelemetry.io/collector/component"
2019
"go.opentelemetry.io/collector/consumer"
@@ -24,11 +23,6 @@ import (
2423
"go.opentelemetry.io/collector/pdata/ptrace"
2524
"go.opentelemetry.io/collector/receiver/receiverhelper"
2625
"go.uber.org/zap"
27-
"google.golang.org/api/option"
28-
"google.golang.org/grpc"
29-
"google.golang.org/grpc/codes"
30-
"google.golang.org/grpc/credentials/insecure"
31-
"google.golang.org/grpc/status"
3226

3327
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
3428
)
@@ -42,7 +36,7 @@ type pubsubReceiver struct {
4236
logsConsumer consumer.Logs
4337
userAgent string
4438
config *Config
45-
client *pubsub.SubscriberClient
39+
client internal.SubscriberClient
4640
tracesUnmarshaler ptrace.Unmarshaler
4741
metricsUnmarshaler pmetric.Unmarshaler
4842
logsUnmarshaler plog.Unmarshaler
@@ -68,34 +62,14 @@ const (
6862
gZip = iota
6963
)
7064

71-
func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) {
72-
if receiver.userAgent != "" {
73-
copts = append(copts, option.WithUserAgent(receiver.userAgent))
74-
}
75-
if receiver.config.Endpoint != "" {
76-
if receiver.config.Insecure {
77-
var dialOpts []grpc.DialOption
78-
if receiver.userAgent != "" {
79-
dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent))
80-
}
81-
conn, _ := grpc.NewClient(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
82-
copts = append(copts, option.WithGRPCConn(conn))
83-
} else {
84-
copts = append(copts, option.WithEndpoint(receiver.config.Endpoint))
85-
}
86-
}
87-
return copts
88-
}
89-
9065
func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error {
9166
if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil {
9267
return errors.New("cannot start receiver: no consumers were specified")
9368
}
9469

9570
var startErr error
9671
receiver.startOnce.Do(func() {
97-
copts := receiver.generateClientOptions()
98-
client, err := pubsub.NewSubscriberClient(ctx, copts...)
72+
client, err := newSubscriberClient(ctx, receiver.config, receiver.userAgent)
9973
if err != nil {
10074
startErr = fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
10175
return
@@ -115,21 +89,18 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err
11589
}
11690

11791
func (receiver *pubsubReceiver) Shutdown(_ context.Context) error {
118-
var err error
119-
if receiver.client != nil {
120-
// A canceled code means the client connection is already closed,
121-
// Shutdown shouldn't return an error in that case.
122-
if closeErr := receiver.client.Close(); status.Code(closeErr) != codes.Canceled {
123-
err = closeErr
124-
}
92+
if receiver.handler != nil {
93+
receiver.logger.Info("Stopping Google Pubsub receiver")
94+
receiver.handler.CancelNow()
95+
receiver.logger.Info("Stopped Google Pubsub receiver")
96+
receiver.handler = nil
12597
}
126-
if receiver.handler == nil {
127-
return err
98+
if receiver.client == nil {
99+
return nil
128100
}
129-
receiver.logger.Info("Stopping Google Pubsub receiver")
130-
receiver.handler.CancelNow()
131-
receiver.logger.Info("Stopped Google Pubsub receiver")
132-
return err
101+
client := receiver.client
102+
receiver.client = nil
103+
return client.Close()
133104
}
134105

135106
func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"type": "service_account",
3+
"private_key_id": "abc",
4+
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n",
5+
"client_email": "[email protected]",
6+
"client_id": "123-abc.apps.googleusercontent.com",
7+
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
8+
"token_uri": "http://localhost:8080/token"
9+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
pubsub "cloud.google.com/go/pubsub/apiv1"
11+
"google.golang.org/api/option"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/credentials/insecure"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
16+
)
17+
18+
// wrappedSubscriberClient allows to override the close function
19+
type wrappedSubscriberClient struct {
20+
internal.SubscriberClient
21+
closeFn func() error
22+
}
23+
24+
func (c *wrappedSubscriberClient) Close() error {
25+
if c.closeFn != nil {
26+
return c.closeFn()
27+
}
28+
return c.SubscriberClient.Close()
29+
}
30+
31+
func newSubscriberClient(ctx context.Context, config *Config, userAgent string) (internal.SubscriberClient, error) {
32+
clientOptions, closeFn, err := generateClientOptions(config, userAgent)
33+
if err != nil {
34+
return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err)
35+
}
36+
37+
client, err := pubsub.NewSubscriberClient(ctx, clientOptions...)
38+
if err != nil {
39+
return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err)
40+
}
41+
42+
if closeFn == nil {
43+
return client, nil
44+
}
45+
46+
return &wrappedSubscriberClient{
47+
SubscriberClient: client,
48+
closeFn: closeFn,
49+
}, nil
50+
}
51+
52+
func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) {
53+
var copts []option.ClientOption
54+
var closeFn func() error
55+
56+
if userAgent != "" {
57+
copts = append(copts, option.WithUserAgent(userAgent))
58+
}
59+
if config.Endpoint != "" {
60+
if config.Insecure {
61+
var dialOpts []grpc.DialOption
62+
if userAgent != "" {
63+
dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
64+
}
65+
client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
66+
if err != nil {
67+
return nil, nil, err
68+
}
69+
copts = append(copts, option.WithGRPCConn(client))
70+
closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines
71+
} else {
72+
copts = append(copts, option.WithEndpoint(config.Endpoint))
73+
}
74+
}
75+
return copts, closeFn, nil
76+
}

0 commit comments

Comments
 (0)