Skip to content

Commit c4dd245

Browse files
[receiver/sqlquery] Add support for optional start and stop timestamps (#19160)
This PR adds support for picking up the start and stop timestamps from columns in the rows returned by the query. This makes SQL queries more expressive and lets users define the aggregation period themselves, rather than depending only on the default system time set at processing. **Link to tracking Issue:** #18925 #14146 --------- Co-authored-by: Juraci Paixão Kröhling <[email protected]>
1 parent 1edf7c9 commit c4dd245

File tree

5 files changed

+128
-1
lines changed

5 files changed

+128
-1
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: sqlqueryreceiver
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add support for start and end timestamp columns in the metric configuration.
9+
10+
# One or more tracking issues related to the change
11+
issues: [18925, 14146]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

receiver/sqlqueryreceiver/README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,12 @@ Each _metric_ in the configuration will produce one OTel metric per row returned
111111
to `cumulative`.
112112
- `description` (optional): the description applied to the metric.
113113
- `unit` (optional): the units applied to the metric.
114-
- `static_attributes` (optional): static attributes applied to the metrics
114+
- `static_attributes` (optional): static attributes applied to the metrics.
115+
- `start_ts_column` (optional): the name of the column containing the start timestamp, the value of which is applied to
116+
the metric's start timestamp (otherwise the current time is used). Only applies if the metric is of type cumulative
117+
sum.
118+
- `ts_column` (optional): the name of the column containing the timestamp, the value of which is applied to the
119+
metric's timestamp (if not set, the current time at scrape is used).
115120

116121
### Example
117122

receiver/sqlqueryreceiver/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ type MetricCfg struct {
9393
Unit string `mapstructure:"unit"`
9494
Description string `mapstructure:"description"`
9595
StaticAttributes map[string]string `mapstructure:"static_attributes"`
96+
StartTsColumn string `mapstructure:"start_ts_column"`
97+
TsColumn string `mapstructure:"ts_column"`
9698
}
9799

98100
func (c MetricCfg) Validate() error {

receiver/sqlqueryreceiver/metrics.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,28 @@ func rowToMetric(row stringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
1818
dest.SetUnit(cfg.Unit)
1919
dataPointSlice := setMetricFields(cfg, dest)
2020
dataPoint := dataPointSlice.AppendEmpty()
21+
if cfg.StartTsColumn != "" {
22+
if val, found := row[cfg.StartTsColumn]; found {
23+
timestamp, err := strconv.ParseInt(val, 10, 64)
24+
if err != nil {
25+
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)
26+
}
27+
startTime = pcommon.Timestamp(timestamp)
28+
} else {
29+
return fmt.Errorf("rowToMetric: start_ts_column not found")
30+
}
31+
}
32+
if cfg.TsColumn != "" {
33+
if val, found := row[cfg.TsColumn]; found {
34+
timestamp, err := strconv.ParseInt(val, 10, 64)
35+
if err != nil {
36+
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err)
37+
}
38+
ts = pcommon.Timestamp(timestamp)
39+
} else {
40+
return fmt.Errorf("rowToMetric: ts_column not found")
41+
}
42+
}
2143
setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
2244
value, found := row[cfg.ValueColumn]
2345
if !found {

receiver/sqlqueryreceiver/scraper_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
1414
"go.opentelemetry.io/collector/component/componenttest"
15+
"go.opentelemetry.io/collector/pdata/pcommon"
1516
"go.opentelemetry.io/collector/pdata/pmetric"
1617
"go.opentelemetry.io/collector/receiver/scrapererror"
1718
"go.uber.org/zap"
@@ -398,3 +399,84 @@ func TestScraper_FakeDB_MultiRows_Error(t *testing.T) {
398399
assert.Error(t, err)
399400
assert.True(t, scrapererror.IsPartialScrapeError(err))
400401
}
402+
403+
func TestScraper_StartAndTSColumn(t *testing.T) {
404+
client := &fakeDBClient{
405+
stringMaps: [][]stringMap{{
406+
{
407+
"mycol": "42",
408+
"StartTs": "1682417791",
409+
"Ts": "1682418264",
410+
},
411+
}},
412+
}
413+
scrpr := scraper{
414+
client: client,
415+
query: Query{
416+
Metrics: []MetricCfg{{
417+
MetricName: "my.name",
418+
ValueColumn: "mycol",
419+
TsColumn: "Ts",
420+
StartTsColumn: "StartTs",
421+
DataType: MetricTypeSum,
422+
Aggregation: MetricAggregationCumulative,
423+
}},
424+
},
425+
}
426+
metrics, err := scrpr.Scrape(context.Background())
427+
require.NoError(t, err)
428+
metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
429+
assert.Equal(t, pcommon.Timestamp(1682417791), metric.Sum().DataPoints().At(0).StartTimestamp())
430+
assert.Equal(t, pcommon.Timestamp(1682418264), metric.Sum().DataPoints().At(0).Timestamp())
431+
}
432+
433+
func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) {
434+
client := &fakeDBClient{
435+
stringMaps: [][]stringMap{{
436+
{
437+
"mycol": "42",
438+
"StartTs": "1682417791",
439+
},
440+
}},
441+
}
442+
scrpr := scraper{
443+
client: client,
444+
query: Query{
445+
Metrics: []MetricCfg{{
446+
MetricName: "my.name",
447+
ValueColumn: "mycol",
448+
TsColumn: "Ts",
449+
StartTsColumn: "StartTs",
450+
DataType: MetricTypeSum,
451+
Aggregation: MetricAggregationCumulative,
452+
}},
453+
},
454+
}
455+
_, err := scrpr.Scrape(context.Background())
456+
assert.Error(t, err)
457+
}
458+
459+
func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
460+
client := &fakeDBClient{
461+
stringMaps: [][]stringMap{{
462+
{
463+
"mycol": "42",
464+
"StartTs": "blah",
465+
},
466+
}},
467+
}
468+
scrpr := scraper{
469+
client: client,
470+
query: Query{
471+
Metrics: []MetricCfg{{
472+
MetricName: "my.name",
473+
ValueColumn: "mycol",
474+
StartTsColumn: "StartTs",
475+
DataType: MetricTypeSum,
476+
Aggregation: MetricAggregationCumulative,
477+
}},
478+
},
479+
}
480+
_, err := scrpr.Scrape(context.Background())
481+
assert.Error(t, err)
482+
}

0 commit comments

Comments
 (0)