Skip to content

Commit 0d6b220

Browse files
authored
[chore]: [deltatocumulative]: remove nested implementation (#36498)
#### Description Removes the nested (aka overloading `streams.Map`) implementation. This has been entirely replaced by a leaner, "linear" implementation: - #35048 - #36486 <!--Describe what testing was performed and which tests were added.--> #### Testing Existing tests continue to pass unaltered <!--Describe the documentation added.--> #### Documentation not needed <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 5002476 commit 0d6b220

File tree

28 files changed

+188
-1896
lines changed

28 files changed

+188
-1896
lines changed

processor/deltatocumulativeprocessor/chain.go

Lines changed: 0 additions & 51 deletions
This file was deleted.

processor/deltatocumulativeprocessor/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"go.opentelemetry.io/collector/component"
1313

14-
telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
14+
telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
1515
)
1616

1717
var _ component.ConfigValidator = (*Config)(nil)

processor/deltatocumulativeprocessor/factory.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
"go.opentelemetry.io/collector/consumer"
1212
"go.opentelemetry.io/collector/processor"
1313

14-
ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
1514
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
1616
)
1717

1818
func NewFactory() processor.Factory {
@@ -29,13 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
2929
return nil, fmt.Errorf("configuration parsing error")
3030
}
3131

32-
ltel, err := ltel.New(set.TelemetrySettings)
32+
tel, err := telemetry.New(set.TelemetrySettings)
3333
if err != nil {
3434
return nil, err
3535
}
3636

37-
proc := newProcessor(pcfg, set.Logger, &ltel.TelemetryBuilder, next)
38-
linear := newLinear(pcfg, ltel, proc)
39-
40-
return Chain{linear, proc}, nil
37+
return newProcessor(pcfg, tel, next), nil
4138
}

processor/deltatocumulativeprocessor/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ require (
2121
go.opentelemetry.io/otel/sdk/metric v1.32.0
2222
go.opentelemetry.io/otel/trace v1.32.0
2323
go.uber.org/goleak v1.3.0
24-
go.uber.org/zap v1.27.0
2524
golang.org/x/tools v0.26.0
2625
gopkg.in/yaml.v3 v3.0.1
2726
)
@@ -52,6 +51,7 @@ require (
5251
go.opentelemetry.io/collector/processor/processorprofiles v0.114.1-0.20241202231142-b9ff1bc54c99 // indirect
5352
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
5453
go.uber.org/multierr v1.11.0 // indirect
54+
go.uber.org/zap v1.27.0 // indirect
5555
golang.org/x/net v0.30.0 // indirect
5656
golang.org/x/sys v0.27.0 // indirect
5757
golang.org/x/text v0.19.0 // indirect

processor/deltatocumulativeprocessor/internal/data/data.go

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -4,116 +4,23 @@
44
package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
55

66
import (
7-
"go.opentelemetry.io/collector/pdata/pcommon"
87
"go.opentelemetry.io/collector/pdata/pmetric"
98

109
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
1110
)
1211

13-
var (
14-
_ Point[Number] = Number{}
15-
_ Point[Histogram] = Histogram{}
16-
_ Point[ExpHistogram] = ExpHistogram{}
17-
_ Point[Summary] = Summary{}
18-
)
19-
20-
type Point[Self any] interface {
21-
StartTimestamp() pcommon.Timestamp
22-
Timestamp() pcommon.Timestamp
23-
Attributes() pcommon.Map
24-
25-
Clone() Self
26-
CopyTo(Self)
27-
28-
Add(Self) Self
29-
}
30-
31-
type Typed[Self any] interface {
32-
Point[Self]
33-
Number | Histogram | ExpHistogram | Summary
34-
}
35-
3612
type Number struct {
3713
pmetric.NumberDataPoint
3814
}
3915

40-
func Zero[P Typed[P]]() P {
41-
var point P
42-
switch ty := any(&point).(type) {
43-
case *Number:
44-
ty.NumberDataPoint = pmetric.NewNumberDataPoint()
45-
case *Histogram:
46-
ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
47-
case *ExpHistogram:
48-
ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
49-
}
50-
return point
51-
}
52-
53-
func (dp Number) Clone() Number {
54-
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
55-
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
56-
dp.CopyTo(clone)
57-
}
58-
return clone
59-
}
60-
61-
func (dp Number) CopyTo(dst Number) {
62-
dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
63-
}
64-
6516
type Histogram struct {
6617
pmetric.HistogramDataPoint
6718
}
6819

69-
func (dp Histogram) Clone() Histogram {
70-
clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
71-
if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
72-
dp.CopyTo(clone)
73-
}
74-
return clone
75-
}
76-
77-
func (dp Histogram) CopyTo(dst Histogram) {
78-
dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
79-
}
80-
8120
type ExpHistogram struct {
8221
expo.DataPoint
8322
}
8423

85-
func (dp ExpHistogram) Clone() ExpHistogram {
86-
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
87-
if dp.DataPoint != (expo.DataPoint{}) {
88-
dp.CopyTo(clone)
89-
}
90-
return clone
91-
}
92-
93-
func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
94-
dp.DataPoint.CopyTo(dst.DataPoint)
95-
}
96-
97-
type mustPoint[D Point[D]] struct{ _ D }
98-
99-
var (
100-
_ = mustPoint[Number]{}
101-
_ = mustPoint[Histogram]{}
102-
_ = mustPoint[ExpHistogram]{}
103-
)
104-
10524
type Summary struct {
10625
pmetric.SummaryDataPoint
10726
}
108-
109-
func (dp Summary) Clone() Summary {
110-
clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
111-
if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
112-
dp.CopyTo(clone)
113-
}
114-
return clone
115-
}
116-
117-
func (dp Summary) CopyTo(dst Summary) {
118-
dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
119-
}

processor/deltatocumulativeprocessor/internal/delta/delta.go

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -9,55 +9,9 @@ import (
99
"go.opentelemetry.io/collector/pdata/pcommon"
1010
"go.opentelemetry.io/collector/pdata/pmetric"
1111

12-
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
1312
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
14-
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
1513
)
1614

17-
func New[D data.Point[D]]() Accumulator[D] {
18-
return Accumulator[D]{
19-
Map: make(exp.HashMap[D]),
20-
}
21-
}
22-
23-
var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil)
24-
25-
type Accumulator[D data.Point[D]] struct {
26-
streams.Map[D]
27-
}
28-
29-
func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
30-
aggr, ok := a.Map.Load(id)
31-
32-
// new series: initialize with current sample
33-
if !ok {
34-
clone := dp.Clone()
35-
return a.Map.Store(id, clone)
36-
}
37-
38-
// drop bad samples
39-
switch {
40-
case dp.StartTimestamp() < aggr.StartTimestamp():
41-
// belongs to older series
42-
return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
43-
case dp.Timestamp() <= aggr.Timestamp():
44-
// out of order
45-
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
46-
}
47-
48-
// detect gaps
49-
var gap error
50-
if dp.StartTimestamp() > aggr.Timestamp() {
51-
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
52-
}
53-
54-
res := aggr.Add(dp)
55-
if err := a.Map.Store(id, res); err != nil {
56-
return err
57-
}
58-
return gap
59-
}
60-
6115
type ErrOlderStart struct {
6216
Start pcommon.Timestamp
6317
Sample pcommon.Timestamp
@@ -76,14 +30,6 @@ func (e ErrOutOfOrder) Error() string {
7630
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
7731
}
7832

79-
type ErrGap struct {
80-
From, To pcommon.Timestamp
81-
}
82-
83-
func (e ErrGap) Error() string {
84-
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
85-
}
86-
8733
type Type interface {
8834
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint
8935

0 commit comments

Comments
 (0)