Skip to content

Commit b0212bb

Browse files
authored
[chore] [receiver/discovery] Simplify endpoint evaluation tests (#4737)
We don't need to parse the entity events in the evaluation tests. They can be simplified. The `entityStateEvents` and `entityDeleteEvents` functions can be tested separately.
1 parent 9b4f189 commit b0212bb

File tree

3 files changed

+100
-91
lines changed

3 files changed

+100
-91
lines changed

internal/receiver/discoveryreceiver/endpoint_tracker_test.go

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
2425
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/require"
@@ -378,7 +379,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
378379
}
379380

380381
var (
381-
t0 = time.Unix(0, 0)
382+
t0 = time.Unix(0, 0).UTC()
382383

383384
podEndpoint = observer.Endpoint{
384385
ID: observer.EndpointID("pod.endpoint.id"),
@@ -694,3 +695,64 @@ func (f *fakeObservable) ListAndWatch(notify observer.Notify) {
694695
}
695696

696697
func (f *fakeObservable) Unsubscribe(observer.Notify) {}
698+
699+
func TestEntityStateEvents(t *testing.T) {
700+
logger := zap.NewNop()
701+
cfg := createDefaultConfig().(*Config)
702+
cfg.Receivers = map[component.ID]ReceiverEntry{
703+
component.MustNewIDWithName("fake_receiver", ""): {
704+
Rule: mustNewRule(`type == "port" && pod.name == "pod.name" && port == 1`),
705+
},
706+
}
707+
708+
cStore := newCorrelationStore(logger, cfg.CorrelationTTL)
709+
cStore.UpdateAttrs(portEndpoint.ID, map[string]string{
710+
"attr1": "val1",
711+
"attr2": "val2",
712+
})
713+
714+
events, failed, err := entityStateEvents(component.MustNewIDWithName("observer_type", "observer.name"),
715+
[]observer.Endpoint{portEndpoint}, cStore, t0)
716+
require.NoError(t, err)
717+
require.Zero(t, failed)
718+
require.Equal(t, 1, events.Len())
719+
720+
event := events.At(0)
721+
assert.Equal(t, experimentalmetricmetadata.EventTypeState, event.EventType())
722+
assert.Equal(t, t0, event.Timestamp().AsTime())
723+
assert.Equal(t, map[string]any{discovery.EndpointIDAttr: string(portEndpoint.ID)}, event.ID().AsRaw())
724+
assert.Equal(t, map[string]any{
725+
observerNameAttr: "observer.name",
726+
observerTypeAttr: "observer_type",
727+
"endpoint": "port.target",
728+
"name": "port.name",
729+
"port": int64(1),
730+
"pod": map[string]any{
731+
"annotations": map[string]any{
732+
"annotation.one": "value.one",
733+
"annotation.two": "value.two",
734+
},
735+
"labels": map[string]any{
736+
"label.one": "value.one",
737+
"label.two": "value.two",
738+
},
739+
"name": "pod.name",
740+
"namespace": "namespace",
741+
"uid": "uid",
742+
},
743+
"transport": "transport",
744+
"type": "port",
745+
"attr1": "val1",
746+
"attr2": "val2",
747+
}, event.EntityStateDetails().Attributes().AsRaw())
748+
}
749+
750+
func TestEntityDeleteEvents(t *testing.T) {
751+
events := entityDeleteEvents([]observer.Endpoint{portEndpoint}, t0)
752+
require.Equal(t, 1, events.Len())
753+
754+
event := events.At(0)
755+
assert.Equal(t, experimentalmetricmetadata.EventTypeDelete, event.EventType())
756+
assert.Equal(t, t0, event.Timestamp().AsTime())
757+
assert.Equal(t, map[string]any{discovery.EndpointIDAttr: string(portEndpoint.ID)}, event.ID().AsRaw())
758+
}

internal/receiver/discoveryreceiver/metric_evaluator_test.go

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ func TestMetricEvaluation(t *testing.T) {
8585
emitCh := cStore.EmitCh()
8686
emitWG := sync.WaitGroup{}
8787
emitWG.Add(1)
88-
var corr correlation
8988
go func() {
90-
corr = <-emitCh
89+
<-emitCh
9190
emitWG.Done()
9291
}()
9392

94-
cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID)
93+
endpointID := observer.EndpointID("endpoint.id")
94+
cStore.UpdateEndpoint(observer.Endpoint{ID: endpointID}, receiverID, observerID)
9595

9696
me := newMetricEvaluator(logger, cfg, cStore)
9797

@@ -126,44 +126,19 @@ func TestMetricEvaluation(t *testing.T) {
126126
// wait for the emit channel to be processed
127127
emitWG.Wait()
128128

129-
entityEvents, numFailed, err := entityStateEvents(corr.observerID,
130-
[]observer.Endpoint{corr.endpoint}, cStore, time.Now())
131-
require.NoError(t, err)
132-
require.Equal(t, 0, numFailed)
133-
emitted := entityEvents.ConvertAndMoveToLogs()
134-
rl := emitted.ResourceLogs().At(0)
135-
require.Equal(t, 0, rl.Resource().Attributes().Len())
136-
137-
sLogs := rl.ScopeLogs()
138-
require.Equal(t, 1, sLogs.Len())
139-
sl := sLogs.At(0)
140-
lrs := sl.LogRecords()
141-
require.Equal(t, 1, lrs.Len())
142-
lr := sl.LogRecords().At(0)
143-
144-
lrAttrs := lr.Attributes()
145-
require.Equal(t, map[string]any{
146-
discovery.OtelEntityIDAttr: map[string]any{
147-
"discovery.endpoint.id": "endpoint.id",
148-
},
149-
discovery.OtelEntityEventTypeAttr: discovery.OtelEntityEventTypeState,
150-
discovery.OtelEntityAttributesAttr: map[string]any{
151-
"discovery.event.type": "metric.match",
152-
"discovery.observer.id": "an_observer/observer.name",
153-
"discovery.receiver.name": "receiver.name",
154-
"discovery.receiver.rule": "a.rule",
155-
"discovery.receiver.type": "a_receiver",
156-
"discovery.status": string(status),
157-
"discovery.message": "desired body content",
158-
"metric.name": "desired.name",
159-
"one": "one.value",
160-
"two": "two.value",
161-
"extra_attr": "target_resource",
162-
"discovery.observer.name": "observer.name",
163-
"discovery.observer.type": "an_observer",
164-
"endpoint": "",
165-
},
166-
}, lrAttrs.AsRaw())
129+
require.Equal(t, map[string]string{
130+
"discovery.event.type": "metric.match",
131+
"discovery.observer.id": "an_observer/observer.name",
132+
"discovery.receiver.name": "receiver.name",
133+
"discovery.receiver.rule": "a.rule",
134+
"discovery.receiver.type": "a_receiver",
135+
"discovery.status": string(status),
136+
"discovery.message": "desired body content",
137+
"metric.name": "desired.name",
138+
"one": "one.value",
139+
"two": "two.value",
140+
"extra_attr": "target_resource",
141+
}, cStore.Attrs(endpointID))
167142
})
168143
}
169144
})

internal/receiver/discoveryreceiver/statement_evaluator_test.go

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ func TestStatementEvaluation(t *testing.T) {
7474
emitCh := cStore.EmitCh()
7575
emitWG := sync.WaitGroup{}
7676
emitWG.Add(1)
77-
var corr correlation
7877
go func() {
79-
corr = <-emitCh
78+
<-emitCh
8079
emitWG.Done()
8180
}()
8281

8382
receiverID := component.MustNewIDWithName("a_receiver", "receiver.name")
84-
cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID)
83+
endpointID := observer.EndpointID("endpoint.id")
84+
cStore.UpdateEndpoint(observer.Endpoint{ID: endpointID}, receiverID, observerID)
8585

8686
se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, cStore)
8787
require.NoError(t, err)
@@ -107,36 +107,17 @@ func TestStatementEvaluation(t *testing.T) {
107107
// wait for the emit channel to be processed
108108
emitWG.Wait()
109109

110-
entityEvents, numFailed, err := entityStateEvents(corr.observerID,
111-
[]observer.Endpoint{corr.endpoint}, cStore, time.Now())
112-
require.NoError(t, err)
113-
require.Equal(t, 0, numFailed)
114-
emitted := entityEvents.ConvertAndMoveToLogs()
115-
116-
require.Equal(t, 1, emitted.ResourceLogs().Len())
117-
rl := emitted.ResourceLogs().At(0)
118-
require.Equal(t, 0, rl.Resource().Attributes().Len())
119-
120-
sLogs := rl.ScopeLogs()
121-
require.Equal(t, 1, sLogs.Len())
122-
sl := sLogs.At(0)
123-
lrs := sl.LogRecords()
124-
require.Equal(t, 1, lrs.Len())
125-
lr := sl.LogRecords().At(0)
126-
127-
oea, ok := lr.Attributes().Get(discovery.OtelEntityAttributesAttr)
128-
require.True(t, ok)
129-
entityAttrs := oea.Map()
110+
attrs := cStore.Attrs(endpointID)
130111

131112
// Validate "caller" attribute
132-
callerAttr, ok := entityAttrs.Get("caller")
113+
callerAttr, ok := attrs["caller"]
133114
require.True(t, ok)
134115
_, expectedFile, _, _ := runtime.Caller(0)
135116
// runtime doesn't use os.PathSeparator
136117
splitPath := strings.Split(expectedFile, "/")
137118
expectedCaller := splitPath[len(splitPath)-1]
138-
require.Contains(t, callerAttr.Str(), expectedCaller)
139-
entityAttrs.Remove("caller")
119+
require.Contains(t, callerAttr, expectedCaller)
120+
delete(attrs, "caller")
140121

141122
// Validate the rest of the attributes
142123
expectedMsg := "desired body content"
@@ -147,29 +128,20 @@ func TestStatementEvaluation(t *testing.T) {
147128
expectedMsg = fmt.Sprintf("%s (evaluated \"{\\\"field.one\\\":\\\"field.one.value\\\",\\\"field_two\\\":\\\"field.two.value\\\",\\\"message\\\":\\\"desired.statement\\\"}\")", expectedMsg)
148129
}
149130
}
150-
require.Equal(t, map[string]any{
151-
discovery.OtelEntityIDAttr: map[string]any{
152-
"discovery.endpoint.id": "endpoint.id",
153-
},
154-
discovery.OtelEntityEventTypeAttr: discovery.OtelEntityEventTypeState,
155-
discovery.OtelEntityAttributesAttr: map[string]any{
156-
"discovery.event.type": "statement.match",
157-
"discovery.observer.id": "an_observer/observer.name",
158-
"discovery.receiver.name": "receiver.name",
159-
"discovery.receiver.rule": `type == "container"`,
160-
"discovery.receiver.type": "a_receiver",
161-
"discovery.status": string(status),
162-
"discovery.message": expectedMsg,
163-
"name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
164-
"attr.one": "attr.one.value",
165-
"attr.two": "attr.two.value",
166-
"field.one": "field.one.value",
167-
"field_two": "field.two.value",
168-
"discovery.observer.name": "observer.name",
169-
"discovery.observer.type": "an_observer",
170-
"endpoint": "",
171-
},
172-
}, lr.Attributes().AsRaw())
131+
require.Equal(t, map[string]string{
132+
"discovery.event.type": "statement.match",
133+
"discovery.observer.id": "an_observer/observer.name",
134+
"discovery.receiver.name": "receiver.name",
135+
"discovery.receiver.rule": `type == "container"`,
136+
"discovery.receiver.type": "a_receiver",
137+
"discovery.status": string(status),
138+
"discovery.message": expectedMsg,
139+
"name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
140+
"attr.one": "attr.one.value",
141+
"attr.two": "attr.two.value",
142+
"field.one": "field.one.value",
143+
"field_two": "field.two.value",
144+
}, attrs)
173145
})
174146
}
175147
})

0 commit comments

Comments
 (0)