[receiver/postgresql] add basic query sample collection for postgresqlreceiver #39573

Merged
merged 7 commits into from
Apr 29, 2025
27 changes: 27 additions & 0 deletions .chloggen/postgresql-add-top-query-collection.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduced top query collection to the PostgreSQL receiver. This collects queries from `pg_stat_statements` and reports the related metrics, helping end users identify the most frequently executed queries and the queries that consume the most resources.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [39573]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
30 changes: 30 additions & 0 deletions receiver/postgresqlreceiver/README.md
@@ -68,6 +68,33 @@ to grant the user you are using `pg_monitor`. Take the example from `testdata/in
GRANT pg_monitor TO otelu;
```

The following options are available:
- `enabled`: (optional, default=false) whether this collection is enabled.
- `max_rows_per_query`: (optional, default=1000) the maximum number of rows to return from the query
against `pg_stat_activity`.

### Top Query Collection
The receiver can collect the most executed queries from PostgreSQL. It reads data from `pg_stat_statements` and reports the incremental values of `total_exec_time`, `total_plan_time`, `calls`, `rows`, `shared_blks_dirtied`, `shared_blks_hit`, `shared_blks_read`, `shared_blks_written`, `temp_blks_read`, and `temp_blks_written`. To enable it, use the following configuration:
```yaml
...
top_query_collection:
enabled: true
...
```
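The incremental reporting described above amounts to a delta computation against the cumulative counters seen in the previous scrape. The receiver keeps the previous totals in a bounded LRU cache; the sketch below uses a plain map and a hypothetical key format for brevity:

```go
package main

import "fmt"

// prevTotals holds the last cumulative value seen per (queryid, column) key.
// The receiver uses a bounded LRU cache for this; a plain map keeps the sketch short.
var prevTotals = map[string]float64{}

// delta returns how much a cumulative counter grew since the previous scrape
// and records the new total for the next run.
func delta(key string, current float64) float64 {
	prev := prevTotals[key] // zero on first sight, so the first delta equals the current total
	prevTotals[key] = current
	return current - prev
}

func main() {
	fmt.Println(delta("q1:calls", 10)) // first scrape: prints 10
	fmt.Println(delta("q1:calls", 25)) // next scrape: prints 15
}
```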

Top query collection is disabled by default. Also note that to use it, you must create the
`pg_stat_statements` extension in every database. Take the example from `testdata/integration/02-create-extension.sh`:

```sql
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
```
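The receiver renders its statement from a Go `text/template` (`templates/topQueryTemplate.tmpl`). A minimal, self-contained sketch of that pattern follows; the embedded template here is a hypothetical stand-in (the real one selects many more `pg_stat_statements` columns):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// A trimmed-down stand-in for templates/topQueryTemplate.tmpl; the real
// template selects many more pg_stat_statements columns.
const topQueryTemplate = `SELECT s.queryid, d.datname, s.query, s.calls, s.total_exec_time
FROM pg_stat_statements AS s
JOIN pg_database AS d ON d.oid = s.dbid
ORDER BY s.total_exec_time DESC
LIMIT {{ .limit }};`

func renderTopQuery(limit int64) (string, error) {
	// missingkey=error makes a typo in the data map fail loudly instead of
	// silently rendering "<no value>" into the SQL.
	tmpl, err := template.New("topQuery").Option("missingkey=error").Parse(topQueryTemplate)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, map[string]any{"limit": limit}); err != nil {
		return "", fmt.Errorf("failed executing template: %w", err)
	}
	return buf.String(), nil
}

func main() {
	q, _ := renderTopQuery(1000)
	fmt.Println(q)
}
```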

The following options are available:
- `enabled`: (optional, default=false) whether this collection is enabled.
- `max_rows_per_query`: (optional, default=1000) the maximum number of rows to return from the query
against `pg_stat_statements`.
- `top_n_query`: (optional, default=1000) the maximum number of active queries to report (to the next consumer) in a single run.

### Example Configuration

```yaml
@@ -88,6 +115,9 @@ receivers:
key_file: /home/otel/mypostgreskey.key
query_sample_collection:
enabled: false
top_query_collection:
enabled: true
top_n_query: 1234
```
The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).
112 changes: 112 additions & 0 deletions receiver/postgresqlreceiver/client.go
@@ -65,6 +65,7 @@ type client interface {
listDatabases(ctx context.Context) ([]string, error)
getVersion(ctx context.Context) (string, error)
getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
}

type postgreSQLClient struct {
@@ -830,3 +831,114 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, log

return finalAttributes, errors.Join(errs...)
}

func convertMillisecondToSecond(column string, value string, logger *zap.Logger) (any, error) {
result := float64(0)
var err error
if value != "" {
result, err = strconv.ParseFloat(value, 64)
if err != nil {
logger.Error("failed to parse float", zap.String("column", column), zap.String("value", value), zap.Error(err))
}
}
return result / 1000.0, err
}

func convertToInt(column string, value string, logger *zap.Logger) (any, error) {
result := 0
var err error
if value != "" {
result, err = strconv.Atoi(value)
if err != nil {
logger.Error("failed to parse int", zap.String("column", column), zap.String("value", value), zap.Error(err))
}
}
return int64(result), err
}

//go:embed templates/topQueryTemplate.tmpl
var topQueryTemplate string

// getTopQuery implements client.
func (c *postgreSQLClient) getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error) {
tmpl := template.Must(template.New("topQuery").Option("missingkey=error").Parse(topQueryTemplate))
buf := bytes.Buffer{}

// TODO: Only get queries executed after the oldest query from the previous sample query collection.
// For instance, if the last sample collection covered queries executed between 8:00 and 8:15,
// this query should only gather queries executed after 8:15.
if err := tmpl.Execute(&buf, map[string]any{
"limit": limit,
}); err != nil {
logger.Error("failed to execute template", zap.Error(err))
return []map[string]any{}, fmt.Errorf("failed executing template: %w", err)
}

wrappedDb := sqlquery.NewDbClient(sqlquery.DbWrapper{Db: c.client}, buf.String(), logger, sqlquery.TelemetryConfig{})

rows, err := wrappedDb.QueryRows(ctx)
if err != nil {
if !errors.Is(err, sqlquery.ErrNullValueWarning) {
logger.Error("failed getting log rows", zap.Error(err))
return []map[string]any{}, fmt.Errorf("getTopQuery failed getting log rows: %w", err)
}
// in case the sql returned rows contains null value, we just log a warning and continue
logger.Warn("problems encountered getting log rows", zap.Error(err))
}

errs := make([]error, 0)
finalAttributes := make([]map[string]any, 0)

for _, row := range rows {
hasConvention := map[string]string{
"datname": "db.namespace",
"query": "db.query.text",
}

needConversion := map[string]func(string, string, *zap.Logger) (any, error){
CallsColumnName: convertToInt,
RowsColumnName: convertToInt,
SharedBlksDirtiedColumnName: convertToInt,
SharedBlksHitColumnName: convertToInt,
SharedBlksReadColumnName: convertToInt,
SharedBlksWrittenColumnName: convertToInt,
TempBlksReadColumnName: convertToInt,
TempBlksWrittenColumnName: convertToInt,
TotalExecTimeColumnName: convertMillisecondToSecond,
TotalPlanTimeColumnName: convertMillisecondToSecond,
"query": func(_ string, val string, logger *zap.Logger) (any, error) {
// TODO: check if it is truncated.
result, err := obfuscateSQL(val)
if err != nil {
logger.Error("failed to obfuscate query", zap.String("query", val))
return "", err
}
return result, nil
},
}
currentAttributes := make(map[string]any)

for col := range row {
var val any
var err error
converter, ok := needConversion[col]
if ok {
val, err = converter(col, row[col], logger)
if err != nil {
logger.Warn("failed to convert column value", zap.String("column", col), zap.Error(err))
errs = append(errs, err)
}
} else {
val = row[col]
}
if hasConvention[col] != "" {
currentAttributes[hasConvention[col]] = val
} else {
currentAttributes[DbAttributePrefix+col] = val
}
}
finalAttributes = append(finalAttributes, currentAttributes)
}

return finalAttributes, errors.Join(errs...)
}
7 changes: 7 additions & 0 deletions receiver/postgresqlreceiver/config.go
@@ -27,6 +27,12 @@ const (
ErrHostPort = "invalid config: 'endpoint' must be in the form <host>:<port> no matter what 'transport' is configured"
)

type TopQueryCollection struct {
Enabled bool `mapstructure:"enabled"`
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
TopNQuery int64 `mapstructure:"top_n_query"`
}

type QuerySampleCollection struct {
Enabled bool `mapstructure:"enabled"`
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
@@ -43,6 +49,7 @@ type Config struct {
ConnectionPool `mapstructure:"connection_pool,omitempty"`
metadata.MetricsBuilderConfig `mapstructure:",squash"`
QuerySampleCollection `mapstructure:"query_sample_collection,omitempty"`
TopQueryCollection `mapstructure:"top_query_collection,omitempty"`
}

type ConnectionPool struct {
3 changes: 2 additions & 1 deletion receiver/postgresqlreceiver/config_test.go
@@ -129,8 +129,9 @@ func TestLoadConfig(t *testing.T) {
expected.Endpoint = "localhost:5432"
expected.Username = "otel"
expected.Password = "${env:POSTGRESQL_PASSWORD}"
//nolint:staticcheck
expected.QuerySampleCollection.Enabled = true
expected.TopNQuery = 1234
expected.TopQueryCollection.Enabled = true
require.Equal(t, expected, cfg)
})

19 changes: 19 additions & 0 deletions receiver/postgresqlreceiver/consts.go
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"

const (
DbAttributePrefix = "postgresql."
QueryidColumnName = "queryid"
TotalExecTimeColumnName = "total_exec_time"
TotalPlanTimeColumnName = "total_plan_time"
CallsColumnName = "calls"
RowsColumnName = "rows"
SharedBlksDirtiedColumnName = "shared_blks_dirtied"
SharedBlksHitColumnName = "shared_blks_hit"
SharedBlksReadColumnName = "shared_blks_read"
SharedBlksWrittenColumnName = "shared_blks_written"
TempBlksReadColumnName = "temp_blks_read"
TempBlksWrittenColumnName = "temp_blks_written"
)
45 changes: 40 additions & 5 deletions receiver/postgresqlreceiver/factory.go
@@ -7,6 +7,7 @@ import (
"context"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
@@ -19,6 +20,18 @@
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver/internal/metadata"
)

// newCache creates a new cache with the given size.
// If the size is less than or equal to 0, it will be set to 1.
// It will never return an error.
func newCache(size int) *lru.Cache[string, float64] {
if size <= 0 {
size = 1
}
// lru.New only returns an error when the size is negative
cache, _ := lru.New[string, float64](size)
return cache
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
@@ -47,6 +60,11 @@ func createDefaultConfig() component.Config {
Enabled: false,
MaxRowsPerQuery: 1000,
},
TopQueryCollection: TopQueryCollection{
Enabled: false,
TopNQuery: 1000,
MaxRowsPerQuery: 1000,
},
}
}

@@ -65,7 +83,7 @@ func createMetricsReceiver(
clientFactory = newDefaultClientFactory(cfg)
}

ns := newPostgreSQLScraper(params, cfg, clientFactory)
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
s, err := scraper.NewMetrics(ns.scrape, scraper.WithShutdown(ns.shutdown))
if err != nil {
return nil, err
@@ -93,14 +111,31 @@ func createLogsReceiver(
clientFactory = newDefaultClientFactory(cfg)
}

ns := newPostgreSQLScraper(params, cfg, clientFactory)

opts := make([]scraperhelper.ControllerOption, 0)

//nolint:staticcheck
if cfg.QuerySampleCollection.Enabled {
// query sample collection does not need a cache, but we do not want it to be
// nil, so create a size-1 cache as a placeholder.
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
return ns.scrapeQuerySamples(ctx, cfg.QuerySampleCollection.MaxRowsPerQuery)
}, scraper.WithShutdown(ns.shutdown))
if err != nil {
return nil, err
}
opt := scraperhelper.AddFactoryWithConfig(
scraper.NewFactory(metadata.Type, nil,
scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
return s, nil
}, component.StabilityLevelAlpha)), nil)
opts = append(opts, opt)
}

if cfg.TopQueryCollection.Enabled {
// There are 10 delta-tracked attributes per query, so we size the cache accordingly.
ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(int(cfg.TopNQuery*10*2)))
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
return ns.scrapeQuerySamples(ctx, cfg.MaxRowsPerQuery)
return ns.scrapeTopQuery(ctx, cfg.TopQueryCollection.MaxRowsPerQuery, cfg.TopNQuery)
}, scraper.WithShutdown(ns.shutdown))
if err != nil {
return nil, err
1 change: 1 addition & 0 deletions receiver/postgresqlreceiver/go.mod
@@ -6,6 +6,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/DataDog/datadog-agent/pkg/obfuscate v0.64.3
github.com/google/go-cmp v0.7.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/lib/pq v1.10.9
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.124.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.124.1
2 changes: 2 additions & 0 deletions receiver/postgresqlreceiver/go.sum

Some generated files are not rendered by default.
