Skip to content

Commit 787a646

Browse files
authored
[receiver/discovery] Send delete entity events for discovered services (#6260)
Send delete entity events once discovered services are removed to keep the "discovered services" view up-to-date. This change requires pulling the upstream bugfix open-telemetry/opentelemetry-collector-contrib#40279. This also moves service.name and service.type attributes to the set of identifying attributes according to the entity definition in the inventory service.
1 parent 87bee7a commit 787a646

File tree

5 files changed

+72
-83
lines changed

5 files changed

+72
-83
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ require (
151151
go.opentelemetry.io/collector/extension v1.32.0
152152
go.opentelemetry.io/collector/extension/zpagesextension v0.126.0
153153
go.opentelemetry.io/collector/otelcol v0.126.0
154-
go.opentelemetry.io/collector/pdata v1.32.0
154+
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd
155155
go.opentelemetry.io/collector/pipeline v0.126.0
156156
go.opentelemetry.io/collector/processor v1.32.0
157157
go.opentelemetry.io/collector/processor/batchprocessor v0.126.0
@@ -653,7 +653,7 @@ require (
653653
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.126.0 // indirect
654654
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.126.0 // indirect
655655
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.126.0 // indirect
656-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0
656+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d
657657
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.126.0 // indirect
658658
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.126.0 // indirect
659659
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.126.0 // indirect
@@ -723,7 +723,7 @@ require (
723723
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
724724
gonum.org/v1/gonum v0.16.0 // indirect
725725
google.golang.org/api v0.232.0 // indirect
726-
google.golang.org/grpc v1.72.0 // indirect
726+
google.golang.org/grpc v1.72.1 // indirect
727727
google.golang.org/protobuf v1.36.6 // indirect
728728
gopkg.in/fsnotify.v1 v1.4.7 // indirect
729729
gopkg.in/go-playground/validator.v9 v9.31.0 // indirect

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,8 +1439,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.
14391439
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.126.0/go.mod h1:dfRudXeUgf148puPQxJ91IXABpfqsl9rarrwqFuH2Ro=
14401440
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.126.0 h1:A0ngaz213hmNCCX6AoKs3tqM0WAAsURHdIv5h0UXLeM=
14411441
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.126.0/go.mod h1:NHImkI6CD3ijcfLe4h6XP1MXQG0PSWH9TpLenZf6Wys=
1442-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0 h1:e5aLwj2GldsWGMuk9e2Is+TN/EJHPh25RWmUzRWA7Ps=
1443-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0/go.mod h1:o2lvKV9aHwoYUGRsND18Xriker2hi6A6zxbyr6UkEFA=
1442+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d h1:tqbhR/PYfU4foM/zfEvH9Wr2V+5Mm+iPIS5QlXk7rcA=
1443+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d/go.mod h1:3rl7mtPncU+cQEXs0lnYh5rmf1b0f4D2B12oIX1h0U4=
14441444
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0 h1:AnOgi0AF5kALP4hEILsQEnRzT/yNXfua598210Dn9ko=
14451445
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0/go.mod h1:jjyo4lLRH9WOUJ9djpEql6xqVAaReNDY7ciWRt23FZk=
14461446
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.126.0 h1:VObAgxn0ONZEAXcANe+dq9uHR7KDMqBxnQegEJu/dpM=
@@ -2112,8 +2112,8 @@ go.opentelemetry.io/collector/otelcol v0.126.0 h1:TQaIMKzDgWPR79QmuZYeC1LaiUG624
21122112
go.opentelemetry.io/collector/otelcol v0.126.0/go.mod h1:Ve1nB64BniDT1v6IT85zF9P/qmD3bDWfL0Mry+m6cFA=
21132113
go.opentelemetry.io/collector/otelcol/otelcoltest v0.126.0 h1:PJ0c2wT9y7e6T+Ag4S/f3Di2DYic07lrvj+z9Ep200I=
21142114
go.opentelemetry.io/collector/otelcol/otelcoltest v0.126.0/go.mod h1:tbJaLJbeXRNEGH+5+w1YXLLrg4Xt640Qzsw5JLL3VZ8=
2115-
go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA=
2116-
go.opentelemetry.io/collector/pdata v1.32.0/go.mod h1:m41io9nWpy7aCm/uD1L9QcKiZwOP0ldj83JEA34dmlk=
2115+
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd h1:H136IWSouulaIdZFOzgXo/KCcQD1QlXhRCPoXYE7pdk=
2116+
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd/go.mod h1:TDvbHuvIK+g6xqu1gxtz8ti4pB2x1WcBpjFob5KfleU=
21172117
go.opentelemetry.io/collector/pdata/pprofile v0.126.0 h1:ArYQxg5KdTb98r1X6KSZY7W6/4DPv/q6z7jSbSZ1mBc=
21182118
go.opentelemetry.io/collector/pdata/pprofile v0.126.0/go.mod h1:2fBTFDcXjVfseBQKnt/DTM0EYTmFoPKtRpjg8ql38Ek=
21192119
go.opentelemetry.io/collector/pdata/testdata v0.126.0 h1:CMJEYwg12tMI60GOiBIKyrZQp839bD0eJ4rmD4ttlUs=
@@ -2654,8 +2654,8 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
26542654
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
26552655
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
26562656
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
2657-
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
2658-
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
2657+
google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA=
2658+
google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
26592659
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
26602660
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
26612661
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

internal/receiver/discoveryreceiver/endpoint_tracker.go

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ const (
3939

4040
// identifyingAttrKeys are the keys of attributes that are used to identify an entity.
4141
var identifyingAttrKeys = []string{
42+
serviceTypeAttr,
43+
semconv.AttributeServiceName,
4244
semconv.AttributeK8SPodUID,
4345
semconv.AttributeContainerID,
4446
semconv.AttributeK8SNodeUID,
@@ -135,7 +137,7 @@ func (et *endpointTracker) stop() {
135137

136138
func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpoints []observer.Endpoint) {
137139
if et.pLogs != nil {
138-
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
140+
entityEvents, numFailed, err := entityEvents(observerCID, endpoints, et.correlations, time.Now(), experimentalmetricmetadata.EventTypeState)
139141
if err != nil {
140142
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity state events", numFailed), zap.Error(err))
141143
}
@@ -145,9 +147,10 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo
145147
}
146148
}
147149

148-
func (et *endpointTracker) emitEntityDeleteEvents(endpoints []observer.Endpoint) {
150+
func (et *endpointTracker) emitEntityDeleteEvents(observerCID component.ID, endpoints []observer.Endpoint) {
149151
if et.pLogs != nil {
150-
entityEvents, numFailed, err := entityDeleteEvents(endpoints, time.Now())
152+
entityEvents, numFailed, err := entityEvents(observerCID, endpoints, et.correlations, time.Now(),
153+
experimentalmetricmetadata.EventTypeDelete)
151154
if err != nil {
152155
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity delete events", numFailed), zap.Error(err))
153156
}
@@ -226,18 +229,18 @@ func (n *notify) OnRemove(removed []observer.Endpoint) {
226229
n.endpointTracker.correlations.MarkStale(endpoint.ID)
227230
}
228231
}
229-
n.endpointTracker.emitEntityDeleteEvents(matchingEndpoints)
232+
n.endpointTracker.emitEntityDeleteEvents(n.observerID, matchingEndpoints)
230233
}
231234

232235
func (n *notify) OnChange(changed []observer.Endpoint) {
233236
n.endpointTracker.updateEndpoints(changed, n.observerID)
234237
}
235238

236-
// entityStateEvents converts observer endpoints to entity state events excluding those
239+
// entityEvents converts observer endpoints to entity state events excluding those
237240
// that don't have a discovery status attribute yet.
238-
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations *correlationStore,
239-
ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
240-
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
241+
func entityEvents(observerID component.ID, endpoints []observer.Endpoint, correlations *correlationStore,
242+
ts time.Time, eventType experimentalmetricmetadata.EventType) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
243+
events := experimentalmetricmetadata.NewEntityEventsSlice()
241244
for _, endpoint := range endpoints {
242245
if endpoint.Details == nil {
243246
failed++
@@ -251,17 +254,11 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
251254
continue
252255
}
253256

254-
entityEvent := entityEvents.AppendEmpty()
255-
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
256-
entityState := entityEvent.SetEntityState()
257-
entityState.SetEntityType(entityType)
258-
attrs := entityState.Attributes()
259-
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
257+
attrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env())
258+
if e != nil {
260259
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
261260
failed++
262-
} else {
263-
// this must be the first mutation of attrs since it's destructive
264-
envAttrs.CopyTo(attrs)
261+
continue
265262
}
266263
attrs.PutStr("type", string(endpoint.Details.Type()))
267264
attrs.PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
@@ -273,31 +270,21 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
273270
}
274271
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
275272
attrs.PutStr(semconv.AttributeServiceName, deduceServiceName(attrs))
276-
extractIdentifyingAttrs(attrs, entityEvent.ID())
277-
}
278-
return entityEvents, failed, err
279-
}
280-
281-
func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
282-
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
283-
for _, endpoint := range endpoints {
284-
if endpoint.Details == nil {
285-
failed++
286-
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
287-
continue
288-
}
289273

290-
entityEvent := entityEvents.AppendEmpty()
291-
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
292-
entityEvent.SetEntityDelete()
293-
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
294-
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
295-
failed++
296-
} else {
297-
extractIdentifyingAttrs(envAttrs, entityEvent.ID())
274+
event := events.AppendEmpty()
275+
event.SetTimestamp(pcommon.NewTimestampFromTime(ts))
276+
extractIdentifyingAttrs(attrs, event.ID())
277+
switch eventType {
278+
case experimentalmetricmetadata.EventTypeState:
279+
entityState := event.SetEntityState()
280+
entityState.SetEntityType(entityType)
281+
attrs.MoveTo(entityState.Attributes())
282+
case experimentalmetricmetadata.EventTypeDelete:
283+
deleteEvent := event.SetEntityDelete()
284+
deleteEvent.SetEntityType(entityType)
298285
}
299286
}
300-
return entityEvents, failed, err
287+
return events, failed, err
301288
}
302289

303290
func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {

0 commit comments

Comments
 (0)