diff --git a/internal/storage/metricstore/elasticsearch/README.md b/internal/storage/metricstore/elasticsearch/README.md
new file mode 100644
index 00000000000..b856c630c95
--- /dev/null
+++ b/internal/storage/metricstore/elasticsearch/README.md
@@ -0,0 +1,115 @@
+## `getCallRate` Calculation Explained
+
+The `getCallRate` method calculates the call rate (requests per second) for a service by querying span data stored in Elasticsearch. The process involves three key stages: filtering the relevant spans, performing a time-series aggregation to count requests, and post-processing the aggregated data to calculate the final rate.
+
+This document breaks down each of these stages, referencing the corresponding parts of the Elasticsearch query and the Go implementation.
+
+### 1. Filter Query Part
+
+The first step is to isolate the specific set of documents (spans) needed for the calculation. We use a `bool` query with a `filter` clause, which is efficient because it doesn't contribute to document scoring.
+
+**ES Query Reference:**
+
+```json
+"query": {
+  "bool": {
+    "filter": [
+      { "terms": { "process.serviceName": ["${service}"] } },
+      { "terms": { "tag.span@kind": ["server"] } },
+      {
+        "range": {
+          "startTimeMillis": {
+            "gte": "now-6h",
+            "lte": "now",
+            "format": "epoch_millis"
+          }
+        }
+      }
+    ]
+  }
+}
+```
+
+**Explanation:**
+
+* **`{ "terms": { "process.serviceName": ["${service}"] } }`**: This filter selects spans that belong to the specified service. This is the primary entity for which we are calculating the call rate.
+* **`{ "terms": { "tag.span@kind": ["server"] } }`**: This is a critical filter for correctly calculating the *incoming* call rate. By filtering on the requested span kinds (typically `server`), we only count spans that represent the service receiving a request, which prevents us from incorrectly counting outgoing calls made by the service.
+* **`{ "range": { "startTimeMillis": ... } }`**: This filter restricts the spans to a specific time window. The `getCallRate` implementation uses an extended time range (adding a 10-minute lookback period via `extendedStartTimeMillis`) so that the earliest time points in the requested window have enough historical data to compute a meaningful value.
+
+**Code Reference:**
+
+This logic is constructed in the `buildQuery` method. The filters are progressively added to a `boolQuery`.
+
+-----
+
+### 2. Aggregation Query Part
+
+After filtering the spans, we need to aggregate them into a time series that we can use to calculate a rate. The query does not calculate the rate directly; instead, it prepares the data by creating a running total of requests over time.
+
+Note: the rate could also be computed inside Elasticsearch with a `moving_fn` pipeline aggregation. However, the `getCallRate` implementation takes a different approach: it fetches the cumulative sum and calculates the rate in the application layer, as described in the Post-Processing section. The aggregation below reflects the logic in the Go code.
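+
+For illustration only, such an in-Elasticsearch variant could look roughly like the sketch below. The `rate_per_second` name, the 10-bucket window, and the division by 600 seconds (10 buckets × 60 s) are hypothetical choices for this example; this is **not** the aggregation the Go code builds.
+
+```json
+"aggs": {
+  "results_buckets": {
+    "date_histogram": {
+      "field": "startTimeMillis",
+      "fixed_interval": "60s"
+    },
+    "aggs": {
+      "rate_per_second": {
+        "moving_fn": {
+          "buckets_path": "_count",
+          "window": 10,
+          "script": "MovingFunctions.sum(values) / 600"
+        }
+      }
+    }
+  }
+}
+```
+
+Pushing the calculation into `moving_fn` would avoid post-processing, but it moves scripting into the query and trades away the control over edge cases (such as empty buckets) that the client-side approach described in the Post-Processing section provides.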
+
+**ES Query Reference (as implemented in `buildCallRateAggregations`):**
+
+```json
+"aggs": {
+  "results_buckets": {
+    "date_histogram": {
+      "field": "startTimeMillis",
+      "fixed_interval": "60s",
+      "min_doc_count": 0,
+      "extended_bounds": {
+        "min": "now-6h",
+        "max": "now"
+      }
+    },
+    "aggs": {
+      "cumulative_requests": {
+        "cumulative_sum": { "buckets_path": "_count" }
+      }
+    }
+  }
+}
+```
+
+**Explanation:**
+
+1. **`date_histogram`**: This aggregation is the foundation of our time series. It groups the filtered spans into time buckets of a `fixed_interval` (e.g., `60s`). For each bucket, it provides a count (`_count`) of the documents (i.e., server spans) that fall within that time interval.
+
+2. **`cumulative_sum`**: This is a sub-aggregation that operates on the buckets created by the `date_histogram`. It calculates a running total of the document counts. For any given time bucket, its `cumulative_requests` value is the sum of all `_count`s from the very first bucket up to and including the current one.
+
+**Code Reference:**
+
+This aggregation pipeline is constructed in the `buildCallRateAggregations` method.
+
+-----
+
+### 3. Post-Processing Part
+
+The final step happens in the application layer, within the `getCallRateProcessMetrics` function. This function takes the time series of `(timestamp, cumulative_request_count)` pairs returned by Elasticsearch and transforms it into a series of call rates.
+
+**Explanation:**
+
+The function implements a sliding-window algorithm to calculate the rate. It iterates through the data points and, for each point, calculates the average rate over a preceding "lookback" period.
+
+The core calculation for each point in the time series is:
+
+$$\text{rate} = \frac{\Delta \text{Value}}{\Delta \text{Time}} = \frac{\text{lastVal} - \text{firstVal}}{\text{windowSizeSeconds}}$$
+
+Where:
+
+* `lastVal`: The cumulative request count at the end of the sliding window (the current data point).
+* `firstVal`: The cumulative request count at the beginning of the sliding window.
+* `lastVal - firstVal`: The total number of new requests that occurred during the window.
+* `windowSizeSeconds`: The duration of the sliding window in seconds.
+
+For example, with a 60-second step and a 10-minute `RatePer`, the window spans 10 buckets (600 seconds); if the cumulative count grows from 100 to 550 across that window, the rate is (550 − 100) / 600 = 0.75 calls/sec.
+
+**Why this approach?**
+
+This post-processing logic effectively calculates the slope of the cumulative-requests graph over a sliding window, which is the definition of a rate. Performing this calculation client-side provides several advantages:
+
+* **Flexibility:** It gives full control over the rate calculation logic and how to handle edge cases, such as intervals with no data (`NaN` values).
+* **Simplicity:** It keeps the Elasticsearch query relatively simple and offloads potentially complex scripting from the database, which can be more performant and easier to maintain.
+* **Clarity:** The logic is explicitly defined in the Go code, making it clear how the final metric is derived from the raw cumulative counts.
+
+**Code Reference:**
+
+The post-processing logic resides in `getCallRateProcessMetrics`, which `GetCallRates` passes to the query executor as the `processMetricsFunc` field of `MetricsQueryParams`.
\ No newline at end of file
diff --git a/internal/storage/metricstore/elasticsearch/factory.go b/internal/storage/metricstore/elasticsearch/factory.go
index 5fef4e02da3..9b7ce543111 100644
--- a/internal/storage/metricstore/elasticsearch/factory.go
+++ b/internal/storage/metricstore/elasticsearch/factory.go
@@ -39,7 +39,7 @@ func NewFactory(ctx context.Context, cfg config.Configuration, telset telemetry.
// CreateMetricsReader implements storage.MetricStoreFactory. func (f *Factory) CreateMetricsReader() (metricstore.Reader, error) { - mr := NewMetricsReader(f.client, f.telset.Logger, f.telset.TracerProvider) + mr := NewMetricsReader(f.client, f.config, f.telset.Logger, f.telset.TracerProvider) return metricstoremetrics.NewReaderDecorator(mr, f.telset.Metrics), nil } diff --git a/internal/storage/metricstore/elasticsearch/query_logger.go b/internal/storage/metricstore/elasticsearch/query_logger.go new file mode 100644 index 00000000000..fd36904f5a0 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/query_logger.go @@ -0,0 +1,56 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearch + +import ( + "context" + "encoding/json" + + "github.com/olivere/elastic/v7" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/internal/telemetry/otelsemconv" +) + +// QueryLogger handles logging and tracing of Elasticsearch queries. +type QueryLogger struct { + logger *zap.Logger + tracer trace.Tracer +} + +// NewQueryLogger creates a new QueryLogger. +func NewQueryLogger(logger *zap.Logger, tracer trace.Tracer) *QueryLogger { + return &QueryLogger{ + logger: logger, + tracer: tracer, + } +} + +// TraceQuery adds tracing attributes. +func (ql *QueryLogger) TraceQuery(ctx context.Context, metricName string) trace.Span { + _, span := ql.tracer.Start(ctx, metricName) + span.SetAttributes( + otelsemconv.DBSystemKey.String("elasticsearch"), + attribute.Key("component").String("es-metricsreader-query-logger"), + ) + return span +} + +// LogAndTraceResult logs the Elasticsearch query results and potentially adds them to the span. +func (ql *QueryLogger) LogAndTraceResult(span trace.Span, searchResult *elastic.SearchResult) { + if span.IsRecording() { + resultJSON, _ := json.MarshalIndent(searchResult, "", " ") + ql.logger.Debug("Elasticsearch metricsreader query results", zap.String("results", string(resultJSON))) + span.SetAttributes(attribute.String("db.response_json", string(resultJSON))) + } +} + +// LogErrorToSpan logs an error to the trace span. +func (*QueryLogger) LogErrorToSpan(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) +} diff --git a/internal/storage/metricstore/elasticsearch/query_logger_test.go b/internal/storage/metricstore/elasticsearch/query_logger_test.go new file mode 100644 index 00000000000..fae4ffb0d0e --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/query_logger_test.go @@ -0,0 +1,111 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package elasticsearch + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/olivere/elastic/v7" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/jaegertracing/jaeger/internal/telemetry/otelsemconv" +) + +type testContext struct { + t *testing.T + logger *zap.Logger + tp trace.TracerProvider + exporter *tracetest.InMemoryExporter + tracer trace.Tracer + ql *QueryLogger +} + +func newTestContext(t *testing.T) *testContext { + logger := zaptest.NewLogger(t) + tp, exporter := tracerProvider(t) + tracer := tp.Tracer("test") + ql := NewQueryLogger(logger, tracer) + + return &testContext{ + t: t, + logger: logger, + tp: tp, + exporter: exporter, + tracer: tracer, + ql: ql, + } +} + +func TestQueryLogger(t *testing.T) { + t.Run("TraceQuery", func(t *testing.T) { + tc := newTestContext(t) + assert.NotNil(t, tc.ql) + + span := tc.ql.TraceQuery(context.Background(), "test_query") + assert.NotNil(t, span) + + // End the span to ensure it gets exported + span.End() + + // Give the exporter time to process + require.Eventually(t, func() bool { + return len(tc.exporter.GetSpans()) > 0 + }, time.Second, 10*time.Millisecond) + + spans := tc.exporter.GetSpans() + assert.Len(t, spans, 1) + assert.Equal(t, "test_query", spans[0].Name) + assert.Contains(t, spans[0].Attributes, attribute.String(string(otelsemconv.DBSystemKey), "elasticsearch")) + }) +} + +func TestLogAndTraceResult(t *testing.T) { + t.Run("LogAndTraceResult", func(t *testing.T) { + tc := newTestContext(t) + _, span := tc.tracer.Start(context.Background(), "test_span") + + result := &elastic.SearchResult{TookInMillis: 10, Hits: &elastic.SearchHits{TotalHits: &elastic.TotalHits{Value: 5, Relation: "eq"}}} + tc.ql.LogAndTraceResult(span, result) + + span.End() + require.Eventually(t, func() bool { + return len(tc.exporter.GetSpans()) > 0 + }, time.Second, 10*time.Millisecond) + + spans := tc.exporter.GetSpans() + assert.Len(t, spans, 1) + assert.Equal(t, "test_span", spans[0].Name) + assert.Contains(t, spans[0].Attributes[0].Key, "db.response_json") + }) +} + +func TestLogErrorToSpan(t *testing.T) { + t.Run("LogErrorToSpan", func(t *testing.T) { + tc := newTestContext(t) + _, span := tc.tracer.Start(context.Background(), "test_span") + + testErr := errors.New("test error") + tc.ql.LogErrorToSpan(span, testErr) + + span.End() + require.Eventually(t, func() bool { + return len(tc.exporter.GetSpans()) > 0 + }, time.Second, 10*time.Millisecond) + + spans := tc.exporter.GetSpans() + assert.Len(t, spans, 1) + assert.Equal(t, codes.Error, spans[0].Status.Code) + assert.Equal(t, "test error", spans[0].Status.Description) + }) +} diff --git a/internal/storage/metricstore/elasticsearch/reader.go b/internal/storage/metricstore/elasticsearch/reader.go index 2dbbe14bc16..0af049b0a50 100644 --- a/internal/storage/metricstore/elasticsearch/reader.go +++ b/internal/storage/metricstore/elasticsearch/reader.go @@ -6,47 +6,371 @@ package elasticsearch import ( "context" "errors" + "fmt" + "math" + "strconv" + "strings" "time" + "github.com/olivere/elastic/v7" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/jaegertracing/jaeger-idl/model/v1" 
"github.com/jaegertracing/jaeger/internal/proto-gen/api_v2/metrics" es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch" + "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore" ) var ErrNotImplemented = errors.New("metrics querying is currently not implemented yet") -const minStep = time.Millisecond +const ( + minStep = time.Millisecond + aggName = "results_buckets" + culmuAggName = "cumulative_requests" +) -// MetricsReader is a Elasticsearch metrics reader. +// MetricsReader is an Elasticsearch metrics reader. type MetricsReader struct { - client es.Client - logger *zap.Logger - tracer trace.Tracer + client es.Client + cfg config.Configuration + logger *zap.Logger + tracer trace.Tracer + queryLogger *QueryLogger +} + +// TimeRange represents a time range for metrics queries. +type TimeRange struct { + startTimeMillis int64 + endTimeMillis int64 + // extendedStartTimeMillis is an extended start time used for lookback periods + // in certain aggregations (e.g., cumulative sums or rate calculations) + // where data prior to startTimeMillis is needed to compute metrics accurately + // within the primary time range. This typically accounts for a window of + // preceding data (e.g., 10 minutes) to ensure that the initial data + // points in the primary time range have enough historical context for calculation. + extendedStartTimeMillis int64 +} + +// MetricsQueryParams contains parameters for Elasticsearch metrics queries. +type MetricsQueryParams struct { + metricstore.BaseQueryParameters + metricName string + metricDesc string + boolQuery elastic.BoolQuery + aggQuery elastic.Aggregation + // bucketsToPointsFunc is a function that turn raw ES Histogram Bucket result into + // array of Pair for easier post-processing (using processMetricsFunc) + bucketsToPointsFunc func(buckets []*elastic.AggregationBucketHistogramItem) []*Pair + // processMetricsFunc is a function that processes the raw time-series + // data (pairs of timestamp and value) returned from Elasticsearch + // aggregations into the final metric values. This is used for calculations + // like rates (e.g., calls/sec) which require manipulating the raw counts + // or sums over specific time windows. + processMetricsFunc func(mf *metrics.MetricFamily, params metricstore.BaseQueryParameters) *metrics.MetricFamily } -func NewMetricsReader(client es.Client, logger *zap.Logger, tracer trace.TracerProvider) *MetricsReader { +// Pair represents a timestamp-value pair for metrics. +type Pair struct { + TimeStamp int64 + Value float64 +} + +// NewMetricsReader initializes a new MetricsReader. 
+func NewMetricsReader(client es.Client, cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) *MetricsReader { + tr := tracer.Tracer("elasticsearch-metricstore") return &MetricsReader{ - client: client, - logger: logger, - tracer: tracer.Tracer("elasticsearch-metricstore"), + client: client, + cfg: cfg, + logger: logger, + tracer: tr, + queryLogger: NewQueryLogger(logger, tr), } } +// GetLatencies retrieves latency metrics func (MetricsReader) GetLatencies(_ context.Context, _ *metricstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) { return nil, ErrNotImplemented } -func (MetricsReader) GetCallRates(_ context.Context, _ *metricstore.CallRateQueryParameters) (*metrics.MetricFamily, error) { - return nil, ErrNotImplemented +// GetCallRates retrieves call rate metrics +func (r MetricsReader) GetCallRates(ctx context.Context, params *metricstore.CallRateQueryParameters) (*metrics.MetricFamily, error) { + timeRange, err := calculateTimeRange(¶ms.BaseQueryParameters) + if err != nil { + return nil, err + } + + metricsParams := MetricsQueryParams{ + BaseQueryParameters: params.BaseQueryParameters, + metricName: "service_call_rate", + metricDesc: "calls/sec, grouped by service", + boolQuery: r.buildQuery(params.BaseQueryParameters, timeRange), + aggQuery: r.buildCallRateAggregations(params.BaseQueryParameters, timeRange), + bucketsToPointsFunc: getCallRateBucketsToPoints, + processMetricsFunc: getCallRateProcessMetrics, + } + + metricFamily, err := r.executeSearch(ctx, metricsParams) + if err != nil { + return nil, err + } + // Trim results to original time range + return trimMetricPointsBefore(metricFamily, timeRange.startTimeMillis), nil } +// GetErrorRates retrieves error rate metrics func (MetricsReader) GetErrorRates(_ context.Context, _ *metricstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) { return nil, ErrNotImplemented } +// GetMinStepDuration returns the minimum step duration. func (MetricsReader) GetMinStepDuration(_ context.Context, _ *metricstore.MinStepDurationQueryParameters) (time.Duration, error) { return minStep, nil } + +// trimMetricPointsBefore removes metric points older than startMillis from each metric in the MetricFamily. +func trimMetricPointsBefore(mf *metrics.MetricFamily, startMillis int64) *metrics.MetricFamily { + for _, metric := range mf.Metrics { + points := metric.MetricPoints + // Find first index where point >= startMillis + cutoff := 0 + for ; cutoff < len(points); cutoff++ { + point := points[cutoff] + pointMillis := point.Timestamp.Seconds*1000 + int64(point.Timestamp.Nanos)/1000000 + if pointMillis >= startMillis { + break + } + } + // Slice the array starting from cutoff index + metric.MetricPoints = points[cutoff:] + } + return mf +} + +// buildQuery constructs the Elasticsearch bool query. +func (r MetricsReader) buildQuery(params metricstore.BaseQueryParameters, timeRange TimeRange) elastic.BoolQuery { + boolQuery := elastic.NewBoolQuery() + + serviceNameQuery := elastic.NewTermsQuery("process.serviceName", buildInterfaceSlice(params.ServiceNames)...) + boolQuery.Filter(serviceNameQuery) + + // Span kind filter + spanKindField := strings.ReplaceAll(model.SpanKindKey, ".", r.cfg.Tags.DotReplacement) + spanKindQuery := elastic.NewTermsQuery("tag."+spanKindField, buildInterfaceSlice(normalizeSpanKinds(params.SpanKinds))...) + boolQuery.Filter(spanKindQuery) + + rangeQuery := elastic.NewRangeQuery("startTimeMillis"). + // Use extendedStartTimeMillis to allow for a 10-minute lookback. 
+ Gte(timeRange.extendedStartTimeMillis). + Lte(timeRange.endTimeMillis). + Format("epoch_millis") + boolQuery.Filter(rangeQuery) + + // Corresponding ES query: + // { + // "query": { + // "bool": { + // "filter": [ + // {"terms": {"process.serviceName": ["name1"] }}, + // {"terms": {"tag.span@kind": ["server"] }}, // Dot replacement: @ + // { + // "range": { + // "startTimeMillis": { + // "gte": "now-'lookback'-5m", + // "lte": "now", + // "format": "epoch_millis"}}}]} + // }, + + return *boolQuery +} + +// processCallRateMetrics processes the MetricFamily to calculate call rates +func getCallRateProcessMetrics(mf *metrics.MetricFamily, params metricstore.BaseQueryParameters) *metrics.MetricFamily { + lookback := int(math.Ceil(float64(params.RatePer.Milliseconds()) / float64(params.Step.Milliseconds()))) + if lookback < 1 { + lookback = 1 + } + + for _, metric := range mf.Metrics { + points := metric.MetricPoints + var processedPoints []*metrics.MetricPoint + + for i := range points { + currentPoint := points[i] + currentValue := currentPoint.GetGaugeValue().GetDoubleValue() + + // Elasticsearch's percentiles aggregation returns 0.0 for time buckets with no documents + // These aren't true zero values but represent missing data points in sparse time series + // We convert them to NaN to distinguish from actual measured zero values (slope of 0) + if currentValue == 0.0 { + processedPoints = append(processedPoints, &metrics.MetricPoint{ + Timestamp: currentPoint.Timestamp, + Value: toDomainMetricPointValue(math.NaN()), + }) + continue + } + + // For first (lookback-1) points, we don't have enough history + if i < lookback-1 { + processedPoints = append(processedPoints, &metrics.MetricPoint{ + Timestamp: currentPoint.Timestamp, + Value: toDomainMetricPointValue(0.0), + }) + continue + } + + // Get boundary values for our lookback window + firstPoint := points[i-lookback+1] + firstValue := firstPoint.GetGaugeValue().GetDoubleValue() + lastValue := currentValue + + // Calculate time window duration in seconds + windowSizeSeconds := float64(lookback) * params.Step.Seconds() + + // Calculate rate of change per second + rate := (lastValue - firstValue) / windowSizeSeconds + rate = math.Round(rate*100) / 100 // Round to 2 decimal places + + processedPoints = append(processedPoints, &metrics.MetricPoint{ + Timestamp: currentPoint.Timestamp, + Value: toDomainMetricPointValue(rate), + }) + } + + metric.MetricPoints = processedPoints + } + + return mf +} + +func getCallRateBucketsToPoints(buckets []*elastic.AggregationBucketHistogramItem) []*Pair { + var points []*Pair + + for _, bucket := range buckets { + aggMap, ok := bucket.Aggregations.CumulativeSum(culmuAggName) + if !ok { + return nil + } + value := math.NaN() + if aggMap != nil && aggMap.Value != nil { + value = *aggMap.Value + } + points = append(points, &Pair{ + TimeStamp: int64(bucket.Key), + Value: value, + }) + } + return points +} + +// buildCallRateAggregations constructs the GetCallRate aggregations. +func (MetricsReader) buildCallRateAggregations(params metricstore.BaseQueryParameters, timeRange TimeRange) elastic.Aggregation { + fixedIntervalString := strconv.FormatInt(params.Step.Milliseconds(), 10) + "ms" + dateHistoAgg := elastic.NewDateHistogramAggregation(). + Field("startTimeMillis"). + FixedInterval(fixedIntervalString). + MinDocCount(0). 
+ ExtendedBounds(timeRange.extendedStartTimeMillis, timeRange.endTimeMillis) + + cumulativeSumAgg := elastic.NewCumulativeSumAggregation().BucketsPath("_count") + + // Corresponding AGG ES query: + // "aggs": { + // "results_buckets": { + // "date_histogram": { + // "field": "startTimeMillis", + // "fixed_interval": "60s", + // "min_doc_count": 0, + // "extended_bounds": { + // "min": "now-lookback-5m", + // "max": "now" + // } + // }, + // "aggs": { + // "cumulative_requests": { + // "cumulative_sum": { + // "buckets_path": "_count" + // } + // } + // + + dateHistoAgg = dateHistoAgg. + SubAggregation(culmuAggName, cumulativeSumAgg) + + if params.GroupByOperation { + operationsAgg := elastic.NewTermsAggregation(). + Field("operationName"). + Size(10). + SubAggregation("date_histogram", dateHistoAgg) // Nest the dateHistoAgg inside operationsAgg + return operationsAgg + } + + return dateHistoAgg +} + +// executeSearch performs the Elasticsearch search. +func (r MetricsReader) executeSearch(ctx context.Context, p MetricsQueryParams) (*metrics.MetricFamily, error) { + if p.GroupByOperation { + p.metricName = strings.Replace(p.metricName, "service", "service_operation", 1) + p.metricDesc += " & operation" + } + + span := r.queryLogger.TraceQuery(ctx, p.metricName) + defer span.End() + + indexName := r.cfg.Indices.IndexPrefix.Apply("jaeger-span-*") + searchResult, err := r.client.Search(indexName). + Query(&p.boolQuery). + Size(0). // Set Size to 0 to return only aggregation results, excluding individual search hits + Aggregation(aggName, p.aggQuery). + Do(ctx) + if err != nil { + err = fmt.Errorf("failed executing metrics query: %w", err) + r.queryLogger.LogErrorToSpan(span, err) + return nil, err + } + + r.queryLogger.LogAndTraceResult(span, searchResult) + + rawResult, err := ToDomainMetricsFamily(p, searchResult) + if err != nil { + return nil, err + } + + processedResult := p.processMetricsFunc(rawResult, p.BaseQueryParameters) + return processedResult, nil +} + +// normalizeSpanKinds normalizes a slice of span kinds. +func normalizeSpanKinds(spanKinds []string) []string { + normalized := make([]string, len(spanKinds)) + for i, kind := range spanKinds { + normalized[i] = strings.ToLower(strings.TrimPrefix(kind, "SPAN_KIND_")) + } + return normalized +} + +// buildInterfaceSlice converts []string to []interface{} for elastic terms query. 
+func buildInterfaceSlice(s []string) []any { + ifaceSlice := make([]any, len(s)) + for i, v := range s { + ifaceSlice[i] = v + } + return ifaceSlice +} + +func calculateTimeRange(params *metricstore.BaseQueryParameters) (TimeRange, error) { + if params == nil || params.EndTime == nil || params.Lookback == nil { + return TimeRange{}, errors.New("invalid parameters") + } + endTime := *params.EndTime + startTime := endTime.Add(-*params.Lookback) + extendedStartTime := startTime.Add(-10 * time.Minute) + + return TimeRange{ + startTimeMillis: startTime.UnixMilli(), + endTimeMillis: endTime.UnixMilli(), + extendedStartTimeMillis: extendedStartTime.UnixMilli(), + }, nil +} diff --git a/internal/storage/metricstore/elasticsearch/reader_test.go b/internal/storage/metricstore/elasticsearch/reader_test.go index eff8748f307..d5f34ec62f5 100644 --- a/internal/storage/metricstore/elasticsearch/reader_test.go +++ b/internal/storage/metricstore/elasticsearch/reader_test.go @@ -5,8 +5,13 @@ package elasticsearch import ( "context" + "encoding/json" + "io" + "math" "net/http" "net/http/httptest" + "os" + "strings" "testing" "time" @@ -17,103 +22,459 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/internal/metrics" + esmetrics "github.com/jaegertracing/jaeger/internal/metrics" + "github.com/jaegertracing/jaeger/internal/proto-gen/api_v2/metrics" es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch" "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore" - "github.com/jaegertracing/jaeger/internal/telemetry" ) -// mockServerResponse simulates a basic Elasticsearch version response. -var mockServerResponse = []byte(` -{ - "version": { - "number": "6.8.0" +var mockCallRateQuery = `{ + "query": { + "bool": { + "filter": [ + {"terms": {"process.serviceName": ["driver"]}}, + {"terms": {"tag.span@kind": ["server"]}}, + {"range": { + "startTimeMillis": { + "gte": 1749894300000, + "lte": 1749894960000, + "format": "epoch_millis" + } + }} + ] } + }, + "size": 0, + "aggregations": { + "results_buckets": { + "date_histogram": { + "field": "startTimeMillis", + "fixed_interval": "60000ms", + "min_doc_count": 0, + "extended_bounds": { + "min": 1749894900000, + "max": 1749894960000 + } + }, + "aggregations": { + "cumulative_requests": { + "cumulative_sum": { + "buckets_path": "_count" + } + } + } + } + } +}` + +const ( + mockEsValidResponse = "testdata/output_valid_es.json" + mockCallRateResponse = "testdata/output_call_rate.json" + mockCallRateOperationResponse = "testdata/output_call_rate_operation.json" + mockEmptyResponse = "testdata/output_empty.json" + mockErrorResponse = "testdata/output_error_es.json" +) + +type metricsTestCase struct { + name string + serviceNames []string + spanKinds []string + groupByOp bool + query string // Elasticsearch query to validate + responseFile string + wantName string + wantDesc string + wantLabels map[string]string + wantPoints []struct { + TimestampSec int64 + Value float64 + } + wantErr string } -`) -// tracerProvider creates a new OpenTelemetry TracerProvider for testing. 
-func tracerProvider(t *testing.T) trace.TracerProvider { +func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter) { exporter := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.AlwaysSample()), sdktrace.WithSyncer(exporter), ) t.Cleanup(func() { + require.NoError(t, tp.ForceFlush(context.Background())) require.NoError(t, tp.Shutdown(context.Background())) }) - return tp + return tp, exporter } -func clientProvider(t *testing.T, c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) es.Client { +func clientProvider(t *testing.T, c *config.Configuration, logger *zap.Logger, metricsFactory esmetrics.Factory) es.Client { client, err := config.NewClient(context.Background(), c, logger, metricsFactory) require.NoError(t, err) require.NotNil(t, client) - t.Cleanup(func() { require.NoError(t, client.Close()) }) return client } -// setupMetricsReader provides a common setup for tests requiring a MetricsReader. -func setupMetricsReader(t *testing.T) *MetricsReader { - logger := zap.NewNop() - tracer := tracerProvider(t) +func assertMetricFamily(t *testing.T, got *metrics.MetricFamily, m metricsTestCase) { + if got == nil { + t.Fatal("Expected non-nil MetricFamily") + } + assert.Equal(t, m.wantName, got.Name, "Metric name mismatch") + assert.Equal(t, m.wantDesc, got.Help, "Metric description mismatch") + assert.Equal(t, metrics.MetricType_GAUGE, got.Type, "Metric type mismatch") + assert.Len(t, got.Metrics, 1, "Expected one metric") - mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Write(mockServerResponse) - })) - t.Cleanup(mockServer.Close) - cfg := config.Configuration{ - Servers: []string{mockServer.URL}, - LogLevel: "debug", + metric := got.Metrics[0] + gotLabels := make(map[string]string) + for _, label := range metric.Labels { + gotLabels[label.Name] = label.Value } + assert.Equal(t, m.wantLabels, gotLabels, "Labels mismatch") - client := clientProvider(t, &cfg, logger, telemetry.NoopSettings().Metrics) - reader := NewMetricsReader(client, logger, tracer) + if len(m.wantPoints) == 0 { + assert.Empty(t, metric.MetricPoints, "Expected no metric points") + return + } - require.NotNil(t, reader) - return reader + assert.Len(t, metric.MetricPoints, len(m.wantPoints), "Metric points count mismatch") + for i, point := range metric.MetricPoints { + assert.Equal(t, m.wantPoints[i].TimestampSec, point.Timestamp.GetSeconds(), "Timestamp mismatch for point %d", i) + actualValue := point.Value.(*metrics.MetricPoint_GaugeValue).GaugeValue.GetDoubleValue() + assert.InDelta(t, m.wantPoints[i].Value, actualValue, 0.01, "Value mismatch for point %d", i) + } } -func TestGetLatencies(t *testing.T) { - reader := setupMetricsReader(t) +func TestGetCallRates_ErrorCases(t *testing.T) { + endTime := time.UnixMilli(0) + tests := []struct { + name string + params *metricstore.CallRateQueryParameters + wantErr string + }{ + { + name: "nil base params", + params: &metricstore.CallRateQueryParameters{}, + wantErr: "invalid parameters", + }, + { + name: "nil end time params", + params: &metricstore.CallRateQueryParameters{ + BaseQueryParameters: metricstore.BaseQueryParameters{}, + }, + wantErr: "invalid parameters", + }, + { + name: "nil step params", + params: &metricstore.CallRateQueryParameters{ + BaseQueryParameters: metricstore.BaseQueryParameters{ + EndTime: &(endTime), + }, + }, + wantErr: "invalid parameters", + 
}, + } - qParams := &metricstore.LatenciesQueryParameters{} - r, err := reader.GetLatencies(context.Background(), qParams) - assert.Zero(t, r) - require.ErrorIs(t, err, ErrNotImplemented) - require.EqualError(t, err, ErrNotImplemented.Error()) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + reader, _ := setupMetricsReaderAndServer(t, "", mockEmptyResponse) + metricFamily, err := reader.GetCallRates(context.Background(), tc.params) + require.Error(t, err) + assert.Contains(t, err.Error(), tc.wantErr) + require.Nil(t, metricFamily) + }) + } } func TestGetCallRates(t *testing.T) { - reader := setupMetricsReader(t) + expectedPoints := []struct { + TimestampSec int64 + Value float64 + }{ + {1749894840, math.NaN()}, + {1749894900, 0.0}, + {1749894960, 0.0}, + {1749895020, 0.0}, + {1749895080, 0.0}, + {1749895140, 0.0}, + {1749895200, 0.0}, + {1749895260, 0.0}, + {1749895320, 0.0}, + {1749895380, 0.75}, + {1749895440, 0.9}, + } + tests := []metricsTestCase{ + { + name: "group by service only", + serviceNames: []string{"driver"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOp: false, + query: mockCallRateQuery, + responseFile: mockCallRateResponse, + wantName: "service_call_rate", + wantDesc: "calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "driver", + }, + wantPoints: expectedPoints, + }, + { + name: "group by service and operation", + serviceNames: []string{"driver"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOp: true, + responseFile: mockCallRateOperationResponse, + wantName: "service_operation_call_rate", + wantDesc: "calls/sec, grouped by service & operation", + wantLabels: map[string]string{ + "service_name": "driver", + "operation": "/FindNearest", + }, + wantPoints: expectedPoints, + }, + { + name: "different service names", + serviceNames: []string{"jaeger"}, + spanKinds: []string{"SPAN_KIND_SERVER", "SPAN_KIND_CLIENT"}, + groupByOp: false, + responseFile: mockCallRateResponse, + wantName: "service_call_rate", + wantDesc: "calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "jaeger", + }, + wantPoints: expectedPoints, + }, + { + name: "empty response", + serviceNames: []string{"driver"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOp: false, + responseFile: mockEmptyResponse, + wantName: "service_call_rate", + wantDesc: "calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "driver", + }, + wantPoints: nil, + }, + { + name: "server error", + serviceNames: []string{"driver"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOp: false, + responseFile: mockErrorResponse, + wantErr: "failed executing metrics query", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + reader, exporter := setupMetricsReaderAndServer(t, tc.query, tc.responseFile) + + params := &metricstore.CallRateQueryParameters{ + BaseQueryParameters: buildTestBaseQueryParameters(tc), + } - qParams := &metricstore.CallRateQueryParameters{} - r, err := reader.GetCallRates(context.Background(), qParams) + metricFamily, err := reader.GetCallRates(context.Background(), params) + if tc.wantErr != "" { + require.ErrorContains(t, err, tc.wantErr) + assert.Nil(t, metricFamily) + } else { + require.NoError(t, err) + assertMetricFamily(t, metricFamily, tc) + } + + spans := exporter.GetSpans() + if tc.wantErr == "" { + assert.Len(t, spans, 1, "Expected one span for the Elasticsearch query") + } + }) + } +} + +func TestGetLatencies(t *testing.T) { + reader, _ := 
setupMetricsReaderAndServer(t, "", mockEsValidResponse) + r, err := reader.GetLatencies(context.Background(), &metricstore.LatenciesQueryParameters{}) assert.Zero(t, r) require.ErrorIs(t, err, ErrNotImplemented) require.EqualError(t, err, ErrNotImplemented.Error()) } func TestGetErrorRates(t *testing.T) { - reader := setupMetricsReader(t) - - qParams := &metricstore.ErrorRateQueryParameters{} - r, err := reader.GetErrorRates(context.Background(), qParams) + reader, _ := setupMetricsReaderAndServer(t, "", mockEsValidResponse) + r, err := reader.GetErrorRates(context.Background(), &metricstore.ErrorRateQueryParameters{}) assert.Zero(t, r) require.ErrorIs(t, err, ErrNotImplemented) require.EqualError(t, err, ErrNotImplemented.Error()) } func TestGetMinStepDuration(t *testing.T) { - reader := setupMetricsReader(t) - - params := metricstore.MinStepDurationQueryParameters{} - minStep, err := reader.GetMinStepDuration(context.Background(), ¶ms) + reader, _ := setupMetricsReaderAndServer(t, "", mockEsValidResponse) + minStep, err := reader.GetMinStepDuration(context.Background(), &metricstore.MinStepDurationQueryParameters{}) require.NoError(t, err) assert.Equal(t, time.Millisecond, minStep) } + +func sendResponse(t *testing.T, w http.ResponseWriter, responseFile string) { + bytes, err := os.ReadFile(responseFile) + require.NoError(t, err) + + _, err = w.Write(bytes) + require.NoError(t, err) +} + +func startMockEsServer(t *testing.T, wantEsQuery string, responseFile string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + // Handle initial ping request + if r.Method == http.MethodHead || r.URL.Path == "/" { + sendResponse(t, w, mockEsValidResponse) + return + } + + // Read request body + body, err := io.ReadAll(r.Body) + assert.NoError(t, err, "Failed to read request body") + defer r.Body.Close() + + // Validate query if provided + if wantEsQuery != "" { + var expected, actual map[string]any + assert.NoError(t, json.Unmarshal([]byte(wantEsQuery), &expected)) + assert.NoError(t, json.Unmarshal(body, &actual)) + normalizeScripts(expected) + normalizeScripts(actual) + + compareQueryStructure(t, expected, actual) + } + + sendResponse(t, w, responseFile) + })) +} + +func normalizeScripts(m any) { + if m, ok := m.(map[string]any); ok { + if script, ok := m["script"].(map[string]any); ok { + if source, ok := script["source"].(string); ok { + // Remove whitespace and newlines for comparison + script["source"] = strings.Join(strings.Fields(source), " ") + } + } + for _, v := range m { + normalizeScripts(v) + } + } +} + +func compareQueryStructure(t *testing.T, expected, actual map[string]any) { + // Compare the bool query structure (without time ranges) + if expectedQuery, ok := expected["query"].(map[string]any); ok { + actualQuery := actual["query"].(map[string]any) + compareBoolQuery(t, expectedQuery, actualQuery) + } + + // Compare aggregations + if expectedAggs, ok := expected["aggregations"].(map[string]any); ok { + actualAggs := actual["aggregations"].(map[string]any) + // For convenience, we remove date_histogram for easier comparison here because date_histogram includes time bounds which can vary by a few milliseconds + removeHistogramBounds(expectedAggs) + removeHistogramBounds(actualAggs) + + assert.Equal(t, expectedAggs, actualAggs, "Aggregations mismatch") + } +} + +// Simple helper to remove extended_bounds from any date_histogram +func 
removeHistogramBounds(aggs map[string]any) { + for _, agg := range aggs { + aggMap, ok := agg.(map[string]any) + if !ok { + continue + } + + // Remove from date_histogram if present + if histo, ok := aggMap["date_histogram"].(map[string]any); ok { + delete(histo, "extended_bounds") + } + + // Handle nested aggregations + if nested, ok := aggMap["aggregations"].(map[string]any); ok { + removeHistogramBounds(nested) + } + } +} + +func compareBoolQuery(t *testing.T, expected, actual map[string]any) { + expectedBool, eok := expected["bool"].(map[string]any) + actualBool, aok := actual["bool"].(map[string]any) + + if !eok || !aok { + return + } + + // Compare filters (excluding time ranges) + if expectedFilters, ok := expectedBool["filter"].([]any); ok { + actualFilters := actualBool["filter"].([]any) + compareFilters(t, expectedFilters, actualFilters) + } +} + +func compareFilters(t *testing.T, expected, actual []any) { + // We'll compare the same number of filters, but skip time ranges + assert.Len(t, actual, len(expected), "Different number of filters") + + for i := range expected { + expectedFilter := expected[i].(map[string]any) + actualFilter := actual[i].(map[string]any) + + // Skip range queries entirely + if _, isRange := expectedFilter["range"]; isRange { + continue + } + + assert.Equal(t, expectedFilter, actualFilter, "Filter mismatch at index %d", i) + } +} + +func setupMetricsReaderAndServer(t *testing.T, wantEsQuery string, responseFile string) (*MetricsReader, *tracetest.InMemoryExporter) { + logger, _ := zap.NewDevelopment() // Use development logger for client-side logs + tracer, exporter := tracerProvider(t) + + mockServer := startMockEsServer(t, wantEsQuery, responseFile) + + t.Cleanup(mockServer.Close) + cfg := config.Configuration{ + Servers: []string{mockServer.URL}, + LogLevel: "debug", + Tags: config.TagsAsFields{ + Include: "span.kind,error", + DotReplacement: "@", + }, + } + + client := clientProvider(t, &cfg, logger, esmetrics.NullFactory) + reader := NewMetricsReader(client, cfg, logger, tracer) + require.NotNil(t, reader) + + return reader, exporter +} + +func buildTestBaseQueryParameters(tc metricsTestCase) metricstore.BaseQueryParameters { + endTime := time.UnixMilli(1749894900000) + lookback := 6 * time.Hour + step := time.Minute + ratePer := 10 * time.Minute + + return metricstore.BaseQueryParameters{ + ServiceNames: tc.serviceNames, + GroupByOperation: tc.groupByOp, + EndTime: &endTime, + Lookback: &lookback, + Step: &step, + RatePer: &ratePer, + SpanKinds: tc.spanKinds, + } +} diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_call_rate.json b/internal/storage/metricstore/elasticsearch/testdata/output_call_rate.json new file mode 100644 index 00000000000..03b772d5839 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_call_rate.json @@ -0,0 +1,109 @@ +{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": 100, + "max_score": 0.0, + "hits": [] + }, + "aggregations": { + "results_buckets": { + "buckets": [ + { + "key_as_string": "1749894840000", + "key": 1749894840000, + "doc_count": 10, + "cumulative_requests": { + "value": 0 + } + }, + { + "key_as_string": "1749894900000", + "key": 1749894900000, + "doc_count": 10, + "cumulative_requests": { + "value": 10 + } + }, + { + "key_as_string": "1749894960000", + "key": 1749894960000, + "doc_count": 20, + "cumulative_requests": { + "value": 30 + } + }, + { + "key_as_string": 
"1749895020000", + "key": 1749895020000, + "doc_count": 30, + "cumulative_requests": { + "value": 60 + } + }, + { + "key_as_string": "1749895080000", + "key": 1749895080000, + "doc_count": 40, + "cumulative_requests": { + "value": 100 + } + }, + { + "key_as_string": "1749895140000", + "key": 1749895140000, + "doc_count": 50, + "cumulative_requests": { + "value": 150 + } + }, + { + "key_as_string": "1749895200000", + "key": 1749895200000, + "doc_count": 60, + "cumulative_requests": { + "value": 210 + } + }, + { + "key_as_string": "1749895260000", + "key": 1749895260000, + "doc_count": 70, + "cumulative_requests": { + "value": 280 + } + }, + { + "key_as_string": "1749895320000", + "key": 1749895320000, + "doc_count": 80, + "cumulative_requests": { + "value": 360 + } + }, + { + "key_as_string": "1749895380000", + "key": 1749895380000, + "doc_count": 90, + "cumulative_requests": { + "value": 450 + } + }, + { + "key_as_string": "1749895440000", + "key": 1749895440000, + "doc_count": 100, + "cumulative_requests": { + "value": 550 + } + } + ] + } + } +} \ No newline at end of file diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_call_rate_operation.json b/internal/storage/metricstore/elasticsearch/testdata/output_call_rate_operation.json new file mode 100644 index 00000000000..c75e533c728 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_call_rate_operation.json @@ -0,0 +1,119 @@ +{ + "took": 501, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": 10000, + "max_score": null, + "hits": [] + }, + "aggregations": { + "results_buckets": { + "doc_count_error_upper_bound": 0, + "sum_other_doc_count": 0, + "buckets": [ + { + "key": "/FindNearest", + "doc_count": 100, + "date_histogram": { + "buckets": [ + { + "key_as_string": "1749894840000", + "key": 1749894840000, + "doc_count": 10, + "cumulative_requests": { + "value": 0 + } + }, + { + "key_as_string": "1749894900000", + "key": 1749894900000, + "doc_count": 10, + "cumulative_requests": { + "value": 10 + } + }, + { + "key_as_string": "1749894960000", + "key": 1749894960000, + "doc_count": 20, + "cumulative_requests": { + "value": 30 + } + }, + { + "key_as_string": "1749895020000", + "key": 1749895020000, + "doc_count": 30, + "cumulative_requests": { + "value": 60 + } + }, + { + "key_as_string": "1749895080000", + "key": 1749895080000, + "doc_count": 40, + "cumulative_requests": { + "value": 100 + } + }, + { + "key_as_string": "1749895140000", + "key": 1749895140000, + "doc_count": 50, + "cumulative_requests": { + "value": 150 + } + }, + { + "key_as_string": "1749895200000", + "key": 1749895200000, + "doc_count": 60, + "cumulative_requests": { + "value": 210 + } + }, + { + "key_as_string": "1749895260000", + "key": 1749895260000, + "doc_count": 70, + "cumulative_requests": { + "value": 280 + } + }, + { + "key_as_string": "1749895320000", + "key": 1749895320000, + "doc_count": 80, + "cumulative_requests": { + "value": 360 + } + }, + { + "key_as_string": "1749895380000", + "key": 1749895380000, + "doc_count": 90, + "cumulative_requests": { + "value": 450 + } + }, + { + "key_as_string": "1749895440000", + "key": 1749895440000, + "doc_count": 100, + "cumulative_requests": { + "value": 550 + } + } + ] + } + } + ] + } + } +} \ No newline at end of file diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_empty.json b/internal/storage/metricstore/elasticsearch/testdata/output_empty.json new file mode 100644 index 
00000000000..f091b204d4b --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_empty.json @@ -0,0 +1,21 @@ + +{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": 0, + "max_score": 0.0, + "hits": [] + }, + "aggregations": { + "results_buckets": { + "buckets": [] + } + } +} diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_error_es.json b/internal/storage/metricstore/elasticsearch/testdata/output_error_es.json new file mode 100644 index 00000000000..77862cf261c --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_error_es.json @@ -0,0 +1 @@ +{"error": "internal server error"} \ No newline at end of file diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_latencies.json b/internal/storage/metricstore/elasticsearch/testdata/output_latencies.json new file mode 100644 index 00000000000..7aa782aadc7 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_latencies.json @@ -0,0 +1,48 @@ + +{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": 100, + "max_score": 0.0, + "hits": [] + }, + "aggregations": { + "results_buckets": { + "buckets": [ + { + "key_as_string": "2025-06-13T11:15:00.000Z", + "key": 1749894900000, + "doc_count": 50, + "percentiles_of_bucket": { + "values": { + "95.0": 200.0 + } + }, + "results": { + "value": 0.2 + } + }, + { + "key_as_string": "2025-06-13T11:16:00.000Z", + "key": 1749894960000, + "doc_count": 60, + "percentiles_of_bucket": { + "values": { + "95.0": 210.0 + } + }, + "results": { + "value": 0.21 + } + } + ] + } + } +} diff --git a/internal/storage/metricstore/elasticsearch/testdata/output_valid_es.json b/internal/storage/metricstore/elasticsearch/testdata/output_valid_es.json new file mode 100644 index 00000000000..a84fda1ff14 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/testdata/output_valid_es.json @@ -0,0 +1,5 @@ +{ + "version": { + "number": "6.8.0" + } +} \ No newline at end of file diff --git a/internal/storage/metricstore/elasticsearch/to_domain.go b/internal/storage/metricstore/elasticsearch/to_domain.go new file mode 100644 index 00000000000..d440c059af7 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/to_domain.go @@ -0,0 +1,154 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearch + +import ( + "fmt" + "time" + + "github.com/gogo/protobuf/types" + "github.com/olivere/elastic/v7" + + "github.com/jaegertracing/jaeger/internal/proto-gen/api_v2/metrics" +) + +// ToDomainMetricsFamily converts Elasticsearch aggregations to Jaeger's MetricFamily. +func ToDomainMetricsFamily(m MetricsQueryParams, result *elastic.SearchResult) (*metrics.MetricFamily, error) { + domainMetrics, err := toDomainMetrics(m, result) + if err != nil { + return nil, fmt.Errorf("failed to convert aggregations to metrics: %w", err) + } + + return &metrics.MetricFamily{ + Name: m.metricName, + Type: metrics.MetricType_GAUGE, + Help: m.metricDesc, + Metrics: domainMetrics, + }, nil +} + +// toDomainMetrics converts Elasticsearch aggregations to Jaeger metrics. 
+func toDomainMetrics(m MetricsQueryParams, result *elastic.SearchResult) ([]*metrics.Metric, error) { + labels := buildServiceLabels(m.ServiceNames) + + if !m.GroupByOperation { + buckets, err := extractBuckets(result) + if err != nil { + return nil, err + } + return []*metrics.Metric{ + { + Labels: labels, + MetricPoints: toDomainMetricPoints(m.bucketsToPointsFunc(buckets)), + }, + }, nil + } + + // Handle grouped results when groupByOp is true + agg, found := result.Aggregations.Terms(aggName) + if !found { + return nil, fmt.Errorf("%s aggregation not found", aggName) + } + + var metricsData []*metrics.Metric + for _, bucket := range agg.Buckets { + metric, err := processOperationBucket(m, bucket, labels) + if err != nil { + return nil, fmt.Errorf("failed to process bucket: %w", err) + } + metricsData = append(metricsData, metric) + } + + return metricsData, nil +} + +func buildServiceLabels(serviceNames []string) []*metrics.Label { + labels := make([]*metrics.Label, len(serviceNames)) + for i, name := range serviceNames { + labels[i] = &metrics.Label{Name: "service_name", Value: name} + } + return labels +} + +func processOperationBucket(m MetricsQueryParams, bucket *elastic.AggregationBucketKeyItem, baseLabels []*metrics.Label) (*metrics.Metric, error) { + key, ok := bucket.Key.(string) + if !ok { + return nil, fmt.Errorf("bucket key is not a string: %v", bucket.Key) + } + + // Extract nested date_histogram buckets + dateHistAgg, found := bucket.Aggregations.DateHistogram("date_histogram") + if !found { + return nil, fmt.Errorf("date_histogram aggregation not found in bucket %q", key) + } + + // Combine base labels with operation label + labels := append(baseLabels, toDomainLabels(key)...) + + return &metrics.Metric{ + Labels: labels, + MetricPoints: toDomainMetricPoints(m.bucketsToPointsFunc(dateHistAgg.Buckets)), + }, nil +} + +// toDomainLabels converts the bucket key to Jaeger metric labels. +func toDomainLabels(key string) []*metrics.Label { + return []*metrics.Label{ + { + Name: "operation", + Value: key, + }, + } +} + +// extractBuckets retrieves date histogram buckets from Elasticsearch results. +func extractBuckets(result *elastic.SearchResult) ([]*elastic.AggregationBucketHistogramItem, error) { + agg, found := result.Aggregations.DateHistogram(aggName) + if !found { + return nil, fmt.Errorf("%s aggregation not found", aggName) + } + return agg.Buckets, nil +} + +// toDomainMetricPoints converts Elasticsearch buckets to Jaeger metric points. +func toDomainMetricPoints(rawResult []*Pair) []*metrics.MetricPoint { + metricPoints := make([]*metrics.MetricPoint, 0, len(rawResult)) + for _, pair := range rawResult { + mp := toDomainMetricPoint(pair) + if mp != nil { + metricPoints = append(metricPoints, mp) + } + } + + return metricPoints +} + +// toDomainMetricPoint converts a single Pair to a Jaeger metric point. +func toDomainMetricPoint(pair *Pair) *metrics.MetricPoint { + timestamp := toDomainTimestamp(pair.TimeStamp) + if timestamp == nil { + return nil + } + + return &metrics.MetricPoint{ + Value: toDomainMetricPointValue(pair.Value), + Timestamp: timestamp, + } +} + +// toDomainTimestamp converts milliseconds since epoch to protobuf Timestamp. +func toDomainTimestamp(millis int64) *types.Timestamp { + timestamp := time.Unix(0, millis*int64(time.Millisecond)) + protoTimestamp, _ := types.TimestampProto(timestamp) + return protoTimestamp +} + +// toDomainMetricPointValue converts a float64 value to Jaeger's gauge metric point. 
+func toDomainMetricPointValue(value float64) *metrics.MetricPoint_GaugeValue { + return &metrics.MetricPoint_GaugeValue{ + GaugeValue: &metrics.GaugeValue{ + Value: &metrics.GaugeValue_DoubleValue{DoubleValue: value}, + }, + } +} diff --git a/internal/storage/metricstore/elasticsearch/to_domain_test.go b/internal/storage/metricstore/elasticsearch/to_domain_test.go new file mode 100644 index 00000000000..dc44dd97097 --- /dev/null +++ b/internal/storage/metricstore/elasticsearch/to_domain_test.go @@ -0,0 +1,286 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearch + +import ( + "encoding/json" + "testing" + "time" + + "github.com/gogo/protobuf/types" + "github.com/olivere/elastic/v7" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/internal/proto-gen/api_v2/metrics" + "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore" +) + +func TestToMetricsFamily(t *testing.T) { + tests := []struct { + name string + params MetricsQueryParams + result *elastic.SearchResult + expected *metrics.MetricFamily + err string + }{ + { + name: "successful conversion", + params: mockMetricsQueryParams([]string{"service1"}, false), + result: createTestSearchResult(false), + expected: &metrics.MetricFamily{ + Name: "test_metric", + Type: metrics.MetricType_GAUGE, + Help: "test description", + Metrics: []*metrics.Metric{ + { + Labels: []*metrics.Label{ + {Name: "service_name", Value: "service1"}, + }, + MetricPoints: []*metrics.MetricPoint{ + createEpochGaugePoint(1.23), + }, + }, + }, + }, + }, + { + name: "missing aggregation", + params: MetricsQueryParams{ + metricName: "test_metric", + }, + result: &elastic.SearchResult{ + Aggregations: make(elastic.Aggregations), + }, + err: "results_buckets aggregation not found", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ToDomainMetricsFamily(tt.params, tt.result) + if tt.err != "" { + require.ErrorContains(t, err, tt.err) + return + } + require.NoError(t, err) + require.Equal(t, tt.expected, got) + }) + } +} + +func TestToDomainMetrics(t *testing.T) { + tests := []struct { + name string + params MetricsQueryParams + result *elastic.SearchResult + expected []*metrics.Metric + err string + }{ + { + name: "simple metrics", + params: mockMetricsQueryParams([]string{"service1"}, false), + result: createTestSearchResult(false), + expected: []*metrics.Metric{ + { + Labels: []*metrics.Label{ + {Name: "service_name", Value: "service1"}, + }, + MetricPoints: []*metrics.MetricPoint{ + createEpochGaugePoint(1.23), + }, + }, + }, + }, + { + name: "grouped by operation", + params: mockMetricsQueryParams([]string{"service1"}, true), + result: createTestSearchResult(true), + expected: []*metrics.Metric{ + { + Labels: []*metrics.Label{ + {Name: "service_name", Value: "service1"}, + {Name: "operation", Value: "op1"}, + }, + MetricPoints: []*metrics.MetricPoint{ + createEpochGaugePoint(1.23), + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := toDomainMetrics(tt.params, tt.result) + if tt.err != "" { + require.ErrorContains(t, err, tt.err) + return + } + require.NoError(t, err) + require.Equal(t, tt.expected, got) + }) + } +} + +func TestToDomainMetrics_ErrorCases(t *testing.T) { + tests := []struct { + name string + params MetricsQueryParams + result 
*elastic.SearchResult + errMsg string + }{ + { + name: "missing terms aggregation when group by operation", + params: mockMetricsQueryParams([]string{"service1"}, true), + result: &elastic.SearchResult{ + Aggregations: make(elastic.Aggregations), // Empty aggregations + }, + errMsg: "results_buckets aggregation not found", + }, + { + name: "bucket key not string", + params: mockMetricsQueryParams([]string{"service1"}, true), + result: createTestSearchResultWithNonStringKey(), + errMsg: "bucket key is not a string", + }, + { + name: "missing date histogram in operation bucket", + params: mockMetricsQueryParams([]string{"service1"}, true), + result: createTestSearchResultMissingDateHistogram(), + errMsg: "date_histogram aggregation not found in bucket", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := toDomainMetrics(tt.params, tt.result) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + }) + } +} + +func createEpochGaugePoint(value float64) *metrics.MetricPoint { + return &metrics.MetricPoint{ + Value: &metrics.MetricPoint_GaugeValue{ + GaugeValue: &metrics.GaugeValue{ + Value: &metrics.GaugeValue_DoubleValue{DoubleValue: value}, + }, + }, + Timestamp: mustTimestampProto(time.Unix(0, 0)), + } +} + +// mockMetricsQueryParams creates a MetricsQueryParams struct for testing. +func mockMetricsQueryParams(serviceNames []string, groupByOp bool) MetricsQueryParams { + return MetricsQueryParams{ + metricName: "test_metric", + metricDesc: "test description", + BaseQueryParameters: metricstore.BaseQueryParameters{ + ServiceNames: serviceNames, + GroupByOperation: groupByOp, + }, + bucketsToPointsFunc: func(_ []*elastic.AggregationBucketHistogramItem) []*Pair { + return []*Pair{{TimeStamp: 0, Value: 1.23}} + }, + processMetricsFunc: func(mf *metrics.MetricFamily, _ metricstore.BaseQueryParameters) *metrics.MetricFamily { + return mf + }, + } +} + +// createTestSearchResultWithNonStringKey creates an Elasticsearch SearchResult +// where the bucket key for operation is an integer, causing a type error. +func createTestSearchResultWithNonStringKey() *elastic.SearchResult { + rawAggregation := json.RawMessage(`{ + "buckets": [{ + "key": 12345, + "doc_count": 10, + "date_histogram": { + "buckets": [{ + "key": 123456, + "doc_count": 5, + "results": {"value": 1.23} + }] + } + }] + }`) + + aggs := make(elastic.Aggregations) + aggs[aggName] = rawAggregation + + return &elastic.SearchResult{ + Aggregations: aggs, + } +} + +// createTestSearchResultMissingDateHistogram creates an Elasticsearch SearchResult +// where an operation bucket is missing the expected date_histogram aggregation. +func createTestSearchResultMissingDateHistogram() *elastic.SearchResult { + rawAggregation := json.RawMessage(`{ + "buckets": [{ + "key": "op1", + "doc_count": 10 + }] + }`) + + aggs := make(elastic.Aggregations) + aggs[aggName] = rawAggregation + + return &elastic.SearchResult{ + Aggregations: aggs, + } +} + +// createTestSearchResult creates a well-formed Elasticsearch SearchResult +// for testing successful conversions, with or without operation grouping. 
+func createTestSearchResult(groupByOperation bool) *elastic.SearchResult { + var rawAggregation json.RawMessage + + if groupByOperation { + rawAggregation = json.RawMessage(`{ + "buckets": [{ + "key": "op1", + "doc_count": 10, + "date_histogram": { + "buckets": [{ + "key_as_string": "123456", + "key": 123456, + "doc_count": 5, + "cumulative_requests": { + "value": 1.23 + } + }] + } + }] + }`) + } else { + rawAggregation = json.RawMessage(`{ + "buckets": [{ + "key_as_string": "123456", + "key": 123456, + "doc_count": 5, + "cumulative_requests": { + "value": 1.23 + } + }] + }`) + } + + aggs := make(elastic.Aggregations) + aggs[aggName] = rawAggregation + + return &elastic.SearchResult{ + Aggregations: aggs, + } +} + +func mustTimestampProto(t time.Time) *types.Timestamp { + ts, err := types.TimestampProto(t) + if err != nil { + panic(err) + } + return ts +}