diff --git a/internal/storage/v2/memory/fixtures/db_traces_01.json b/internal/storage/v2/memory/fixtures/db_traces_01.json new file mode 100644 index 00000000000..7ec5def589c --- /dev/null +++ b/internal/storage/v2/memory/fixtures/db_traces_01.json @@ -0,0 +1,212 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "service-x" + } + } + ] + }, + "scopeSpans": [ + { + "scope": { + "name": "testing-library", + "version": "1.1.1" + }, + "spans": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000002", + "parentSpanId": "0000000000000003", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + }, + { + "scope": { + "name": "testing-library-1", + "version": "1.1.2" + }, + "spans": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "parentSpanId": "0000000000000011", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/internal/storage/v2/memory/fixtures/db_traces_02.json b/internal/storage/v2/memory/fixtures/db_traces_02.json new file mode 100644 index 00000000000..d51aa779560 --- /dev/null +++ b/internal/storage/v2/memory/fixtures/db_traces_02.json @@ -0,0 +1,212 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "service-x" + } + } + ] + }, + "scopeSpans": [ + { + "scope": { + "name": "testing-library", + "version": "1.1.1" + }, + "spans": [ + { + "traceId": "00000000000000020000000000000000", + "spanId": "0000000000000003", + "parentSpanId": "0000000000000010", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + }, + { + "scope": { + "name": "testing-library-1", + "version": "1.1.2" + }, + "spans": [ + { + "traceId": "00000000000000020000000000000000", + "spanId": "0000000000000005", + "parentSpanId": "0000000000000010", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/internal/storage/v2/memory/fixtures/otel_traces_01.json b/internal/storage/v2/memory/fixtures/otel_traces_01.json new file mode 100644 index 00000000000..21b2ba7313d --- /dev/null +++ b/internal/storage/v2/memory/fixtures/otel_traces_01.json @@ -0,0 +1,390 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "service-x" + } + } + ] + }, + "scopeSpans": [ + { + "scope": { + "name": "testing-library", + "version": "1.1.1" + }, + "spans": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000002", + "parentSpanId": "0000000000000003", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + }, + { + "traceId": "00000000000000020000000000000000", + "spanId": "0000000000000003", + "parentSpanId": "0000000000000010", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + }, + { + "scope": { + "name": "testing-library-1", + "version": "1.1.2" + }, + "spans": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "parentSpanId": "0000000000000011", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + }, + { + "traceId": "00000000000000020000000000000000", + "spanId": "0000000000000005", + "parentSpanId": "0000000000000010", + "flags": 1, + "name": "test-general-conversion", + "startTimeUnixNano": "1485467191639875000", + "endTimeUnixNano": "1485467191639880000", + "attributes": [ + { + "key": "peer.service", + "value": { + "stringValue": "service-y" + } + }, + { + "key": "peer.ipv4", + "value": { + "intValue": "23456" + } + }, + { + "key": "blob", + "value": { + "bytesValue": "AAAwOQ==" + } + }, + { + "key": "temperature", + "value": { + "doubleValue": 72.5 + } + } + ], + "events": [ + { + "timeUnixNano": "1485467191639875000", + "name": "testing-event", + "attributes": [ + { + "key": "event-x", + "value": { + "stringValue": "event-y" + } + } + ] + }, + { + "timeUnixNano": "1485467191639875000", + "attributes": [ + { + "key": "x", + "value": { + "stringValue": "y" + } + } + ] + } + ], + "links": [ + { + "traceId": "00000000000000010000000000000000", + "spanId": "0000000000000004", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "follows_from" + } + } + ] + }, + { + "traceId": "00000000000000ff0000000000000000", + "spanId": "00000000000000ff", + "attributes": [ + { + "key": "opentracing.ref_type", + "value": { + "stringValue": "child_of" + } + } + ] + } + ], + "status": { + "code": 2 + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/internal/storage/v2/memory/memory.go b/internal/storage/v2/memory/memory.go new file mode 100644 index 00000000000..384c61618bd --- /dev/null +++ b/internal/storage/v2/memory/memory.go @@ -0,0 +1,151 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package memory + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.16.0" + + v1 "github.com/jaegertracing/jaeger/internal/storage/v1/memory" + "github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore" + "github.com/jaegertracing/jaeger/internal/tenancy" +) + +// Store is an in-memory store of traces +type Store struct { + sync.RWMutex + // Each tenant gets a copy of default config. + // In the future this can be extended to contain per-tenant configuration. + defaultConfig v1.Configuration + perTenant map[string]*Tenant +} + +// NewStore creates an in-memory store +func NewStore(cfg v1.Configuration) *Store { + return &Store{ + defaultConfig: cfg, + perTenant: make(map[string]*Tenant), + } +} + +// getTenant returns the per-tenant storage. Note that tenantID has already been checked for by the collector or query +func (st *Store) getTenant(tenantID string) *Tenant { + st.RLock() + tenant, ok := st.perTenant[tenantID] + st.RUnlock() + if !ok { + st.Lock() + defer st.Unlock() + tenant, ok = st.perTenant[tenantID] + if !ok { + tenant = newTenant(&st.defaultConfig) + st.perTenant[tenantID] = tenant + } + } + return tenant +} + +// WriteTraces write the traces into the tenant by grouping all the spans with same trace id together. +// The traces will not be saved as they are coming, rather they would be reshuffled. +func (st *Store) WriteTraces(ctx context.Context, td ptrace.Traces) error { + resourceSpansByTraceId := reshuffleResourceSpans(td.ResourceSpans()) + m := st.getTenant(tenancy.GetTenant(ctx)) + for traceId, sameTraceIDResourceSpan := range resourceSpansByTraceId { + for _, resourceSpan := range sameTraceIDResourceSpan.All() { + serviceName := getServiceNameFromResource(resourceSpan.Resource()) + if serviceName == "" { + continue + } + m.storeService(serviceName) + for _, scopeSpan := range resourceSpan.ScopeSpans().All() { + for _, span := range scopeSpan.Spans().All() { + operation := tracestore.Operation{ + Name: span.Name(), + SpanKind: span.Kind().String(), + } + m.storeOperation(serviceName, operation) + } + } + } + m.storeTraces(traceId, sameTraceIDResourceSpan) + } + return nil +} + +// reshuffleResourceSpans reshuffles the resource spans so as to group the spans from same traces together. To understand this reshuffling +// take an example of 2 resource spans, then these two resource spans have 2 scope spans each. +// Every scope span consists of 2 spans with trace ids: 1 and 2. Now the final structure should look like: +// For TraceID1: [ResourceSpan1:[ScopeSpan1:[Span(TraceID1)],ScopeSpan2:[Span(TraceID1)], ResourceSpan2:[ScopeSpan1:[Span(TraceID1)],ScopeSpan2:[Span(TraceID1)] +// A similar structure will be there for TraceID2 +func reshuffleResourceSpans(resourceSpanSlice ptrace.ResourceSpansSlice) map[pcommon.TraceID]ptrace.ResourceSpansSlice { + resourceSpansByTraceId := make(map[pcommon.TraceID]ptrace.ResourceSpansSlice) + for _, resourceSpan := range resourceSpanSlice.All() { + scopeSpansByTraceId := reshuffleScopeSpans(resourceSpan.ScopeSpans()) + // All the scope spans here will have the same resource as of resourceSpan. Therefore: + // Copy the resource to an empty resourceSpan. After this, append the scope spans with same + // trace id to this empty resource span. Finally move this resource span to the resourceSpanSlice + // containing other resource spans and having same trace id. + for traceId, scopeSpansSlice := range scopeSpansByTraceId { + resourceSpanByTraceId := ptrace.NewResourceSpans() + resourceSpan.Resource().CopyTo(resourceSpanByTraceId.Resource()) + scopeSpansSlice.MoveAndAppendTo(resourceSpanByTraceId.ScopeSpans()) + resourceSpansSlice, ok := resourceSpansByTraceId[traceId] + if !ok { + resourceSpansSlice = ptrace.NewResourceSpansSlice() + resourceSpansByTraceId[traceId] = resourceSpansSlice + } + resourceSpanByTraceId.MoveTo(resourceSpansSlice.AppendEmpty()) + } + } + return resourceSpansByTraceId +} + +// reshuffleScopeSpans reshuffles all the scope spans of a resource span to group +// spans of same trace ids together. The first step is to iterate the scope spans and then. +// copy the scope to an empty scopeSpan. After this, append the spans with same +// trace id to this empty scope span. Finally move this scope span to the scope span +// slice containing other scope spans and having same trace id. +func reshuffleScopeSpans(scopeSpanSlice ptrace.ScopeSpansSlice) map[pcommon.TraceID]ptrace.ScopeSpansSlice { + scopeSpansByTraceId := make(map[pcommon.TraceID]ptrace.ScopeSpansSlice) + for _, scopeSpan := range scopeSpanSlice.All() { + spansByTraceId := reshuffleSpans(scopeSpan.Spans()) + for traceId, spansSlice := range spansByTraceId { + scopeSpanByTraceId := ptrace.NewScopeSpans() + scopeSpan.Scope().CopyTo(scopeSpanByTraceId.Scope()) + spansSlice.MoveAndAppendTo(scopeSpanByTraceId.Spans()) + scopeSpansSlice, ok := scopeSpansByTraceId[traceId] + if !ok { + scopeSpansSlice = ptrace.NewScopeSpansSlice() + scopeSpansByTraceId[traceId] = scopeSpansSlice + } + scopeSpanByTraceId.MoveTo(scopeSpansSlice.AppendEmpty()) + } + } + return scopeSpansByTraceId +} + +func reshuffleSpans(spanSlice ptrace.SpanSlice) map[pcommon.TraceID]ptrace.SpanSlice { + spansByTraceId := make(map[pcommon.TraceID]ptrace.SpanSlice) + for _, span := range spanSlice.All() { + spansSlice, ok := spansByTraceId[span.TraceID()] + if !ok { + spansSlice = ptrace.NewSpanSlice() + spansByTraceId[span.TraceID()] = spansSlice + } + span.CopyTo(spansSlice.AppendEmpty()) + } + return spansByTraceId +} + +func getServiceNameFromResource(resource pcommon.Resource) string { + val, ok := resource.Attributes().Get(conventions.AttributeServiceName) + if !ok { + return "" + } + return val.Str() +} diff --git a/internal/storage/v2/memory/memory_test.go b/internal/storage/v2/memory/memory_test.go new file mode 100644 index 00000000000..8ce90a24923 --- /dev/null +++ b/internal/storage/v2/memory/memory_test.go @@ -0,0 +1,133 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package memory + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + v1 "github.com/jaegertracing/jaeger/internal/storage/v1/memory" + "github.com/jaegertracing/jaeger/internal/tenancy" +) + +func TestNewStore_DefaultConfig(t *testing.T) { + store := NewStore(v1.Configuration{}) + td := loadInputTraces(t, 1) + err := store.WriteTraces(context.Background(), td) + require.NoError(t, err) + tenant := store.getTenant(tenancy.GetTenant(context.Background())) + traceID1 := fromString(t, "00000000000000010000000000000000") + traces, ok := tenant.traces[traceID1] + require.True(t, ok) + expected := loadOutputTraces(t, 1) + testTraces(t, expected, traces) + traceID2 := fromString(t, "00000000000000020000000000000000") + traces2, ok := tenant.traces[traceID2] + require.True(t, ok) + expected2 := loadOutputTraces(t, 2) + testTraces(t, expected2, traces2) +} + +func TestWriteTraces_WriteTwoBatches(t *testing.T) { + store := NewStore(v1.Configuration{}) + traceId := fromString(t, "00000000000000010000000000000000") + td1 := ptrace.NewTraces() + td1.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceId) + err := store.WriteTraces(context.Background(), td1) + require.NoError(t, err) + td2 := ptrace.NewTraces() + td2.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceId) + err = store.WriteTraces(context.Background(), td2) + require.NoError(t, err) + tenant := store.getTenant(tenancy.GetTenant(context.Background())) + assert.Equal(t, 2, tenant.traces[traceId].ResourceSpans().Len()) +} + +func TestWriteTraces_WriteTraceWithTwoResourceSpans(t *testing.T) { + store := NewStore(v1.Configuration{}) + traceId := fromString(t, "00000000000000010000000000000000") + td := ptrace.NewTraces() + resourceSpans := td.ResourceSpans() + scopeSpan1 := resourceSpans.AppendEmpty().ScopeSpans().AppendEmpty() + scopeSpan1.Spans().AppendEmpty().SetTraceID(traceId) + scopeSpan1.Spans().AppendEmpty().SetTraceID(traceId) + scopeSpan2 := resourceSpans.AppendEmpty().ScopeSpans().AppendEmpty() + scopeSpan2.Spans().AppendEmpty().SetTraceID(traceId) + scopeSpan2.Spans().AppendEmpty().SetTraceID(traceId) + err := store.WriteTraces(context.Background(), td) + require.NoError(t, err) + tenant := store.getTenant(tenancy.GetTenant(context.Background())) + // All spans have same trace id, so output should be same as input (that is no reshuffling, effectively) + assert.Equal(t, td, tenant.traces[traceId]) +} + +func TestNewStore_TracesLimit(t *testing.T) { + maxTraces := 5 + store := NewStore(v1.Configuration{ + MaxTraces: maxTraces, + }) + for i := 1; i < 10; i++ { + traceID := fromString(t, fmt.Sprintf("000000000000000%d0000000000000000", i)) + traces := ptrace.NewTraces() + traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceID) + err := store.WriteTraces(context.Background(), traces) + require.NoError(t, err) + } + assert.Len(t, store.getTenant(tenancy.GetTenant(context.Background())).traces, maxTraces) + assert.Len(t, store.getTenant(tenancy.GetTenant(context.Background())).ids, maxTraces) +} + +func fromString(t *testing.T, dbTraceId string) pcommon.TraceID { + var traceId [16]byte + traceBytes, err := hex.DecodeString(dbTraceId) + require.NoError(t, err) + copy(traceId[:], traceBytes) + return traceId +} + +func testTraces(t *testing.T, expectedTraces ptrace.Traces, actualTraces ptrace.Traces) { + if !assert.Equal(t, expectedTraces, actualTraces) { + marshaller := ptrace.JSONMarshaler{} + actualTd, err := marshaller.MarshalTraces(actualTraces) + require.NoError(t, err) + writeActualData(t, "traces", actualTd) + } +} + +func writeActualData(t *testing.T, name string, data []byte) { + var prettyJson bytes.Buffer + err := json.Indent(&prettyJson, data, "", " ") + require.NoError(t, err) + path := "fixtures/actual_" + name + ".json" + err = os.WriteFile(path, prettyJson.Bytes(), 0o644) + require.NoError(t, err) + t.Log("Saved the actual " + name + " to " + path) +} + +func loadInputTraces(t *testing.T, i int) ptrace.Traces { + return loadTraces(t, fmt.Sprintf("fixtures/otel_traces_%02d.json", i)) +} + +func loadOutputTraces(t *testing.T, i int) ptrace.Traces { + return loadTraces(t, fmt.Sprintf("fixtures/db_traces_%02d.json", i)) +} + +func loadTraces(t *testing.T, name string) ptrace.Traces { + unmarshller := ptrace.JSONUnmarshaler{} + data, err := os.ReadFile(name) + require.NoError(t, err) + td, err := unmarshller.UnmarshalTraces(data) + require.NoError(t, err) + return td +} diff --git a/internal/storage/v2/memory/package_test.go b/internal/storage/v2/memory/package_test.go new file mode 100644 index 00000000000..3652418536f --- /dev/null +++ b/internal/storage/v2/memory/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package memory + +import ( + "testing" + + "github.com/jaegertracing/jaeger/internal/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/internal/storage/v2/memory/tenant.go b/internal/storage/v2/memory/tenant.go new file mode 100644 index 00000000000..20ab6fb39b3 --- /dev/null +++ b/internal/storage/v2/memory/tenant.go @@ -0,0 +1,77 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package memory + +import ( + "sync" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + v1 "github.com/jaegertracing/jaeger/internal/storage/v1/memory" + "github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore" +) + +// Tenant is an in-memory store of traces for a single tenant +type Tenant struct { + sync.RWMutex + ids []pcommon.TraceID // ring buffer used to evict oldest traces + traces map[pcommon.TraceID]ptrace.Traces + services map[string]struct{} + operations map[string]map[tracestore.Operation]struct{} + config *v1.Configuration + evict int // position in ids[] of the last evicted trace +} + +func newTenant(cfg *v1.Configuration) *Tenant { + return &Tenant{ + ids: make([]pcommon.TraceID, cfg.MaxTraces), + traces: map[pcommon.TraceID]ptrace.Traces{}, + services: map[string]struct{}{}, + operations: map[string]map[tracestore.Operation]struct{}{}, + config: cfg, + evict: -1, + } +} + +func (t *Tenant) storeService(serviceName string) { + t.Lock() + defer t.Unlock() + t.services[serviceName] = struct{}{} +} + +func (t *Tenant) storeOperation(serviceName string, operation tracestore.Operation) { + t.Lock() + defer t.Unlock() + if _, ok := t.operations[serviceName]; ok { + t.operations[serviceName][operation] = struct{}{} + return + } + t.operations[serviceName] = make(map[tracestore.Operation]struct{}) + t.operations[serviceName][operation] = struct{}{} +} + +func (t *Tenant) storeTraces(traceId pcommon.TraceID, resourceSpanSlice ptrace.ResourceSpansSlice) { + t.Lock() + defer t.Unlock() + if foundTraces, ok := t.traces[traceId]; ok { + resourceSpanSlice.MoveAndAppendTo(foundTraces.ResourceSpans()) + return + } + traces := ptrace.NewTraces() + resourceSpanSlice.MoveAndAppendTo(traces.ResourceSpans()) + t.traces[traceId] = traces + // if we have a limit, let's cleanup the oldest traces + if t.config.MaxTraces > 0 { + // we only have to deal with this slice if we have a limit + t.evict = (t.evict + 1) % t.config.MaxTraces + // do we have an item already on this position? if so, we are overriding it, + // and we need to remove from the map + if !t.ids[t.evict].IsEmpty() { + delete(t.traces, t.ids[t.evict]) + } + // update the ring with the trace id + t.ids[t.evict] = traceId + } +}