Skip to content

Commit 2d02663

Browse files
authored
[receiver/discovery] Emit evaluation entities by the endpoint tracker (#4728)
This change consolidates the entity emitting in one place: endpoint tracker. Once the evaluation succeeds, the endpoint attributes are updated in the correlation store and the new entity state is emitted right away. Every subsequent state is emitted with all the added attributes. The tests are kept with minimal changes to ensure that no attributes are lost in the emitted entities. Currently, we emit superset of all the attributes that were previously emitted on regular basis or on the evaluation. Later, we will reduce the set and remove redundant attributes.
1 parent 660c89e commit 2d02663

File tree

11 files changed

+148
-206
lines changed

11 files changed

+148
-206
lines changed

internal/receiver/discoveryreceiver/correlation.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type correlationStore interface {
5050
GetOrCreate(endpointID observer.EndpointID, receiverID component.ID) correlation
5151
Attrs(endpointID observer.EndpointID) map[string]string
5252
UpdateAttrs(endpointID observer.EndpointID, attrs map[string]string)
53+
EmitCh() chan correlation
5354
Endpoints(updatedBefore time.Time) []observer.Endpoint
5455
// Start the reaping loop to prevent unnecessary endpoint buildup
5556
Start()
@@ -71,6 +72,7 @@ type store struct {
7172
attrs *sync.Map
7273
// sentinel for terminating reaper loop
7374
sentinel chan struct{}
75+
emitCh chan correlation
7476
reapInterval time.Duration
7577
ttl time.Duration
7678
}
@@ -84,6 +86,7 @@ func newCorrelationStore(logger *zap.Logger, ttl time.Duration) correlationStore
8486
reapInterval: 30 * time.Second,
8587
ttl: ttl,
8688
sentinel: make(chan struct{}, 1),
89+
emitCh: make(chan correlation),
8790
}
8891
}
8992

@@ -118,6 +121,11 @@ func (s *store) MarkStale(endpointID observer.EndpointID) {
118121
}
119122
}
120123

124+
// EmitCh returns a channel to emit endpoints immediately.
125+
func (s *store) EmitCh() chan correlation {
126+
return s.emitCh
127+
}
128+
121129
// Endpoints returns all active endpoints that have not been updated since the provided time.
122130
func (s *store) Endpoints(updatedBefore time.Time) []observer.Endpoint {
123131
var endpoints []observer.Endpoint

internal/receiver/discoveryreceiver/endpoint_tracker.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ func (et *endpointTracker) startEmitLoop() {
9292
timer := time.NewTicker(et.emitInterval)
9393
for {
9494
select {
95+
case corr := <-et.correlations.EmitCh():
96+
et.emitEntityStateEvents(corr.observerID, []observer.Endpoint{corr.endpoint})
9597
case <-timer.C:
9698
for obs := range et.observables {
9799
et.emitEntityStateEvents(obs, et.correlations.Endpoints(time.Now().Add(-et.emitInterval)))
@@ -114,7 +116,7 @@ func (et *endpointTracker) stop() {
114116

115117
func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpoints []observer.Endpoint) {
116118
if et.pLogs != nil {
117-
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, time.Now())
119+
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
118120
if err != nil {
119121
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
120122
}
@@ -209,7 +211,7 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
209211
n.endpointTracker.updateEndpoints(changed, n.observerID)
210212
}
211213

212-
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
214+
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
213215
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
214216
for _, endpoint := range endpoints {
215217
entityEvent := entityEvents.AppendEmpty()
@@ -230,6 +232,9 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, t
230232
attrs.PutStr("endpoint", endpoint.Target)
231233
attrs.PutStr(observerNameAttr, observerID.Name())
232234
attrs.PutStr(observerTypeAttr, observerID.Type().String())
235+
for k, v := range correlations.Attrs(endpoint.ID) {
236+
attrs.PutStr(k, v)
237+
}
233238
}
234239
return entityEvents, failed, err
235240
}

internal/receiver/discoveryreceiver/endpoint_tracker_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
164164
t.Run(test.name, func(t *testing.T) {
165165
events, failed, err := entityStateEvents(
166166
component.MustNewIDWithName("observer_type", "observer.name"),
167-
[]observer.Endpoint{test.endpoint}, t0,
167+
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
168168
)
169169
require.NoError(t, err)
170170
require.Zero(t, failed)
@@ -276,7 +276,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
276276
// Validate entity_state event
277277
events, failed, err := entityStateEvents(
278278
component.MustNewIDWithName("observer_type", "observer.name"),
279-
[]observer.Endpoint{test.endpoint}, t0,
279+
[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
280280
)
281281
if test.expectedError != "" {
282282
require.Error(t, err)
@@ -343,7 +343,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
343343
Transport: observer.Transport(transport),
344344
},
345345
},
346-
}, t0,
346+
}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
347347
)
348348

349349
expectedLogs := expectedPLogs(endpointID)
@@ -661,6 +661,7 @@ func TestEntityEmittingLifecycle(t *testing.T) {
661661
require.Equal(t, 1, gotLogs.LogRecordCount())
662662
// TODO: Use plogtest.IgnoreTimestamp once available
663663
expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
664+
newCorrelationStore(zap.NewNop(), time.Hour),
664665
gotLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime())
665666
require.NoError(t, err)
666667
require.Zero(t, failed)

internal/receiver/discoveryreceiver/evaluator.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/antonmedv/expr/vm"
2525
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
2626
"go.opentelemetry.io/collector/component"
27-
"go.opentelemetry.io/collector/pdata/pcommon"
2827
"go.uber.org/zap"
2928
"gopkg.in/yaml.v2"
3029

@@ -117,10 +116,10 @@ func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery.
117116
}
118117

119118
// correlateResourceAttributes sets correlation attributes including embedded base64 config content, if configured.
120-
func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, corr correlation) {
119+
func (e *evaluator) correlateResourceAttributes(cfg *Config, to map[string]string, corr correlation) {
121120
observerID := corr.observerID.String()
122121
if observerID != "" && observerID != discovery.NoType.String() {
123-
to.PutStr(discovery.ObserverIDAttr, observerID)
122+
to[discovery.ObserverIDAttr] = observerID
124123
}
125124

126125
if e.config.EmbedReceiverConfig {
@@ -140,6 +139,6 @@ func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, cor
140139
if err != nil {
141140
e.logger.Error("failed embedding receiver config", zap.String("observer", observerID), zap.Error(err))
142141
}
143-
to.PutStr(discovery.ReceiverConfigAttr, base64.StdEncoding.EncodeToString(cfgYaml))
142+
to[discovery.ReceiverConfigAttr] = base64.StdEncoding.EncodeToString(cfgYaml)
144143
}
145144
}

internal/receiver/discoveryreceiver/evaluator_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
2525
"github.com/stretchr/testify/require"
2626
"go.opentelemetry.io/collector/component"
27-
"go.opentelemetry.io/collector/pdata/pcommon"
2827
"go.uber.org/zap"
2928
)
3029

@@ -124,12 +123,11 @@ func TestCorrelateResourceAttrs(t *testing.T) {
124123
},
125124
}
126125

127-
to := pcommon.NewMap()
128-
126+
to := map[string]string{}
129127
require.Empty(t, eval.correlations.Attrs(endpointID))
130128
eval.correlateResourceAttributes(cfg, to, corr)
131129

132-
expectedResourceAttrs := map[string]any{
130+
expectedResourceAttrs := map[string]string{
133131
"discovery.observer.id": "type/name",
134132
}
135133

@@ -147,7 +145,7 @@ watch_observers:
147145
`))
148146
}
149147

150-
require.Equal(t, expectedResourceAttrs, to.AsRaw())
148+
require.Equal(t, expectedResourceAttrs, to)
151149
})
152150
}
153151
}

internal/receiver/discoveryreceiver/metric_evaluator.go

Lines changed: 31 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ import (
1919
"fmt"
2020

2121
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
22-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
2322
"go.opentelemetry.io/collector/component"
2423
"go.opentelemetry.io/collector/consumer"
2524
"go.opentelemetry.io/collector/pdata/pcommon"
26-
"go.opentelemetry.io/collector/pdata/plog"
2725
"go.opentelemetry.io/collector/pdata/pmetric"
2826
"go.uber.org/zap"
2927
"go.uber.org/zap/zapcore"
@@ -47,12 +45,10 @@ var (
4745
// Status match rules. If so, they emit log records for the matching metric.
4846
type metricEvaluator struct {
4947
*evaluator
50-
pLogs chan plog.Logs
5148
}
5249

53-
func newMetricEvaluator(logger *zap.Logger, cfg *Config, pLogs chan plog.Logs, correlations correlationStore) *metricEvaluator {
50+
func newMetricEvaluator(logger *zap.Logger, cfg *Config, correlations correlationStore) *metricEvaluator {
5451
return &metricEvaluator{
55-
pLogs: pLogs,
5652
evaluator: newEvaluator(logger, cfg, correlations,
5753
// TODO: provide more capable env w/ resource and metric attributes
5854
func(pattern string) map[string]any {
@@ -67,15 +63,13 @@ func (m *metricEvaluator) Capabilities() consumer.Capabilities {
6763
}
6864

6965
func (m *metricEvaluator) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
70-
if pLogs := m.evaluateMetrics(md); pLogs.LogRecordCount() > 0 {
71-
m.pLogs <- pLogs
72-
}
66+
m.evaluateMetrics(md)
7367
return nil
7468
}
7569

7670
// evaluateMetrics parses the provided Metrics and returns plog.Logs with a single log record if it matches
7771
// against the first applicable configured Status match rule.
78-
func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
72+
func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) {
7973
if ce := m.logger.Check(zapcore.DebugLevel, "evaluating metrics"); ce != nil {
8074
if mbytes, err := jsonMarshaler.MarshalMetrics(md); err == nil {
8175
ce.Write(zap.ByteString("metrics", mbytes))
@@ -84,22 +78,22 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
8478
}
8579
}
8680
if md.MetricCount() == 0 {
87-
return plog.NewLogs()
81+
return
8882
}
8983

9084
receiverID, endpointID := statussources.MetricsToReceiverIDs(md)
9185
if receiverID == discovery.NoType || endpointID == "" {
9286
m.logger.Debug("unable to evaluate metrics from receiver without corresponding name or Endpoint.ID", zap.Any("metrics", md))
93-
return plog.NewLogs()
87+
return
9488
}
9589

9690
rEntry, ok := m.config.Receivers[receiverID]
9791
if !ok {
9892
m.logger.Info("No matching configured receiver for metric status evaluation", zap.String("receiver", receiverID.String()))
99-
return plog.NewLogs()
93+
return
10094
}
10195
if rEntry.Status == nil || len(rEntry.Status.Metrics) == 0 {
102-
return plog.NewLogs()
96+
return
10397
}
10498

10599
for _, match := range rEntry.Status.Metrics {
@@ -108,20 +102,26 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
108102
continue
109103
}
110104

111-
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
112-
entityEvent := entityEvents.AppendEmpty()
113-
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID))
114-
entityState := entityEvent.SetEntityState()
115-
116-
res.Attributes().CopyTo(entityState.Attributes())
117105
corr := m.correlations.GetOrCreate(endpointID, receiverID)
118-
m.correlateResourceAttributes(m.config, entityState.Attributes(), corr)
106+
attrs := m.correlations.Attrs(endpointID)
119107

120-
// Remove the endpoint ID from the attributes as it's set in the entity ID.
121-
entityState.Attributes().Remove(discovery.EndpointIDAttr)
108+
// If the status is already the same as desired, we don't need to update the entity state.
109+
if match.Status == discovery.StatusType(attrs[discovery.StatusAttr]) {
110+
return
111+
}
122112

123-
entityState.Attributes().PutStr(eventTypeAttr, metricMatch)
124-
entityState.Attributes().PutStr(receiverRuleAttr, rEntry.Rule.String())
113+
res.Attributes().Range(func(k string, v pcommon.Value) bool {
114+
// skip endpoint ID attr since it's set in the entity ID
115+
if k == discovery.EndpointIDAttr {
116+
return true
117+
}
118+
attrs[k] = v.AsString()
119+
return true
120+
})
121+
m.correlateResourceAttributes(m.config, attrs, corr)
122+
123+
attrs[eventTypeAttr] = metricMatch
124+
attrs[receiverRuleAttr] = rEntry.Rule.String()
125125

126126
desiredRecord := match.Record
127127
if desiredRecord == nil {
@@ -131,19 +131,17 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
131131
if desiredRecord.Body != "" {
132132
desiredMsg = desiredRecord.Body
133133
}
134-
entityState.Attributes().PutStr(discovery.MessageAttr, desiredMsg)
134+
attrs[discovery.MessageAttr] = desiredMsg
135135
for k, v := range desiredRecord.Attributes {
136-
entityState.Attributes().PutStr(k, v)
137-
}
138-
entityState.Attributes().PutStr(metricNameAttr, metric.Name())
139-
entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status))
140-
if ts := m.timestampFromMetric(metric); ts != nil {
141-
entityEvent.SetTimestamp(*ts)
136+
attrs[k] = v
142137
}
138+
attrs[metricNameAttr] = metric.Name()
139+
attrs[discovery.StatusAttr] = string(match.Status)
140+
m.correlations.UpdateAttrs(endpointID, attrs)
143141

144-
return entityEvents.ConvertAndMoveToLogs()
142+
m.correlations.EmitCh() <- corr
143+
return
145144
}
146-
return plog.NewLogs()
147145
}
148146

149147
// findMatchedMetric finds the metric that matches the provided match rule and return it along with the resource if found.
@@ -166,42 +164,3 @@ func (m *metricEvaluator) findMatchedMetric(md pmetric.Metrics, match Match, rec
166164
}
167165
return pcommon.NewResource(), pmetric.NewMetric(), false
168166
}
169-
170-
func (m *metricEvaluator) timestampFromMetric(metric pmetric.Metric) *pcommon.Timestamp {
171-
var ts *pcommon.Timestamp
172-
switch dt := metric.Type(); dt {
173-
case pmetric.MetricTypeGauge:
174-
dps := metric.Gauge().DataPoints()
175-
if dps.Len() > 0 {
176-
t := dps.At(0).Timestamp()
177-
ts = &t
178-
}
179-
case pmetric.MetricTypeSum:
180-
dps := metric.Sum().DataPoints()
181-
if dps.Len() > 0 {
182-
t := dps.At(0).Timestamp()
183-
ts = &t
184-
}
185-
case pmetric.MetricTypeHistogram:
186-
dps := metric.Histogram().DataPoints()
187-
if dps.Len() > 0 {
188-
t := dps.At(0).Timestamp()
189-
ts = &t
190-
}
191-
case pmetric.MetricTypeExponentialHistogram:
192-
dps := metric.ExponentialHistogram().DataPoints()
193-
if dps.Len() > 0 {
194-
t := dps.At(0).Timestamp()
195-
ts = &t
196-
}
197-
case pmetric.MetricTypeSummary:
198-
dps := metric.Summary().DataPoints()
199-
if dps.Len() > 0 {
200-
t := dps.At(0).Timestamp()
201-
ts = &t
202-
}
203-
default:
204-
m.logger.Debug("cannot get timestamp from data type", zap.String("data type", dt.String()))
205-
}
206-
return ts
207-
}

0 commit comments

Comments
 (0)