Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configlimiter"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/extension/extensionauth"
"go.opentelemetry.io/collector/extension/xextension/limiter"
)

var errMetadataNotFound = errors.New("no request metadata found")
Expand Down Expand Up @@ -187,6 +189,11 @@ type ServerConfig struct {
// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Limiters are a collection of limiter extensions. Each
// Limitation names an extension that is expected to implement
// limiter.Extension. They are called in order.
Limiters []configlimiter.Limitation `mapreduce:"limiters"`
Comment on lines +192 to +195
Copy link
Member

@bogdandrutu bogdandrutu Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I can see a case with multiple limiters, but it is ok I believe. Maybe in the PR description add a case where you can see multiple limiters being used.


// Include propagates the incoming connection's metadata to downstream consumers.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
}
Expand Down Expand Up @@ -478,6 +485,8 @@ func (gss *ServerConfig) getGrpcServerOptions(
var uInterceptors []grpc.UnaryServerInterceptor
var sInterceptors []grpc.StreamServerInterceptor

// Initialize the auth extension first.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add in the comment if there is any specific reason.


if gss.Auth != nil {
authenticator, err := gss.Auth.GetServerAuthenticator(context.Background(), host.GetExtensions())
if err != nil {
Expand All @@ -492,18 +501,46 @@ func (gss *ServerConfig) getGrpcServerOptions(
})
}

// Apply client metadata.

uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata))

// Apply limiter extensions (which see client metadata).

var limitExts []limiter.Limiter
for _, named := range gss.Limiters {
lim, err := named.GetLimiter(context.Background(), host.GetExtensions())
if err != nil {
return nil, err
}
limitExts = append(limitExts, lim)
}
if limitExts != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a best practice to compare len with 0 than the slice with nil.

uInterceptors = append(uInterceptors, func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return applyUnaryLimiters(ctx, req, info, handler, limitExts)
})
sInterceptors = append(sInterceptors, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return applyStreamLimiters(srv, ss, info, handler, limitExts)
})
}

// Enable OpenTelemetry observability plugin.

otelOpts := []otelgrpc.Option{
otelgrpc.WithTracerProvider(settings.TracerProvider),
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
otelgrpc.WithMeterProvider(settings.MeterProvider),
}

// Enable OpenTelemetry observability plugin.
// Combine the interceptors, the observability plugin, with
// user-provided gRPC options.

uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata))

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))
opts = append(opts,
grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)),
grpc.ChainUnaryInterceptor(uInterceptors...),
grpc.ChainStreamInterceptor(sInterceptors...),
)

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
Expand Down Expand Up @@ -589,3 +626,61 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre

return handler(srv, wrapServerStream(ctx, stream))
}

func applyUnaryLimiters(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, limiters []limiter.Limiter) (any, error) {
sz, err := sizeReq(req)
if err != nil {
return nil, err
}

for _, lim := range limiters {
rel, err := lim.Acquire(ctx, sz)
if err != nil {
return nil, err
}
defer rel()
}

return handler(ctx, req)
}

func applyStreamLimiters(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, limiters []limiter.Limiter) error {
return handler(srv, limitServerStream(stream, limiters))
}

func sizeReq(req any) (uint64, error) {
switch treq := req.(type) {
case []byte:
return uint64(len(treq)), nil
default:
return 0, fmt.Errorf("limiter cannot determine size: %T", treq)
}
}

// limitedServerStream is a thin wrapper around grpc.ServerStream that calls limiters.
type limitedServerStream struct {
grpc.ServerStream

limiters []limiter.Limiter
}

// limitServerStream returns a ServerStream that will call limiters.
func limitServerStream(stream grpc.ServerStream, limiters []limiter.Limiter) *limitedServerStream {
return &limitedServerStream{ServerStream: stream, limiters: limiters}
}

func (ls *limitedServerStream) RecvMsg(req any) error {
sz, err := sizeReq(req)
if err != nil {
return err
}
for _, lim := range ls.limiters {
rel, err := lim.Acquire(ls.Context(), sz)
if err != nil {
return err
}
defer rel()
}

return ls.ServerStream.RecvMsg(req)
}
6 changes: 6 additions & 0 deletions config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ require (
go.opentelemetry.io/collector/component/componenttest v0.121.0
go.opentelemetry.io/collector/config/configauth v0.121.0
go.opentelemetry.io/collector/config/configcompression v1.27.0
go.opentelemetry.io/collector/config/configlimiter v0.0.0-00010101000000-000000000000
go.opentelemetry.io/collector/config/confignet v1.27.0
go.opentelemetry.io/collector/config/configopaque v1.27.0
go.opentelemetry.io/collector/config/configtls v1.27.0
go.opentelemetry.io/collector/extension/extensionauth v0.121.0
go.opentelemetry.io/collector/extension/xextension v0.0.0-00010101000000-000000000000
go.opentelemetry.io/collector/pdata v1.27.0
go.opentelemetry.io/collector/pdata/testdata v0.121.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0
Expand Down Expand Up @@ -79,3 +81,7 @@ replace go.opentelemetry.io/collector/component => ../../component
replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest

replace go.opentelemetry.io/collector/consumer => ../../consumer

replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension

replace go.opentelemetry.io/collector/config/configlimiter => ../configlimiter
1 change: 1 addition & 0 deletions config/configlimiter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
45 changes: 45 additions & 0 deletions config/configlimiter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Limiter configuration

This module defines necessary interfaces to implement limiter
extensions. Limiters are included in the basic HTTP and gRPC Server
Config structs, so users will rarely create interact with extensions
these directly.

To imlpement a limiter extension, components should implement the
`GetClient` interface in
[`extension/xextension/limit`](#../../extension/xextension/limit/README.md).

The currently known limiter extensions are listed below.

## Limiter implementations

- [Memory Limiter Extension](../../extension/memorylimiterextension/README.md)
- [Admission Limiter Extension](../../extension/admissionlimiterextension/README.md)


Example:

```yaml
extensions:
# Used with gRPC traffic, consults GC statistics.
memory_limiter/cold
request_limit_mib: 100
waiting_limit_mib: 10

# Used with HTTP traffic, counts request bytes in flight.
admission_limiter/warm:
request_limit_mib: 10
waiting_limit_mib: 10

receivers:
otlp:
protocols:
http:
# ...
limiter:
- admission_limiter/warm
grpc:
# ...
limiter:
- memory_limiter/cold
```
41 changes: 41 additions & 0 deletions config/configlimiter/configlimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package configlimiter implements the configuration settings to
// perform admission control on byte-weighted requests.
package configlimiter // import "go.opentelemetry.io/collector/config/configlimiter"

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/xextension/limiter"
)

var (
errLimiterNotFound = errors.New("limiter not found")
errNotLimiter = errors.New("requested component is not a limiter")
)

// Limitation defines the limit settings for a component.
type Limitation struct {
// LimiterID specifies the name of the extension to use in
// order to limit incoming requests.
LimiterID component.ID `mapstructure:"limiter,omitempty"`
}

// GetLimiter attempts to select the appropriate extensionauth.Server
// from the list of extensions, based on the requested extension
// name. If an authenticator is not found, an error is returned.
func (l Limitation) GetLimiter(ctx context.Context, extensions map[component.ID]component.Component) (limiter.Limiter, error) {
if ext, found := extensions[l.LimiterID]; found {
if ext, ok := ext.(limiter.Extension); ok && ext != nil {
return ext.GetLimiter(ctx)
}
return nil, errNotLimiter
}

return nil, fmt.Errorf("failed to resolve limiter %q: %w", l.LimiterID, errLimiterNotFound)
}
66 changes: 66 additions & 0 deletions config/configlimiter/configlimiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package configlimiter

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/xextension/limiter"
)

var mockID = component.MustNewID("mock")
var otherID = component.MustNewID("other")

func TestGetLimiter(t *testing.T) {
testCases := []struct {
name string
cfg Limitation
limiter extension.Extension
expected error
}{
{
name: "obtain limiter",
cfg: Limitation{mockID},
limiter: limiter.NewNop(),
expected: nil,
},
{
name: "wrong limiter",
cfg: Limitation{otherID},
limiter: limiter.NewNop(),
expected: errNotLimiter,
},
{
name: "missing limiter",
cfg: Limitation{component.MustNewID("missing")},
limiter: limiter.NewNop(),
expected: errLimiterNotFound,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ext := map[component.ID]component.Component{
mockID: tt.limiter,
otherID: nil,
}

limExt, err := tt.cfg.GetLimiter(context.Background(), ext)

// verify
if tt.expected != nil {
require.ErrorIs(t, err, tt.expected)
assert.Nil(t, limExt)
} else {
require.NoError(t, err)
assert.NotNil(t, limExt)
}
})
}
}
39 changes: 39 additions & 0 deletions config/configlimiter/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module go.opentelemetry.io/collector/config/configlimiter

go 1.23.0

require (
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v1.27.0
go.opentelemetry.io/collector/extension v1.27.0
go.opentelemetry.io/collector/extension/xextension v0.0.0-00010101000000-000000000000
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/pdata v1.27.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/pdata => ../../pdata

replace go.opentelemetry.io/collector/component => ../../component

replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest

replace go.opentelemetry.io/collector/extension => ../../extension

replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension
Loading
Loading