From 017392a16d59f9a5c3248303f4ecca63339be16e Mon Sep 17 00:00:00 2001
From: Cuichen Li
Date: Wed, 2 Apr 2025 17:34:16 +0800
Subject: [PATCH 1/3] add basic top query collection for postgresqlreceiver

---
 receiver/postgresqlreceiver/README.md         |  30 ++++
 receiver/postgresqlreceiver/client.go         | 112 ++++++++++++
 receiver/postgresqlreceiver/config.go         |   7 +
 receiver/postgresqlreceiver/config_test.go    |   3 +-
 receiver/postgresqlreceiver/consts.go         |  19 +++
 receiver/postgresqlreceiver/factory.go        |  45 ++++-
 receiver/postgresqlreceiver/go.mod            |   1 +
 receiver/postgresqlreceiver/go.sum            |   2 +
 .../postgresqlreceiver/integration_test.go    |  81 ++++++++-
 receiver/postgresqlreceiver/scraper.go        | 160 +++++++++++++++++-
 receiver/postgresqlreceiver/scraper_test.go   |  97 +++++++++--
 .../templates/topQueryTemplate.tmpl           |  22 +++
 .../postgresqlreceiver/testdata/config.yaml   |   3 +
 .../integration/{init.sql => 01-init.sql}     |   4 +
 .../integration/02-create-extension.sh        |  14 ++
 .../scraper/query-sample/expected.yaml        |   2 +-
 .../testdata/scraper/top-query/expected.yaml  |  56 ++++++
 .../scraper/top-query/expectedSql.sql         |  22 +++
 18 files changed, 652 insertions(+), 28 deletions(-)
 create mode 100644 receiver/postgresqlreceiver/consts.go
 create mode 100644 receiver/postgresqlreceiver/templates/topQueryTemplate.tmpl
 rename receiver/postgresqlreceiver/testdata/integration/{init.sql => 01-init.sql} (92%)
 create mode 100644 receiver/postgresqlreceiver/testdata/integration/02-create-extension.sh
 create mode 100644 receiver/postgresqlreceiver/testdata/scraper/top-query/expected.yaml
 create mode 100644 receiver/postgresqlreceiver/testdata/scraper/top-query/expectedSql.sql

diff --git a/receiver/postgresqlreceiver/README.md b/receiver/postgresqlreceiver/README.md
index 05d54bb1dcbc7..74d69f37a261a 100644
--- a/receiver/postgresqlreceiver/README.md
+++ b/receiver/postgresqlreceiver/README.md
@@ -68,6 +68,33 @@ to grant the user you are using `pg_monitor`. Take the example from `testdata/in
 GRANT pg_monitor TO otelu;
 ```
 
+The following options are available:
+- `enabled`: (optional, default=false) whether this collection is enabled.
+- `max_rows_per_query`: (optional, default=1000) The maximum number of rows to return from the query
+against `pg_stat_activity`.
+
+### Top Query Collection
+We provide functionality to collect the most executed queries from PostgreSQL. It gets its data from `pg_stat_statements` and reports the incremental values of `total_exec_time`, `total_plan_time`, `calls`, `rows`, `shared_blks_dirtied`, `shared_blks_hit`, `shared_blks_read`, `shared_blks_written`, `temp_blks_read`, `temp_blks_written`. To enable it, use the following configuration:
+```
+...
+  top_query_collection:
+    enabled: true
+...
+```
+
+By default, top query collection is disabled. Note that to use it, you will also need
+to create the `pg_stat_statements` extension in every database. Take the example from `testdata/integration/02-create-extension.sh`:
+
+```sql
+CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
+```
+
+The following options are available:
+- `enabled`: (optional, default=false) whether this collection is enabled.
+- `max_rows_per_query`: (optional, default=1000) The maximum number of rows to return from the query
+against `pg_stat_statements`.
+- `top_n_query`: (optional, default=1000) The maximum number of queries to report (to the next consumer) in a single run.
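+
+Under the hood, the receiver builds its statement from `templates/topQueryTemplate.tmpl`; a trimmed sketch of that query is shown below (the row limit is filled in from `max_rows_per_query`):
+
+```sql
+SELECT
+    calls, datname, query, queryid::TEXT, rolname, rows::TEXT,
+    shared_blks_dirtied, shared_blks_hit, shared_blks_read, shared_blks_written,
+    temp_blks_read, temp_blks_written, total_exec_time, total_plan_time
+FROM
+    pg_stat_statements
+    LEFT JOIN pg_roles ON pg_stat_statements.userid = pg_roles.oid
+    LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid
+WHERE
+    query != ''
+LIMIT 1000; -- max_rows_per_query
+```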
+
 ### Example Configuration
 
 ```yaml
 receivers:
   postgresql:
     endpoint: localhost:5432
     transport: tcp
     username: otel
     password: ${env:POSTGRESQL_PASSWORD}
     databases:
       - otel
     collection_interval: 10s
     tls:
       insecure: false
       insecure_skip_verify: false
       ca_file: /home/otel/authorities.crt
       cert_file: /home/otel/mypostgrescert.crt
       key_file: /home/otel/mypostgreskey.key
     query_sample_collection:
       enabled: false
+    top_query_collection:
+      enabled: true
+      top_n_query: 1234
 ```
 
 The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).

diff --git a/receiver/postgresqlreceiver/client.go b/receiver/postgresqlreceiver/client.go
index 7f33906611726..d342ca0fa6e25 100644
--- a/receiver/postgresqlreceiver/client.go
+++ b/receiver/postgresqlreceiver/client.go
@@ -65,6 +65,7 @@ type client interface {
 	listDatabases(ctx context.Context) ([]string, error)
 	getVersion(ctx context.Context) (string, error)
 	getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
+	getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
 }
 
 type postgreSQLClient struct {
@@ -830,3 +831,114 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, log
 
 	return finalAttributes, errors.Join(errs...)
 }
+
+func convertMillisecondToSecond(column string, value string, logger *zap.Logger) (any, error) {
+	result := float64(0)
+	var err error
+	if value != "" {
+		result, err = strconv.ParseFloat(value, 64)
+		if err != nil {
+			logger.Error("failed to parse float", zap.String("column", column), zap.String("value", value), zap.Error(err))
+		}
+	}
+	return result / 1000.0, err
+}
+
+func convertToInt(column string, value string, logger *zap.Logger) (any, error) {
+	result := 0
+	var err error
+	if value != "" {
+		result, err = strconv.Atoi(value)
+		if err != nil {
+			logger.Error("failed to parse int", zap.String("column", column), zap.String("value", value), zap.Error(err))
+		}
+	}
+	return int64(result), err
+}
+
+//go:embed templates/topQueryTemplate.tmpl
+var topQueryTemplate string
+
+// getTopQuery implements client.
+func (c *postgreSQLClient) getTopQuery(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error) {
+	tmpl := template.Must(template.New("topQuery").Option("missingkey=error").Parse(topQueryTemplate))
+	buf := bytes.Buffer{}
+
+	// TODO: Only get queries after the oldest query we got from the previous sample query collection.
+	// For instance, if from the last sample query we got queries executed between 8:00 ~ 8:15,
+	// in this query, we should only gather queries executed after 8:15
+	if err := tmpl.Execute(&buf, map[string]any{
+		"limit": limit,
+	}); err != nil {
+		logger.Error("failed to execute template", zap.Error(err))
+		return []map[string]any{}, fmt.Errorf("failed executing template: %w", err)
+	}
+
+	wrappedDb := sqlquery.NewDbClient(sqlquery.DbWrapper{Db: c.client}, buf.String(), logger, sqlquery.TelemetryConfig{})
+
+	rows, err := wrappedDb.QueryRows(ctx)
+	if err != nil {
+		if !errors.Is(err, sqlquery.ErrNullValueWarning) {
+			logger.Error("failed getting log rows", zap.Error(err))
+			return []map[string]any{}, fmt.Errorf("getTopQuery failed getting log rows: %w", err)
+		}
+		// in case the SQL returned rows contain null values, we just log a warning and continue
+		logger.Warn("problems encountered getting log rows", zap.Error(err))
+	}
+
+	errs := make([]error, 0)
+	finalAttributes := make([]map[string]any, 0)
+
+	for _, row := range rows {
+		hasConvention := map[string]string{
+			"datname": "db.namespace",
+			"query":   "db.query.text",
+		}
+
+		needConversion := map[string]func(string, string, *zap.Logger) (any, error){
+			CallsColumnName:             convertToInt,
+			RowsColumnName:              convertToInt,
+			SharedBlksDirtiedColumnName: convertToInt,
+			SharedBlksHitColumnName:     convertToInt,
+			SharedBlksReadColumnName:    convertToInt,
+			SharedBlksWrittenColumnName: convertToInt,
+			TempBlksReadColumnName:      convertToInt,
+			TempBlksWrittenColumnName:   convertToInt,
+			TotalExecTimeColumnName:     convertMillisecondToSecond,
+			TotalPlanTimeColumnName:     convertMillisecondToSecond,
+			"query": func(_ string, val string, logger *zap.Logger) (any, error) {
+				// TODO: check if it is truncated.
+				result, err := obfuscateSQL(val)
+				if err != nil {
+					logger.Error("failed to obfuscate query", zap.String("query", val))
+					return "", err
+				}
+				return result, nil
+			},
+		}
+		currentAttributes := make(map[string]any)
+
+		for col := range row {
+			var val any
+			var err error
+			converter, ok := needConversion[col]
+			if ok {
+				val, err = converter(col, row[col], logger)
+				if err != nil {
+					logger.Warn("failed to convert column value", zap.String("column", col), zap.Error(err))
+					errs = append(errs, err)
+				}
+			} else {
+				val = row[col]
+			}
+			if hasConvention[col] != "" {
+				currentAttributes[hasConvention[col]] = val
+			} else {
+				currentAttributes[DbAttributePrefix+col] = val
+			}
+		}
+		finalAttributes = append(finalAttributes, currentAttributes)
+	}
+
+	return finalAttributes, errors.Join(errs...)
+}
diff --git a/receiver/postgresqlreceiver/config.go b/receiver/postgresqlreceiver/config.go
index a04d788d785cc..f18701e7d8fb4 100644
--- a/receiver/postgresqlreceiver/config.go
+++ b/receiver/postgresqlreceiver/config.go
@@ -27,6 +27,12 @@ const (
 	ErrHostPort = "invalid config: 'endpoint' must be in the form <host>:<port> no matter what 'transport' is configured"
 )
 
+type TopQueryCollection struct {
+	Enabled         bool  `mapstructure:"enabled"`
+	MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
+	TopNQuery       int64 `mapstructure:"top_n_query"`
+}
+
 type QuerySampleCollection struct {
 	Enabled         bool  `mapstructure:"enabled"`
 	MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
@@ -43,6 +49,7 @@ type Config struct {
 	ConnectionPool                `mapstructure:"connection_pool,omitempty"`
 	metadata.MetricsBuilderConfig `mapstructure:",squash"`
 	QuerySampleCollection         `mapstructure:"query_sample_collection,omitempty"`
+	TopQueryCollection            `mapstructure:"top_query_collection,omitempty"`
 }
 
 type ConnectionPool struct {
diff --git a/receiver/postgresqlreceiver/config_test.go b/receiver/postgresqlreceiver/config_test.go
index aff6676161c97..50edecda696fc 100644
--- a/receiver/postgresqlreceiver/config_test.go
+++ b/receiver/postgresqlreceiver/config_test.go
@@ -129,8 +129,9 @@ func TestLoadConfig(t *testing.T) {
 		expected.Endpoint = "localhost:5432"
 		expected.Username = "otel"
 		expected.Password = "${env:POSTGRESQL_PASSWORD}"
-		//nolint:staticcheck
 		expected.QuerySampleCollection.Enabled = true
+		expected.TopNQuery = 1234
+		expected.TopQueryCollection.Enabled = true
 
 		require.Equal(t, expected, cfg)
 	})
diff --git a/receiver/postgresqlreceiver/consts.go b/receiver/postgresqlreceiver/consts.go
new file mode 100644
index 0000000000000..5b1e40e0e4943
--- /dev/null
+++ b/receiver/postgresqlreceiver/consts.go
@@ -0,0 +1,19 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"
+
+const (
+	DbAttributePrefix           = "postgresql."
+	QueryidColumnName           = "queryid"
+	TotalExecTimeColumnName     = "total_exec_time"
+	TotalPlanTimeColumnName     = "total_plan_time"
+	CallsColumnName             = "calls"
+	RowsColumnName              = "rows"
+	SharedBlksDirtiedColumnName = "shared_blks_dirtied"
+	SharedBlksHitColumnName     = "shared_blks_hit"
+	SharedBlksReadColumnName    = "shared_blks_read"
+	SharedBlksWrittenColumnName = "shared_blks_written"
+	TempBlksReadColumnName      = "temp_blks_read"
+	TempBlksWrittenColumnName   = "temp_blks_written"
+)
diff --git a/receiver/postgresqlreceiver/factory.go b/receiver/postgresqlreceiver/factory.go
index e2f74c0e87b93..9cfc7a7b1c928 100644
--- a/receiver/postgresqlreceiver/factory.go
+++ b/receiver/postgresqlreceiver/factory.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"time"
 
+	lru "github.com/hashicorp/golang-lru/v2"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/confignet"
 	"go.opentelemetry.io/collector/config/configtls"
@@ -19,6 +20,18 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver/internal/metadata"
 )
 
+// newCache creates a new cache with the given size.
+// If the size is less than or equal to 0, it will be set to 1.
+// It will never return an error.
+func newCache(size int) *lru.Cache[string, float64] {
+	if size <= 0 {
+		size = 1
+	}
+	// lru.New only returns an error when the size is not positive,
+	// and size is guaranteed to be at least 1 here
+	cache, _ := lru.New[string, float64](size)
+	return cache
+}
+
 func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
@@ -47,6 +60,11 @@ func createDefaultConfig() component.Config {
 			Enabled:         false,
 			MaxRowsPerQuery: 1000,
 		},
+		TopQueryCollection: TopQueryCollection{
+			Enabled:         false,
+			TopNQuery:       1000,
+			MaxRowsPerQuery: 1000,
+		},
 	}
 }
 
@@ -65,7 +83,7 @@ func createMetricsReceiver(
 		clientFactory = newDefaultClientFactory(cfg)
 	}
 
-	ns := newPostgreSQLScraper(params, cfg, clientFactory)
+	ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
 	s, err := scraper.NewMetrics(ns.scrape, scraper.WithShutdown(ns.shutdown))
 	if err != nil {
 		return nil, err
@@ -93,14 +111,31 @@ func createLogsReceiver(
 		clientFactory = newDefaultClientFactory(cfg)
 	}
 
-	ns := newPostgreSQLScraper(params, cfg, clientFactory)
-
 	opts := make([]scraperhelper.ControllerOption, 0)
-
-	//nolint:staticcheck
 	if cfg.QuerySampleCollection.Enabled {
+		// query sample collection does not need a cache, but we do not want it to be
+		// nil, so create a size-1 cache as a placeholder.
+		ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(1))
+		s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
+			return ns.scrapeQuerySamples(ctx, cfg.QuerySampleCollection.MaxRowsPerQuery)
+		}, scraper.WithShutdown(ns.shutdown))
+		if err != nil {
+			return nil, err
+		}
+		opt := scraperhelper.AddFactoryWithConfig(
+			scraper.NewFactory(metadata.Type, nil,
+				scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
+					return s, nil
+				}, component.StabilityLevelAlpha)), nil)
+		opts = append(opts, opt)
+	}
+
+	if cfg.TopQueryCollection.Enabled {
+		// we track 10 updated-only attributes per query, so we set the cache size accordingly.
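+		// e.g. with the default top_n_query of 1000 this reserves 1000 * 10 * 2 = 20000
+		// entries; the factor of 2 presumably leaves headroom so entries from the
+		// previous scrape are not evicted while new deltas are being computed.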
+ ns := newPostgreSQLScraper(params, cfg, clientFactory, newCache(int(cfg.TopNQuery*10*2))) s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) { - return ns.scrapeQuerySamples(ctx, cfg.MaxRowsPerQuery) + return ns.scrapeTopQuery(ctx, cfg.TopQueryCollection.MaxRowsPerQuery, cfg.TopNQuery) }, scraper.WithShutdown(ns.shutdown)) if err != nil { return nil, err diff --git a/receiver/postgresqlreceiver/go.mod b/receiver/postgresqlreceiver/go.mod index cc77b19f8e9bc..f46c29f5f8b75 100644 --- a/receiver/postgresqlreceiver/go.mod +++ b/receiver/postgresqlreceiver/go.mod @@ -6,6 +6,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/DataDog/datadog-agent/pkg/obfuscate v0.64.3 github.com/google/go-cmp v0.7.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/lib/pq v1.10.9 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.124.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.124.1 diff --git a/receiver/postgresqlreceiver/go.sum b/receiver/postgresqlreceiver/go.sum index 9e4f1dab123dc..358ce660ad165 100644 --- a/receiver/postgresqlreceiver/go.sum +++ b/receiver/postgresqlreceiver/go.sum @@ -174,6 +174,8 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= diff --git a/receiver/postgresqlreceiver/integration_test.go b/receiver/postgresqlreceiver/integration_test.go index 4c44d93121791..1066c10d3d272 100644 --- a/receiver/postgresqlreceiver/integration_test.go +++ b/receiver/postgresqlreceiver/integration_test.go @@ -76,8 +76,8 @@ func integrationTest(name string, databases []string, pgVersion string) func(*te "POSTGRES_DB": "otel", }, Files: []testcontainers.ContainerFile{{ - HostFilePath: filepath.Join("testdata", "integration", "init.sql"), - ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", + HostFilePath: filepath.Join("testdata", "integration", "01-init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/01-init.sql", FileMode: 700, }}, ExposedPorts: []string{postgresqlPort}, @@ -130,12 +130,23 @@ func TestScrapeLogsFromContainer(t *testing.T) { "POSTGRES_PASSWORD": "otel", "POSTGRES_DB": "otel", }, - Files: []testcontainers.ContainerFile{{ - HostFilePath: filepath.Join("testdata", "integration", "init.sql"), - ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", - FileMode: 700, - }}, + Files: []testcontainers.ContainerFile{ + { + HostFilePath: filepath.Join("testdata", "integration", "01-init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/01-init.sql", + FileMode: 700, + }, 
+		{
+			HostFilePath:      filepath.Join("testdata", "integration", "02-create-extension.sh"),
+			ContainerFilePath: "/docker-entrypoint-initdb.d/02-create-extension.sh",
+			FileMode:          700,
+		},
+	},
 	ExposedPorts: []string{postgresqlPort},
+	Cmd: []string{
+		"-c",
+		"shared_preload_libraries=pg_stat_statements",
+	},
 	WaitingFor: wait.ForListeningPort(postgresqlPort).
 		WithStartupTimeout(2 * time.Minute),
 },
@@ -153,7 +164,7 @@ func TestScrapeLogsFromContainer(t *testing.T) {
 	_, err = db.Query("Select * from test2 where id = 67")
 	assert.NoError(t, err)
-	db.Close()
+	defer db.Close()
 
 	cfg := Config{
 		Databases: []string{"postgres"},
@@ -171,6 +182,9 @@ func TestScrapeLogsFromContainer(t *testing.T) {
 		QuerySampleCollection: QuerySampleCollection{
 			Enabled: true,
 		},
+		TopQueryCollection: TopQueryCollection{
+			Enabled: true,
+		},
 	}
 
 	clientFactory := newDefaultClientFactory(&cfg)
@@ -178,7 +192,7 @@ func TestScrapeLogsFromContainer(t *testing.T) {
 		TelemetrySettings: component.TelemetrySettings{
 			Logger: zap.Must(zap.NewProduction()),
 		},
-	}, &cfg, clientFactory)
+	}, &cfg, clientFactory, newCache(1))
 	plogs, err := ns.scrapeQuerySamples(context.Background(), 30)
 	assert.NoError(t, err)
 	logRecords := plogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
@@ -198,4 +212,53 @@ func TestScrapeLogsFromContainer(t *testing.T) {
 		found = true
 	}
 	assert.True(t, found, "Expected to find a log record with the query text")
+
+	firstTimeTopQueryPLogs, err := ns.scrapeTopQuery(context.Background(), 30, 30)
+	assert.NoError(t, err)
+	logRecords = firstTimeTopQueryPLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
+	found = false
+	for _, record := range logRecords.All() {
+		attributes := record.Attributes().AsRaw()
+		queryAttribute, ok := attributes["db.query.text"]
+		query := strings.ToLower(queryAttribute.(string))
+		assert.True(t, ok)
+		if !strings.HasPrefix(query, "select * from test2 where") {
+			continue
+		}
+		assert.Equal(t, "select * from test2 where id = ?", query)
+		databaseAttribute, ok := attributes["db.namespace"]
+		assert.True(t, ok)
+		assert.Equal(t, "otel2", databaseAttribute.(string))
+		calls, ok := attributes["postgresql.calls"]
+		assert.True(t, ok)
+		assert.Equal(t, int64(1), calls.(int64))
+		found = true
+	}
+	assert.True(t, found, "Expected to find a log record with the query text from the first time top query")
+
+	_, err = db.Query("Select * from test2 where id = 67")
+	assert.NoError(t, err)
+
+	secondTimeTopQueryPLogs, err := ns.scrapeTopQuery(context.Background(), 30, 30)
+	assert.NoError(t, err)
+	logRecords = secondTimeTopQueryPLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
+	found = false
+	for _, record := range logRecords.All() {
+		attributes := record.Attributes().AsRaw()
+		queryAttribute, ok := attributes["db.query.text"]
+		query := strings.ToLower(queryAttribute.(string))
+		assert.True(t, ok)
+		if !strings.HasPrefix(query, "select * from test2 where") {
+			continue
+		}
+		assert.Equal(t, "select * from test2 where id = ?", query)
+		databaseAttribute, ok := attributes["db.namespace"]
+		assert.True(t, ok)
+		assert.Equal(t, "otel2", databaseAttribute.(string))
+		calls, ok := attributes["postgresql.calls"]
+		assert.True(t, ok)
+		assert.Equal(t, int64(2), calls.(int64))
+		found = true
+	}
+	assert.True(t, found, "Expected to find a log record with the query text from the second time top query")
 }
diff --git a/receiver/postgresqlreceiver/scraper.go b/receiver/postgresqlreceiver/scraper.go
index 2827ba9fa8685..18f89f20421f3 100644
--- a/receiver/postgresqlreceiver/scraper.go
+++ 
b/receiver/postgresqlreceiver/scraper.go @@ -4,12 +4,14 @@ package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver" import ( + "container/heap" "context" "errors" "fmt" "sync" "time" + lru "github.com/hashicorp/golang-lru/v2" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -41,7 +43,7 @@ type postgreSQLScraper struct { clientFactory postgreSQLClientFactory mb *metadata.MetricsBuilder excludes map[string]struct{} - + cache *lru.Cache[string, float64] // if enabled, uses a separated attribute for the schema separateSchemaAttr bool } @@ -73,6 +75,7 @@ func newPostgreSQLScraper( settings receiver.Settings, config *Config, clientFactory postgreSQLClientFactory, + cache *lru.Cache[string, float64], ) *postgreSQLScraper { excludes := make(map[string]struct{}) for _, db := range config.ExcludeDatabases { @@ -92,6 +95,7 @@ func newPostgreSQLScraper( clientFactory: clientFactory, mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, settings), excludes: excludes, + cache: cache, separateSchemaAttr: separateSchemaAttr, } @@ -189,6 +193,31 @@ func (p *postgreSQLScraper) scrapeQuerySamples(ctx context.Context, maxRowsPerQu return logs, nil } +func (p *postgreSQLScraper) scrapeTopQuery(ctx context.Context, maxRowsPerQuery int64, topNQuery int64) (plog.Logs, error) { + logs := plog.NewLogs() + resourceLog := logs.ResourceLogs().AppendEmpty() + + scopedLog := resourceLog.ScopeLogs().AppendEmpty() + scopedLog.Scope().SetName(metadata.ScopeName) + scopedLog.Scope().SetVersion("0.0.1") + + dbClient, err := p.clientFactory.getClient(defaultPostgreSQLDatabase) + if err != nil { + p.logger.Error("Failed to initialize connection to postgres", zap.Error(err)) + return logs, err + } + + var errs errsMux + + logRecords := scopedLog.LogRecords() + + p.collectTopQuery(ctx, dbClient, &logRecords, maxRowsPerQuery, topNQuery, &errs, p.logger) + + defer dbClient.Close() + + return logs, nil +} + func (p *postgreSQLScraper) collectQuerySamples(ctx context.Context, dbClient client, logRecords *plog.LogRecordSlice, limit int64, mux *errsMux, logger *zap.Logger) { timestamp := pcommon.NewTimestampFromTime(time.Now()) @@ -209,6 +238,96 @@ func (p *postgreSQLScraper) collectQuerySamples(ctx context.Context, dbClient cl } } +func (p *postgreSQLScraper) collectTopQuery(ctx context.Context, dbClient client, logRecords *plog.LogRecordSlice, limit int64, topNQuery int64, mux *errsMux, logger *zap.Logger) { + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + rows, err := dbClient.getTopQuery(ctx, limit, logger) + if err != nil { + logger.Error("failed to get top query", zap.Error(err)) + mux.addPartial(err) + return + } + + type updatedOnlyInfo struct { + finalConverter func(float64) any + } + + convertToInt := func(f float64) any { + return int64(f) + } + + updatedOnly := map[string]updatedOnlyInfo{ + TotalExecTimeColumnName: {}, + TotalPlanTimeColumnName: {}, + RowsColumnName: {finalConverter: convertToInt}, + CallsColumnName: {finalConverter: convertToInt}, + SharedBlksDirtiedColumnName: {finalConverter: convertToInt}, + SharedBlksHitColumnName: {finalConverter: convertToInt}, + SharedBlksReadColumnName: {finalConverter: convertToInt}, + SharedBlksWrittenColumnName: {finalConverter: convertToInt}, + TempBlksReadColumnName: {finalConverter: convertToInt}, + TempBlksWrittenColumnName: {finalConverter: convertToInt}, + } + + pq := 
make(priorityQueue, 0)
+
+	for i, row := range rows {
+		queryID := row[DbAttributePrefix+QueryidColumnName]
+
+		if queryID == nil {
+			// this should not happen, but check just in case
+			logger.Error("queryid is nil", zap.Any("atts", row))
+			mux.addPartial(errors.New("queryid is nil"))
+			continue
+		}
+
+		for columnName, info := range updatedOnly {
+			var valInAtts float64
+			_val := row[DbAttributePrefix+columnName]
+			if i, ok := _val.(int64); ok {
+				valInAtts = float64(i)
+			} else {
+				valInAtts = _val.(float64)
+			}
+			valInCache, exist := p.cache.Get(queryID.(string) + columnName)
+			valDelta := valInAtts
+			if exist {
+				valDelta = valInAtts - valInCache
+			}
+			finalValue := float64(0)
+			if valDelta > 0 {
+				p.cache.Add(queryID.(string)+columnName, valInAtts) // cache the cumulative value so the next scrape computes the next delta
+				finalValue = valDelta
+			}
+			if info.finalConverter != nil {
+				row[DbAttributePrefix+columnName] = info.finalConverter(finalValue)
+			} else {
+				row[DbAttributePrefix+columnName] = finalValue
+			}
+		}
+		item := Item{
+			row:      row,
+			priority: row[DbAttributePrefix+TotalExecTimeColumnName].(float64),
+			index:    i,
+		}
+		pq.Push(&item)
+	}
+
+	heap.Init(&pq)
+	for pq.Len() > 0 && logRecords.Len() < int(topNQuery) {
+		item := heap.Pop(&pq).(*Item)
+		record := logRecords.AppendEmpty()
+		record.SetTimestamp(timestamp)
+		record.SetEventName("top query")
+		if err := record.Attributes().FromRaw(item.row); err != nil {
+			mux.addPartial(err)
+			logger.Error("failed to read attributes from row", zap.Error(err))
+		}
+		record.Attributes().PutStr("db.system.name", "postgresql")
+		record.Body().SetStr("top query")
+	}
+}
+
 func (p *postgreSQLScraper) shutdown(_ context.Context) error {
 	if p.clientFactory != nil {
 		p.clientFactory.close()
@@ -513,3 +632,42 @@ func (p *postgreSQLScraper) retrieveBackends(
 	r.activityMap = activityByDB
 	r.Unlock()
 }
+
+// reference: https://pkg.go.dev/container/heap#example-package-PriorityQueue
+
+type Item struct {
+	row      map[string]any
+	priority float64
+	index    int
+}
+
+type priorityQueue []*Item
+
+func (pq priorityQueue) Len() int { return len(pq) }
+
+func (pq priorityQueue) Less(i, j int) bool {
+	return pq[i].priority > pq[j].priority
+}
+
+func (pq priorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i
+	pq[j].index = j
+}
+
+func (pq *priorityQueue) Push(x any) {
+	n := len(*pq)
+	item := x.(*Item)
+	item.index = n
+	*pq = append(*pq, item)
+}
+
+func (pq *priorityQueue) Pop() any {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil  // don't stop the GC from reclaiming the item eventually
+	item.index = -1 // for safety
+	*pq = old[0 : n-1]
+	return item
+}
diff --git a/receiver/postgresqlreceiver/scraper_test.go b/receiver/postgresqlreceiver/scraper_test.go
index 7b53368599622..f0073a4b4de8a 100644
--- a/receiver/postgresqlreceiver/scraper_test.go
+++ b/receiver/postgresqlreceiver/scraper_test.go
@@ -33,7 +33,7 @@ func TestUnsuccessfulScrape(t *testing.T) {
 	cfg := factory.CreateDefaultConfig().(*Config)
 	cfg.Endpoint = "fake:11111"
 
-	scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, newDefaultClientFactory(cfg))
+	scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, newDefaultClientFactory(cfg), newCache(1))
 
 	actualMetrics, err := scraper.scrape(context.Background())
 	require.Error(t, err)
@@ -63,7 +63,7 @@ func TestScraper(t *testing.T) {
 	cfg.Metrics.PostgresqlSequentialScans.Enabled = true
 	cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true
 
-	scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory)
+	scraper 
:= newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -115,7 +115,7 @@ func TestScraperNoDatabaseSingle(t *testing.T) { require.False(t, cfg.Metrics.PostgresqlDatabaseLocks.Enabled) cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -139,7 +139,7 @@ func TestScraperNoDatabaseSingle(t *testing.T) { cfg.Metrics.PostgresqlSequentialScans.Enabled = false cfg.Metrics.PostgresqlDatabaseLocks.Enabled = false - scraper = newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory) + scraper = newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, factory, newCache(1)) actualMetrics, err = scraper.scrape(context.Background()) require.NoError(t, err) @@ -189,7 +189,7 @@ func TestScraperNoDatabaseMultipleWithoutPreciseLag(t *testing.T) { cfg.Metrics.PostgresqlSequentialScans.Enabled = true require.False(t, cfg.Metrics.PostgresqlDatabaseLocks.Enabled) cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -240,7 +240,7 @@ func TestScraperNoDatabaseMultiple(t *testing.T) { cfg.Metrics.PostgresqlSequentialScans.Enabled = true require.False(t, cfg.Metrics.PostgresqlDatabaseLocks.Enabled) cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -292,7 +292,7 @@ func TestScraperWithResourceAttributeFeatureGate(t *testing.T) { require.False(t, cfg.Metrics.PostgresqlDatabaseLocks.Enabled) cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -343,7 +343,7 @@ func TestScraperWithResourceAttributeFeatureGateSingle(t *testing.T) { cfg.Metrics.PostgresqlSequentialScans.Enabled = true require.False(t, cfg.Metrics.PostgresqlDatabaseLocks.Enabled) cfg.Metrics.PostgresqlDatabaseLocks.Enabled = true - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory, newCache(1)) actualMetrics, err := scraper.scrape(context.Background()) require.NoError(t, err) @@ -370,7 +370,7 @@ func TestScraperExcludeDatabase(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.ExcludeDatabases = []string{"open"} - scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory) + scraper := newPostgreSQLScraper(receivertest.NewNopSettings(metadata.Type), cfg, &factory, newCache(1)) actualMetrics, err := 
scraper.scrape(context.Background()) require.NoError(t, err) @@ -394,7 +394,6 @@ var expectedScrapeSampleQuery string func TestScrapeQuerySample(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Databases = []string{} - //nolint:staticcheck cfg.QuerySampleCollection.Enabled = true db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) assert.NoError(t, err) @@ -411,7 +410,7 @@ func TestScrapeQuerySample(t *testing.T) { settings.TelemetrySettings = component.TelemetrySettings{ Logger: logger, } - scraper := newPostgreSQLScraper(settings, cfg, factory) + scraper := newPostgreSQLScraper(settings, cfg, factory, newCache(1)) mock.ExpectQuery(expectedScrapeSampleQuery).WillReturnRows(sqlmock.NewRows( []string{"datname", "usename", "client_addrs", "client_hostname", "client_port", "query_start", "wait_event_type", "wait_event", "query_id", "pid", "application_name", "state", "query"}, ).FromCSVString("postgres,otelu,11.4.5.14,otel,114514,2025-02-12T16:37:54.843+08:00,,,123131231231,1450,receiver,idle,select * from pg_stat_activity where id = 32")) @@ -424,6 +423,77 @@ func TestScrapeQuerySample(t *testing.T) { assert.NoError(t, errs) } +//go:embed testdata/scraper/top-query/expectedSql.sql +var expectedScrapeTopQuery string + +func TestScrapeTopQueries(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Databases = []string{} + cfg.TopQueryCollection.Enabled = true + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + assert.NoError(t, err) + + defer db.Close() + + factory := mockSimpleClientFactory{ + db: db, + } + + settings := receivertest.NewNopSettings(metadata.Type) + logger, err := zap.NewProduction() + assert.NoError(t, err) + settings.TelemetrySettings = component.TelemetrySettings{ + Logger: logger, + } + + queryid := "114514" + expectedReturnedValue := map[string]string{ + "calls": "123", + "datname": "postgres", + "shared_blks_dirtied": "1111", + "shared_blks_hit": "1112", + "shared_blks_read": "1113", + "shared_blks_written": "1114", + "temp_blks_read": "1115", + "temp_blks_written": "1116", + "query": "select * from pg_stat_activity where id = 32", + "queryid": queryid, + "rolname": "master", + "rows": "30", + "total_exec_time": "11000", + "total_plan_time": "12000", + } + + expectedRows := make([]string, 0, len(expectedReturnedValue)) + expectedValues := "" + for k, v := range expectedReturnedValue { + expectedRows = append(expectedRows, k) + expectedValues += fmt.Sprintf("%s,", v) + } + + scraper := newPostgreSQLScraper(settings, cfg, factory, newCache(30)) + scraper.cache.Add(queryid+TotalExecTimeColumnName, 10) + scraper.cache.Add(queryid+TotalPlanTimeColumnName, 11) + scraper.cache.Add(queryid+CallsColumnName, 120) + scraper.cache.Add(queryid+RowsColumnName, 20) + + scraper.cache.Add(queryid+SharedBlksDirtiedColumnName, 1110) + scraper.cache.Add(queryid+SharedBlksHitColumnName, 1110) + scraper.cache.Add(queryid+SharedBlksReadColumnName, 1110) + scraper.cache.Add(queryid+SharedBlksWrittenColumnName, 1110) + scraper.cache.Add(queryid+TempBlksReadColumnName, 1110) + scraper.cache.Add(queryid+TempBlksWrittenColumnName, 1110) + + mock.ExpectQuery(expectedScrapeTopQuery).WillReturnRows(sqlmock.NewRows(expectedRows).FromCSVString(expectedValues[:len(expectedValues)-1])) + actualLogs, err := scraper.scrapeTopQuery(context.Background(), 31, 32) + assert.NoError(t, err) + expectedFile := filepath.Join("testdata", "scraper", "top-query", "expected.yaml") + expectedLogs, err := golden.ReadLogs(expectedFile) 
+ require.NoError(t, err) + errs := plogtest.CompareLogs(expectedLogs, actualLogs, plogtest.IgnoreTimestamp()) + assert.NoError(t, errs) +} + type ( mockClientFactory struct{ mock.Mock } mockClient struct{ mock.Mock } @@ -432,6 +502,11 @@ type ( } ) +// getTopQuery implements client. +func (m *mockClient) getTopQuery(_ context.Context, _ int64, _ *zap.Logger) ([]map[string]any, error) { + panic("unimplemented") +} + // close implements postgreSQLClientFactory. func (m mockSimpleClientFactory) close() error { return nil diff --git a/receiver/postgresqlreceiver/templates/topQueryTemplate.tmpl b/receiver/postgresqlreceiver/templates/topQueryTemplate.tmpl new file mode 100644 index 0000000000000..d6e13e6c547c3 --- /dev/null +++ b/receiver/postgresqlreceiver/templates/topQueryTemplate.tmpl @@ -0,0 +1,22 @@ +SELECT + calls, + datname, + shared_blks_dirtied, + shared_blks_hit, + shared_blks_read, + shared_blks_written, + temp_blks_read, + temp_blks_written, + query, + queryid::TEXT, + rolname, + rows::TEXT, + total_exec_time, + total_plan_time +FROM + pg_stat_statements as pg_stat_statements + LEFT JOIN pg_roles ON pg_stat_statements.userid = pg_roles.oid + LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid +WHERE + query != '' +LIMIT {{ .limit }}; \ No newline at end of file diff --git a/receiver/postgresqlreceiver/testdata/config.yaml b/receiver/postgresqlreceiver/testdata/config.yaml index de47dbba87fc1..c05b58e74e160 100644 --- a/receiver/postgresqlreceiver/testdata/config.yaml +++ b/receiver/postgresqlreceiver/testdata/config.yaml @@ -4,6 +4,9 @@ postgresql/minimal: password: ${env:POSTGRESQL_PASSWORD} query_sample_collection: enabled: true + top_query_collection: + enabled: true + top_n_query: 1234 postgresql/pool: endpoint: localhost:5432 transport: tcp diff --git a/receiver/postgresqlreceiver/testdata/integration/init.sql b/receiver/postgresqlreceiver/testdata/integration/01-init.sql similarity index 92% rename from receiver/postgresqlreceiver/testdata/integration/init.sql rename to receiver/postgresqlreceiver/testdata/integration/01-init.sql index 5eeff74ffc058..0e2e9c09a3851 100644 --- a/receiver/postgresqlreceiver/testdata/integration/init.sql +++ b/receiver/postgresqlreceiver/testdata/integration/01-init.sql @@ -2,6 +2,7 @@ CREATE USER otelu WITH PASSWORD 'otelp'; GRANT SELECT ON pg_stat_database TO otelu; GRANT pg_monitor TO otelu; + CREATE TABLE table1 ( id serial PRIMARY KEY ); @@ -25,3 +26,6 @@ CREATE INDEX otel2index ON test2(id); INSERT INTO test2 (id) VALUES(67); SELECT * FROM test2; + + +CREATE EXTENSION pg_stat_statements; diff --git a/receiver/postgresqlreceiver/testdata/integration/02-create-extension.sh b/receiver/postgresqlreceiver/testdata/integration/02-create-extension.sh new file mode 100644 index 0000000000000..362480f811350 --- /dev/null +++ b/receiver/postgresqlreceiver/testdata/integration/02-create-extension.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +set -e +echo "Enabling pg_stat_statements" +psql -v ON_ERROR_STOP=1 --username "root" --dbname "postgres" <<-EOSQL + CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +EOSQL + +psql -v ON_ERROR_STOP=1 --username "root" --dbname "otel2" <<-EOSQL + CREATE EXTENSION IF NOT EXISTS pg_stat_statements; +EOSQL \ No newline at end of file diff --git a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml index 7424d006d7665..3780c0ac1c34e 100644 
--- a/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml +++ b/receiver/postgresqlreceiver/testdata/scraper/query-sample/expected.yaml @@ -50,4 +50,4 @@ resourceLogs: stringValue: sample scope: name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver - version: 0.0.1 \ No newline at end of file + version: 0.0.1 diff --git a/receiver/postgresqlreceiver/testdata/scraper/top-query/expected.yaml b/receiver/postgresqlreceiver/testdata/scraper/top-query/expected.yaml new file mode 100644 index 0000000000000..4b75b7e207893 --- /dev/null +++ b/receiver/postgresqlreceiver/testdata/scraper/top-query/expected.yaml @@ -0,0 +1,56 @@ +resourceLogs: + - resource: + attributes: [] + scopeLogs: + - logRecords: + - attributes: + - key: db.system.name + value: + stringValue: postgresql + - key: db.namespace + value: + stringValue: postgres + - key: db.query.text + value: + stringValue: "select * from pg_stat_activity where id = ?" + - key: postgresql.calls + value: + intValue: 3 + - key: postgresql.rows + value: + intValue: 10 + - key: postgresql.shared_blks_dirtied + value: + intValue: 1 + - key: postgresql.shared_blks_hit + value: + intValue: 2 + - key: postgresql.shared_blks_read + value: + intValue: 3 + - key: postgresql.shared_blks_written + value: + intValue: 4 + - key: postgresql.temp_blks_read + value: + intValue: 5 + - key: postgresql.temp_blks_written + value: + intValue: 6 + - key: postgresql.queryid + value: + stringValue: "114514" + - key: postgresql.rolname + value: + stringValue: master + - key: postgresql.total_exec_time + value: + doubleValue: 1 + - key: postgresql.total_plan_time + value: + doubleValue: 1 + body: + stringValue: top query + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver + version: 0.0.1 \ No newline at end of file diff --git a/receiver/postgresqlreceiver/testdata/scraper/top-query/expectedSql.sql b/receiver/postgresqlreceiver/testdata/scraper/top-query/expectedSql.sql new file mode 100644 index 0000000000000..6b72d8e76bbeb --- /dev/null +++ b/receiver/postgresqlreceiver/testdata/scraper/top-query/expectedSql.sql @@ -0,0 +1,22 @@ +SELECT + calls, + datname, + shared_blks_dirtied, + shared_blks_hit, + shared_blks_read, + shared_blks_written, + temp_blks_read, + temp_blks_written, + query, + queryid::TEXT, + rolname, + rows::TEXT, + total_exec_time, + total_plan_time +FROM + pg_stat_statements as pg_stat_statements + LEFT JOIN pg_roles ON pg_stat_statements.userid = pg_roles.oid + LEFT JOIN pg_database ON pg_stat_statements.dbid = pg_database.oid +WHERE + query != '' +LIMIT 31; \ No newline at end of file From 9ba9caaa2da1952aeb578345049162e9bb4a1a79 Mon Sep 17 00:00:00 2001 From: Cuichen Li Date: Thu, 24 Apr 2025 09:07:40 +0800 Subject: [PATCH 2/3] add change log --- .../postgresql-add-top-query-collection.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/postgresql-add-top-query-collection.yaml diff --git a/.chloggen/postgresql-add-top-query-collection.yaml b/.chloggen/postgresql-add-top-query-collection.yaml new file mode 100644 index 0000000000000..3948626bfc46f --- /dev/null +++ b/.chloggen/postgresql-add-top-query-collection.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement

+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: postgresqlreceiver

+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add top query collection to the postgresql receiver. This collects queries from `pg_stat_statements` and reports the related metrics, helping end users identify which queries are executed most often and which consume the most resources.

+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [39573]

+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:

+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

From 7d94ec858309769701a9c3c26df0bee052e097f0 Mon Sep 17 00:00:00 2001
From: Cuichen Li
Date: Mon, 28 Apr 2025 09:44:38 +0800
Subject: [PATCH 3/3] update based on comment

---
 receiver/postgresqlreceiver/client.go       | 22 ++++++------
 receiver/postgresqlreceiver/consts.go       | 24 +++++++--------
 receiver/postgresqlreceiver/scraper.go      | 34 ++++++++++-----------
 receiver/postgresqlreceiver/scraper_test.go | 22 ++++++-------
 4 files changed, 51 insertions(+), 51 deletions(-)

diff --git a/receiver/postgresqlreceiver/client.go b/receiver/postgresqlreceiver/client.go
index d342ca0fa6e25..17f9fc06ac4bf 100644
--- a/receiver/postgresqlreceiver/client.go
+++ b/receiver/postgresqlreceiver/client.go
@@ -896,16 +896,16 @@ func (c *postgreSQLClient) getTopQuery(ctx context.Context, limit int64, logger
 	}
 
 	needConversion := map[string]func(string, string, *zap.Logger) (any, error){
-		CallsColumnName:             convertToInt,
-		RowsColumnName:              convertToInt,
-		SharedBlksDirtiedColumnName: convertToInt,
-		SharedBlksHitColumnName:     convertToInt,
-		SharedBlksReadColumnName:    convertToInt,
-		SharedBlksWrittenColumnName: convertToInt,
-		TempBlksReadColumnName:      convertToInt,
-		TempBlksWrittenColumnName:   convertToInt,
-		TotalExecTimeColumnName:     convertMillisecondToSecond,
-		TotalPlanTimeColumnName:     convertMillisecondToSecond,
+		callsColumnName:             convertToInt,
+		rowsColumnName:              convertToInt,
+		sharedBlksDirtiedColumnName: convertToInt,
+		sharedBlksHitColumnName:     convertToInt,
+		sharedBlksReadColumnName:    convertToInt,
+		sharedBlksWrittenColumnName: convertToInt,
+		tempBlksReadColumnName:      convertToInt,
+		tempBlksWrittenColumnName:   convertToInt,
+		totalExecTimeColumnName:     convertMillisecondToSecond,
+		totalPlanTimeColumnName:     convertMillisecondToSecond,
 		"query": func(_ string, val string, logger *zap.Logger) (any, error) {
 			// TODO: check if it is truncated.
result, err := obfuscateSQL(val) @@ -934,7 +934,7 @@ func (c *postgreSQLClient) getTopQuery(ctx context.Context, limit int64, logger if hasConvention[col] != "" { currentAttributes[hasConvention[col]] = val } else { - currentAttributes[DbAttributePrefix+col] = val + currentAttributes[dbAttributePrefix+col] = val } } finalAttributes = append(finalAttributes, currentAttributes) diff --git a/receiver/postgresqlreceiver/consts.go b/receiver/postgresqlreceiver/consts.go index 5b1e40e0e4943..faea4ecaa768a 100644 --- a/receiver/postgresqlreceiver/consts.go +++ b/receiver/postgresqlreceiver/consts.go @@ -4,16 +4,16 @@ package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver" const ( - DbAttributePrefix = "postgresql." - QueryidColumnName = "queryid" - TotalExecTimeColumnName = "total_exec_time" - TotalPlanTimeColumnName = "total_plan_time" - CallsColumnName = "calls" - RowsColumnName = "rows" - SharedBlksDirtiedColumnName = "shared_blks_dirtied" - SharedBlksHitColumnName = "shared_blks_hit" - SharedBlksReadColumnName = "shared_blks_read" - SharedBlksWrittenColumnName = "shared_blks_written" - TempBlksReadColumnName = "temp_blks_read" - TempBlksWrittenColumnName = "temp_blks_written" + dbAttributePrefix = "postgresql." + queryidColumnName = "queryid" + totalExecTimeColumnName = "total_exec_time" + totalPlanTimeColumnName = "total_plan_time" + callsColumnName = "calls" + rowsColumnName = "rows" + sharedBlksDirtiedColumnName = "shared_blks_dirtied" + sharedBlksHitColumnName = "shared_blks_hit" + sharedBlksReadColumnName = "shared_blks_read" + sharedBlksWrittenColumnName = "shared_blks_written" + tempBlksReadColumnName = "temp_blks_read" + tempBlksWrittenColumnName = "temp_blks_written" ) diff --git a/receiver/postgresqlreceiver/scraper.go b/receiver/postgresqlreceiver/scraper.go index 18f89f20421f3..3cbf86a3ac099 100644 --- a/receiver/postgresqlreceiver/scraper.go +++ b/receiver/postgresqlreceiver/scraper.go @@ -209,12 +209,12 @@ func (p *postgreSQLScraper) scrapeTopQuery(ctx context.Context, maxRowsPerQuery var errs errsMux + defer dbClient.Close() + logRecords := scopedLog.LogRecords() p.collectTopQuery(ctx, dbClient, &logRecords, maxRowsPerQuery, topNQuery, &errs, p.logger) - defer dbClient.Close() - return logs, nil } @@ -257,22 +257,22 @@ func (p *postgreSQLScraper) collectTopQuery(ctx context.Context, dbClient client } updatedOnly := map[string]updatedOnlyInfo{ - TotalExecTimeColumnName: {}, - TotalPlanTimeColumnName: {}, - RowsColumnName: {finalConverter: convertToInt}, - CallsColumnName: {finalConverter: convertToInt}, - SharedBlksDirtiedColumnName: {finalConverter: convertToInt}, - SharedBlksHitColumnName: {finalConverter: convertToInt}, - SharedBlksReadColumnName: {finalConverter: convertToInt}, - SharedBlksWrittenColumnName: {finalConverter: convertToInt}, - TempBlksReadColumnName: {finalConverter: convertToInt}, - TempBlksWrittenColumnName: {finalConverter: convertToInt}, + totalExecTimeColumnName: {}, + totalPlanTimeColumnName: {}, + rowsColumnName: {finalConverter: convertToInt}, + callsColumnName: {finalConverter: convertToInt}, + sharedBlksDirtiedColumnName: {finalConverter: convertToInt}, + sharedBlksHitColumnName: {finalConverter: convertToInt}, + sharedBlksReadColumnName: {finalConverter: convertToInt}, + sharedBlksWrittenColumnName: {finalConverter: convertToInt}, + tempBlksReadColumnName: {finalConverter: convertToInt}, + tempBlksWrittenColumnName: {finalConverter: convertToInt}, } pq := 
make(priorityQueue, 0) for i, row := range rows { - queryID := row[DbAttributePrefix+QueryidColumnName] + queryID := row[dbAttributePrefix+queryidColumnName] if queryID == nil { // this should not happen, but in case @@ -283,7 +283,7 @@ func (p *postgreSQLScraper) collectTopQuery(ctx context.Context, dbClient client for columnName, info := range updatedOnly { var valInAtts float64 - _val := row[DbAttributePrefix+columnName] + _val := row[dbAttributePrefix+columnName] if i, ok := _val.(int64); ok { valInAtts = float64(i) } else { @@ -300,14 +300,14 @@ func (p *postgreSQLScraper) collectTopQuery(ctx context.Context, dbClient client finalValue = valDelta } if info.finalConverter != nil { - row[DbAttributePrefix+columnName] = info.finalConverter(finalValue) + row[dbAttributePrefix+columnName] = info.finalConverter(finalValue) } else { - row[DbAttributePrefix+columnName] = finalValue + row[dbAttributePrefix+columnName] = finalValue } } item := Item{ row: row, - priority: row[DbAttributePrefix+TotalExecTimeColumnName].(float64), + priority: row[dbAttributePrefix+totalExecTimeColumnName].(float64), index: i, } pq.Push(&item) diff --git a/receiver/postgresqlreceiver/scraper_test.go b/receiver/postgresqlreceiver/scraper_test.go index f0073a4b4de8a..9e2b5bf9f2dfd 100644 --- a/receiver/postgresqlreceiver/scraper_test.go +++ b/receiver/postgresqlreceiver/scraper_test.go @@ -472,17 +472,17 @@ func TestScrapeTopQueries(t *testing.T) { } scraper := newPostgreSQLScraper(settings, cfg, factory, newCache(30)) - scraper.cache.Add(queryid+TotalExecTimeColumnName, 10) - scraper.cache.Add(queryid+TotalPlanTimeColumnName, 11) - scraper.cache.Add(queryid+CallsColumnName, 120) - scraper.cache.Add(queryid+RowsColumnName, 20) - - scraper.cache.Add(queryid+SharedBlksDirtiedColumnName, 1110) - scraper.cache.Add(queryid+SharedBlksHitColumnName, 1110) - scraper.cache.Add(queryid+SharedBlksReadColumnName, 1110) - scraper.cache.Add(queryid+SharedBlksWrittenColumnName, 1110) - scraper.cache.Add(queryid+TempBlksReadColumnName, 1110) - scraper.cache.Add(queryid+TempBlksWrittenColumnName, 1110) + scraper.cache.Add(queryid+totalExecTimeColumnName, 10) + scraper.cache.Add(queryid+totalPlanTimeColumnName, 11) + scraper.cache.Add(queryid+callsColumnName, 120) + scraper.cache.Add(queryid+rowsColumnName, 20) + + scraper.cache.Add(queryid+sharedBlksDirtiedColumnName, 1110) + scraper.cache.Add(queryid+sharedBlksHitColumnName, 1110) + scraper.cache.Add(queryid+sharedBlksReadColumnName, 1110) + scraper.cache.Add(queryid+sharedBlksWrittenColumnName, 1110) + scraper.cache.Add(queryid+tempBlksReadColumnName, 1110) + scraper.cache.Add(queryid+tempBlksWrittenColumnName, 1110) mock.ExpectQuery(expectedScrapeTopQuery).WillReturnRows(sqlmock.NewRows(expectedRows).FromCSVString(expectedValues[:len(expectedValues)-1])) actualLogs, err := scraper.scrapeTopQuery(context.Background(), 31, 32)