Skip to content

Commit fb67245

Browse files
cuichenlivincentfree
authored andcommitted
[receiver/postgresql] add basic query sample collection for postgresqlreceiver (open-telemetry#39573)
#### Description we introduced the top query collection to postgresql receiver. this will collect the queries from `pg_stat_statements` and report the related metrics. this will help the end user to identify which queries are most executed, and resources most used. Example output: ``` resourceLogs: - resource: attributes: [] scopeLogs: - logRecords: - attributes: - key: db.system.name value: stringValue: postgresql - key: db.namespace value: stringValue: postgres - key: db.query.text value: stringValue: "select * from pg_stat_activity where id = ?" - key: postgresql.calls value: intValue: 3 - key: postgresql.rows value: intValue: 10 - key: postgresql.shared_blks_dirtied value: intValue: 1 - key: postgresql.shared_blks_hit value: intValue: 2 - key: postgresql.shared_blks_read value: intValue: 3 - key: postgresql.shared_blks_written value: intValue: 4 - key: postgresql.temp_blks_read value: intValue: 5 - key: postgresql.temp_blks_written value: intValue: 6 - key: postgresql.queryid value: stringValue: "114514" - key: postgresql.rolname value: stringValue: master - key: postgresql.total_exec_time value: doubleValue: 1 - key: postgresql.total_plan_time value: doubleValue: 1 body: stringValue: top query scope: name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver version: 0.0.1 ```
1 parent e02d93c commit fb67245

File tree

19 files changed

+679
-28
lines changed

19 files changed

+679
-28
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: postgresqlreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: we introduced the top query collection to postgresql receiver. this will collect the queries from `pg_stat_statements` and report the related metrics. this will help the end user to identify which queries are most executed, and resources most used.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39573]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/postgresqlreceiver/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,33 @@ to grant the user you are using `pg_monitor`. Take the example from `testdata/in
6868
GRANT pg_monitor TO otelu;
6969
```
7070

71+
The following options are available:
72+
- `enabled`: (optional, default=false) whether this collection is enabled.
73+
- `max_rows_per_query`: (optional, default=1000) The max number of rows would return from the query
74+
against `pg_stat_activity`.
75+
76+
### Top Query Collection
77+
We provide functionality to collect the most executed queries from postgresql. It will get data from `pg_stat_statements` and report incremental value of `total_exec_time`, `total_plan_time`, `calls`, `rows`, `shared_blks_dirtied`, `shared_blks_hit`, `shared_blks_read`, `shared_blks_written`, `temp_blks_read`, `temp_blks_written`. To enable it, you will need the following configuration
78+
```
79+
...
80+
top_query_collection:
81+
enabled: true
82+
...
83+
```
84+
85+
By default, top query collection is disabled, also note, to use it, you will need
86+
to create the extension to every database. Take the example from `testdata/integration/02-create-extension.sh`
87+
88+
```sql
89+
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
90+
```
91+
92+
The following options are available:
93+
- `enabled`: (optional, default=false) whether this collection is enabled.
94+
- `max_rows_per_query`: (optional, default=1000) The max number of rows would return from the query
95+
against `pg_stat_statements`.
96+
- `top_n_query`: (optional, default=1000) The maximum number of active queries to report (to the next consumer) in a single run.
97+
7198
### Example Configuration
7299

73100
```yaml
@@ -88,6 +115,9 @@ receivers:
88115
key_file: /home/otel/mypostgreskey.key
89116
query_sample_collection:
90117
enabled: false
118+
top_query_collection:
119+
enabled: true
120+
top_n_query: 1234
91121
```
92122
93123
The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).

receiver/postgresqlreceiver/client.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type client interface {
6565
listDatabases(ctx context.Context) ([]string, error)
6666
getVersion(ctx context.Context) (string, error)
6767
getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
68+
getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
6869
}
6970

7071
type postgreSQLClient struct {
@@ -830,3 +831,114 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, log
830831

831832
return finalAttributes, errors.Join(errs...)
832833
}
834+
835+
func convertMillisecondToSecond(column string, value string, logger *zap.Logger) (any, error) {
836+
result := float64(0)
837+
var err error
838+
if value != "" {
839+
result, err = strconv.ParseFloat(value, 64)
840+
if err != nil {
841+
logger.Error("failed to parse float", zap.String("column", column), zap.String("value", value), zap.Error(err))
842+
}
843+
}
844+
return result / 1000.0, err
845+
}
846+
847+
func convertToInt(column string, value string, logger *zap.Logger) (any, error) {
848+
result := 0
849+
var err error
850+
if value != "" {
851+
result, err = strconv.Atoi(value)
852+
if err != nil {
853+
logger.Error("failed to parse int", zap.String("column", column), zap.String("value", value), zap.Error(err))
854+
}
855+
}
856+
return int64(result), err
857+
}
858+
859+
//go:embed templates/topQueryTemplate.tmpl
860+
var topQueryTemplate string
861+
862+
// getTopQuery implements client.
863+
func (c *postgreSQLClient) getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error) {
864+
tmpl := template.Must(template.New("topQuery").Option("missingkey=error").Parse(topQueryTemplate))
865+
buf := bytes.Buffer{}
866+
867+
// TODO: Only get query after the oldest query we got from the previous sample query colelction.
868+
// For instance, if from the last sample query we got queries executed between 8:00 ~ 8:15,
869+
// in this query, we should only gather query after 8:15
870+
if err := tmpl.Execute(&buf, map[string]any{
871+
"limit": limit,
872+
}); err != nil {
873+
logger.Error("failed to execute template", zap.Error(err))
874+
return []map[string]any{}, fmt.Errorf("failed executing template: %w", err)
875+
}
876+
877+
wrappedDb := sqlquery.NewDbClient(sqlquery.DbWrapper{Db: c.client}, buf.String(), logger, sqlquery.TelemetryConfig{})
878+
879+
rows, err := wrappedDb.QueryRows(ctx)
880+
if err != nil {
881+
if !errors.Is(err, sqlquery.ErrNullValueWarning) {
882+
logger.Error("failed getting log rows", zap.Error(err))
883+
return []map[string]any{}, fmt.Errorf("getTopQuery failed getting log rows: %w", err)
884+
}
885+
// in case the sql returned rows contains null value, we just log a warning and continue
886+
logger.Warn("problems encountered getting log rows", zap.Error(err))
887+
}
888+
889+
errs := make([]error, 0)
890+
finalAttributes := make([]map[string]any, 0)
891+
892+
for _, row := range rows {
893+
hasConvention := map[string]string{
894+
"datname": "db.namespace",
895+
"query": "db.query.text",
896+
}
897+
898+
needConversion := map[string]func(string, string, *zap.Logger) (any, error){
899+
callsColumnName: convertToInt,
900+
rowsColumnName: convertToInt,
901+
sharedBlksDirtiedColumnName: convertToInt,
902+
sharedBlksHitColumnName: convertToInt,
903+
sharedBlksReadColumnName: convertToInt,
904+
sharedBlksWrittenColumnName: convertToInt,
905+
tempBlksReadColumnName: convertToInt,
906+
tempBlksWrittenColumnName: convertToInt,
907+
totalExecTimeColumnName: convertMillisecondToSecond,
908+
totalPlanTimeColumnName: convertMillisecondToSecond,
909+
"query": func(_ string, val string, logger *zap.Logger) (any, error) {
910+
// TODO: check if it is truncated.
911+
result, err := obfuscateSQL(val)
912+
if err != nil {
913+
logger.Error("failed to obfuscate query", zap.String("query", val))
914+
return "", err
915+
}
916+
return result, nil
917+
},
918+
}
919+
currentAttributes := make(map[string]any)
920+
921+
for col := range row {
922+
var val any
923+
var err error
924+
converter, ok := needConversion[col]
925+
if ok {
926+
val, err = converter(col, row[col], logger)
927+
if err != nil {
928+
logger.Warn("failed to convert column to int", zap.String("column", col), zap.Error(err))
929+
errs = append(errs, err)
930+
}
931+
} else {
932+
val = row[col]
933+
}
934+
if hasConvention[col] != "" {
935+
currentAttributes[hasConvention[col]] = val
936+
} else {
937+
currentAttributes[dbAttributePrefix+col] = val
938+
}
939+
}
940+
finalAttributes = append(finalAttributes, currentAttributes)
941+
}
942+
943+
return finalAttributes, errors.Join(errs...)
944+
}

receiver/postgresqlreceiver/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ const (
2727
ErrHostPort = "invalid config: 'endpoint' must be in the form <host>:<port> no matter what 'transport' is configured"
2828
)
2929

30+
type TopQueryCollection struct {
31+
Enabled bool `mapstructure:"enabled"`
32+
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
33+
TopNQuery int64 `mapstructure:"top_n_query"`
34+
}
35+
3036
type QuerySampleCollection struct {
3137
Enabled bool `mapstructure:"enabled"`
3238
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
@@ -43,6 +49,7 @@ type Config struct {
4349
ConnectionPool `mapstructure:"connection_pool,omitempty"`
4450
metadata.MetricsBuilderConfig `mapstructure:",squash"`
4551
QuerySampleCollection `mapstructure:"query_sample_collection,omitempty"`
52+
TopQueryCollection `mapstructure:"top_query_collection,omitempty"`
4653
}
4754

4855
type ConnectionPool struct {

receiver/postgresqlreceiver/config_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,9 @@ func TestLoadConfig(t *testing.T) {
129129
expected.Endpoint = "localhost:5432"
130130
expected.Username = "otel"
131131
expected.Password = "${env:POSTGRESQL_PASSWORD}"
132-
//nolint:staticcheck
133132
expected.QuerySampleCollection.Enabled = true
133+
expected.TopNQuery = 1234
134+
expected.TopQueryCollection.Enabled = true
134135
require.Equal(t, expected, cfg)
135136
})
136137

receiver/postgresqlreceiver/consts.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"
5+
6+
const (
7+
dbAttributePrefix = "postgresql."
8+
queryidColumnName = "queryid"
9+
totalExecTimeColumnName = "total_exec_time"
10+
totalPlanTimeColumnName = "total_plan_time"
11+
callsColumnName = "calls"
12+
rowsColumnName = "rows"
13+
sharedBlksDirtiedColumnName = "shared_blks_dirtied"
14+
sharedBlksHitColumnName = "shared_blks_hit"
15+
sharedBlksReadColumnName = "shared_blks_read"
16+
sharedBlksWrittenColumnName = "shared_blks_written"
17+
tempBlksReadColumnName = "temp_blks_read"
18+
tempBlksWrittenColumnName = "temp_blks_written"
19+
)

receiver/postgresqlreceiver/factory.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"time"
99

10+
lru "github.com/hashicorp/golang-lru/v2"
1011
"go.opentelemetry.io/collector/component"
1112
"go.opentelemetry.io/collector/config/confignet"
1213
"go.opentelemetry.io/collector/config/configtls"
@@ -19,6 +20,18 @@ import (
1920
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver/internal/metadata"
2021
)
2122

23+
// newCache creates a new cache with the given size.
24+
// If the size is less or equal to 0, it will be set to 1.
25+
// It will never return an error.
26+
func newCache(size int) *lru.Cache[string, float64] {
27+
if size <= 0 {
28+
size = 1
29+
}
30+
// lru will only returns error when the size is less than 0
31+
cache, _ := lru.New[string, float64](size)
32+
return cache
33+
}
34+
2235
func NewFactory() receiver.Factory {
2336
return receiver.NewFactory(
2437
metadata.Type,
@@ -47,6 +60,11 @@ func createDefaultConfig() component.Config {
4760
Enabled: false,
4861
MaxRowsPerQuery: 1000,
4962
},
63+
TopQueryCollection: TopQueryCollection{
64+
Enabled: false,
65+
TopNQuery: 1000,
66+
MaxRowsPerQuery: 1000,
67+
},
5068
}
5169
}
5270

@@ -65,7 +83,7 @@ func createMetricsReceiver(
6583
clientFactory = newDefaultClientFactory(cfg)
6684
}
6785

68-
ns := newPostgreSQLScraper(params, cfg, clientFactory)
86+
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
6987
s, err := scraper.NewMetrics(ns.scrape, scraper.WithShutdown(ns.shutdown))
7088
if err != nil {
7189
return nil, err
@@ -93,14 +111,31 @@ func createLogsReceiver(
93111
clientFactory = newDefaultClientFactory(cfg)
94112
}
95113

96-
ns := newPostgreSQLScraper(params, cfg, clientFactory)
97-
98114
opts := make([]scraperhelper.ControllerOption, 0)
99115

100-
//nolint:staticcheck
101116
if cfg.QuerySampleCollection.Enabled {
117+
// query sample collection does not need cache, but we do not want to make it
118+
// nil, so create one size 1 cache as a placeholder.
119+
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
120+
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
121+
return ns.scrapeQuerySamples(ctx, cfg.QuerySampleCollection.MaxRowsPerQuery)
122+
}, scraper.WithShutdown(ns.shutdown))
123+
if err != nil {
124+
return nil, err
125+
}
126+
opt := scraperhelper.AddFactoryWithConfig(
127+
scraper.NewFactory(metadata.Type, nil,
128+
scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
129+
return s, nil
130+
}, component.StabilityLevelAlpha)), nil)
131+
opts = append(opts, opt)
132+
}
133+
134+
if cfg.TopQueryCollection.Enabled {
135+
// we have 10 updated only attributes. so we set the cache size accordingly.
136+
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(int(cfg.TopNQuery*10*2)))
102137
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
103-
return ns.scrapeQuerySamples(ctx, cfg.MaxRowsPerQuery)
138+
return ns.scrapeTopQuery(ctx, cfg.TopQueryCollection.MaxRowsPerQuery, cfg.TopNQuery)
104139
}, scraper.WithShutdown(ns.shutdown))
105140
if err != nil {
106141
return nil, err

receiver/postgresqlreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/DATA-DOG/go-sqlmock v1.5.2
77
github.com/DataDog/datadog-agent/pkg/obfuscate v0.64.3
88
github.com/google/go-cmp v0.7.0
9+
github.com/hashicorp/golang-lru/v2 v2.0.7
910
github.com/lib/pq v1.10.9
1011
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.125.0
1112
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.125.0

receiver/postgresqlreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)