Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Global MeterProvider registration unwraps global instrument Observers, the undocumented Unwrap() methods are now private. (#5881)
- Fix `go.opentelemetry.io/otel/exporters/prometheus` trying to add exemplars to Gauge metrics, which is unsupported. (#5912)
- Fix incorrect metrics generated from callbacks when multiple readers are used in `go.opentelemetry.io/otel/sdk/metric`. (#5900)

### Changed

Expand Down
60 changes: 45 additions & 15 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -440,9 +450,9 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
// Don't allocate a observer if not needed.
return noopRegister{}, nil
}

reg := newObserver()
unregs := make([]func(), len(m.pipes))
var err error
validInstruments := make([]metric.Observable, 0, len(insts))
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
Expand All @@ -452,49 +462,63 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
continue
}
reg.registerInt64(o.observableID)

validInstruments = append(validInstruments, inst)
case float64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerFloat64(o.observableID)

validInstruments = append(validInstruments, inst)
default:
// Instrument external to the SDK.
return nil, fmt.Errorf("invalid observable: from different implementation")
}
}

if reg.len() == 0 {
if len(validInstruments) == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
for ix, pipe := range m.pipes {
reg := newObserver(pipe)
for _, inst := range validInstruments {
switch o := inst.(type) {
case int64Observable:
reg.registerInt64(o.observableID)
case float64Observable:
reg.registerFloat64(o.observableID)
}
}

// Some or all instruments were valid.
cBack := func(ctx context.Context) error { return f(ctx, reg) }
unregs[ix] = pipe.addMultiCallback(cBack)
}

return unregisterFuncs{f: unregs}, err
}

type observer struct {
embedded.Observer

pipe *pipeline
float64 map[observableID[float64]]struct{}
int64 map[observableID[int64]]struct{}
}

func newObserver() observer {
func newObserver(p *pipeline) observer {
return observer{
pipe: p,
float64: make(map[observableID[float64]]struct{}),
int64: make(map[observableID[int64]]struct{}),
}
}

func (r observer) len() int {
return len(r.float64) + len(r.int64)
}

func (r observer) registerFloat64(id observableID[float64]) {
r.float64[id] = struct{}{}
}
Expand Down Expand Up @@ -530,7 +554,10 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.float64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
Expand All @@ -555,7 +582,10 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.int64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

type noopRegister struct{ embedded.Registration }
Expand Down
43 changes: 26 additions & 17 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
Expand Down Expand Up @@ -43,10 +42,12 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
exemplarFilter: exemplarFilter,
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
Expand All @@ -64,10 +65,26 @@ type pipeline struct {
views []View

sync.Mutex
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
}

// addInt64Measure adds a new int64 measure to the pipeline for each observer.
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
p.Lock()
defer p.Unlock()
p.int64Measures[id] = m
}

// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
p.Lock()
defer p.Unlock()
p.float64Measures[id] = m
}

// addSync adds the instrumentSync to pipeline p with scope. This method is not
Expand Down Expand Up @@ -574,14 +591,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View, exempl
return pipes
}

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addMultiCallback(c)
}
return unregisterFuncs{f: unregs}
}

type unregisterFuncs struct {
embedded.Registration
f []func()
Expand Down
106 changes: 106 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
Expand All @@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -101,6 +104,21 @@ func TestPipelineConcurrentSafe(t *testing.T) {
defer wg.Done()
pipe.addMultiCallback(func(context.Context) error { return nil })
}()

wg.Add(1)
go func() {
defer wg.Done()
b := aggregate.Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
ReservoirFunc: nil,
AggregationLimit: 0,
}
var oID observableID[int64]
m, _ := b.PrecomputedSum(false)
measures := []aggregate.Measure[int64]{}
measures = append(measures, m)
pipe.addInt64Measure(oID, measures)
}()
}
wg.Wait()
}
Expand Down Expand Up @@ -518,3 +536,91 @@ func TestExemplars(t *testing.T) {
check(t, r, 2, 2, 2)
})
}

func TestAddingAndObservingMeasureConcurrency(t *testing.T) {
exp := &fnExporter{}
r1 := NewPeriodicReader(exp, WithInterval(1*time.Second))
r2 := NewPeriodicReader(exp, WithInterval(600*time.Second))

mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

oc1, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_, err := m.Int64ObservableCounter("int64-observable-counter-2")
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_, err := m.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc1, 2)
return nil
}, oc1)
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[0].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[1].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Wait()
}

func TestPipelineWithMultipleReaders(t *testing.T) {
exp := &fnExporter{}
r1 := NewPeriodicReader(exp, WithInterval(1*time.Second))
r2 := NewPeriodicReader(exp, WithInterval(600*time.Second))

mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

var val atomic.Int64
val.Add(1)
measure := func(_ context.Context, m metric.Meter) {
oc, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)
_, err = m.RegisterCallback(
// SDK periodically calls this function to collect data.
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc, val.Load())
return nil
}, oc)
require.NoError(t, err)
}
ctx := context.Background()
measure(ctx, m)
rm := new(metricdata.ResourceMetrics)
val.Add(1)

// adding sleep deliberately so that the callback get triggered
time.Sleep(2 * time.Second)
err := r1.Collect(ctx, rm)
require.NoError(t, err)
assert.Len(t, rm.ScopeMetrics, 1)
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1)
assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)

val.Add(1)
time.Sleep(1 * time.Second)
err = r2.Collect(ctx, rm)
require.NoError(t, err)
assert.Len(t, rm.ScopeMetrics, 1)
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1)
assert.Equal(t, int64(3), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}
Loading