Skip to content

[receiver/postgresql] add basic query sample collection #39311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/postgresql-sample-query.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add top query collection to help end user identify which query were executed in the postgresql database.


# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [39311]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
20 changes: 20 additions & 0 deletions receiver/postgresqlreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
| Status | |
| ------------- |-----------|
| Stability | [unmaintained]: metrics |
| | [development]: logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fpostgresql%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fpostgresql) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fpostgresql%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fpostgresql) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | \| Seeking more code owners! |
| Emeritus | [@djaglowski](https://www.github.com/djaglowski) |

[unmaintained]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#unmaintained
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might like to make yourself a codeowner of this component. Separate PR please.

[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

Expand Down Expand Up @@ -50,6 +52,22 @@ The following settings are also optional and nested under `tls` to help configur
- `collection_interval` (default = `10s`): This receiver collects metrics on an interval. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
- `initial_delay` (default = `1s`): defines how long this receiver waits before starting.

### Query Sample Collection
We provide functionality to collect the query sample from postgresql. It will get historical query
from `pg_stat_activity`. To enable it, you will need the following configuration
```
...
query_sample_collection:
enabled: true
...
```
By default, query sample collection is disabled, also note, to use it, you will need
to grant the user you are using `pg_monitor`. Take the example from `testdata/integration/init.sql`

```sql
GRANT pg_monitor TO otelu;
```

### Example Configuration

```yaml
Expand All @@ -68,6 +86,8 @@ receivers:
ca_file: /home/otel/authorities.crt
cert_file: /home/otel/mypostgrescert.crt
key_file: /home/otel/mypostgreskey.key
query_sample_collection:
enabled: false
```

The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).
Expand Down
95 changes: 95 additions & 0 deletions receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"

import (
"bytes"
"context"
"database/sql"
_ "embed"
"errors"
"fmt"
"html/template"
"net"
"strconv"
"strings"
Expand All @@ -17,6 +20,9 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
)

const lagMetricsInSecondsFeatureGateID = "postgresqlreceiver.preciselagmetrics"
Expand Down Expand Up @@ -58,6 +64,7 @@ type client interface {
getIndexStats(ctx context.Context, database string) (map[indexIdentifer]indexStat, error)
listDatabases(ctx context.Context) ([]string, error)
getVersion(ctx context.Context) (string, error)
getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
}

type postgreSQLClient struct {
Expand Down Expand Up @@ -735,3 +742,91 @@ func tableKey(database, schema, table string) tableIdentifier {
func indexKey(database, schema, table, index string) indexIdentifer {
return indexIdentifer(fmt.Sprintf("%s|%s|%s|%s", database, schema, table, index))
}

//go:embed templates/querySampleTemplate.tmpl
var querySampleTemplate string

func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error) {
tmpl := template.Must(template.New("querySample").Option("missingkey=error").Parse(querySampleTemplate))
buf := bytes.Buffer{}

// TODO: Only get query after the oldest query we got from the previous sample query colelction.
// For instance, if from the last sample query we got queries executed between 8:00 ~ 8:15,
// in this query, we should only gather query after 8:15
if err := tmpl.Execute(&buf, map[string]any{
"limit": limit,
}); err != nil {
logger.Error("failed to execute template", zap.Error(err))
return []map[string]any{}, fmt.Errorf("failed executing template: %w", err)
}

wrappedDb := sqlquery.NewDbClient(sqlquery.DbWrapper{Db: c.client}, buf.String(), logger, sqlquery.TelemetryConfig{})

rows, err := wrappedDb.QueryRows(ctx)
if err != nil {
if !errors.Is(err, sqlquery.ErrNullValueWarning) {
logger.Error("failed getting log rows", zap.Error(err))
return []map[string]any{}, fmt.Errorf("getQuerySamples failed getting log rows: %w", err)
}
// in case the sql returned rows contains null value, we just log a warning and continue
logger.Warn("problems encountered getting log rows", zap.Error(err))
}

errs := make([]error, 0)
finalAttributes := make([]map[string]any, 0)
dbPrefix := "postgresql."
for _, row := range rows {
if row["query"] == "<insufficient privilege>" {
logger.Warn("skipping query sample due to insufficient privileges")
errs = append(errs, errors.New("skipping query sample due to insufficient privileges"))
continue
}
currentAttributes := make(map[string]any)
simpleColumns := []string{
"client_hostname",
"query_start",
"wait_event_type",
"wait_event",
"query_id",
"state",
"application_name",
}

for _, col := range simpleColumns {
currentAttributes[dbPrefix+col] = row[col]
}

clientPort := 0
if row["client_port"] != "" {
clientPort, err = strconv.Atoi(row["client_port"])
if err != nil {
logger.Warn("failed to convert client_port to int", zap.Error(err))
errs = append(errs, err)
}
}
pid := 0
if row["pid"] != "" {
pid, err = strconv.Atoi(row["pid"])
if err != nil {
logger.Warn("failed to convert pid to int", zap.Error(err))
errs = append(errs, err)
}
}
// TODO: check if the query is truncated.
obfuscated, err := obfuscateSQL(row["query"])
if err != nil {
logger.Warn("failed to obfuscate query", zap.String("query", row["query"]))
obfuscated = ""
}
currentAttributes[dbPrefix+"pid"] = pid
currentAttributes["network.peer.port"] = clientPort
currentAttributes["network.peer.address"] = row["client_addrs"]
currentAttributes["db.query.text"] = obfuscated
currentAttributes["db.namespace"] = row["datname"]
currentAttributes["user.name"] = row["usename"]
currentAttributes["db.system.name"] = "postgresql"
finalAttributes = append(finalAttributes, currentAttributes)
}

return finalAttributes, errors.Join(errs...)
}
6 changes: 6 additions & 0 deletions receiver/postgresqlreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
ErrHostPort = "invalid config: 'endpoint' must be in the form <host>:<port> no matter what 'transport' is configured"
)

type QuerySampleCollection struct {
Enabled bool `mapstructure:"enabled"`
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
}

type Config struct {
scraperhelper.ControllerConfig `mapstructure:",squash"`
Username string `mapstructure:"username"`
Expand All @@ -37,6 +42,7 @@ type Config struct {
configtls.ClientConfig `mapstructure:"tls,omitempty"` // provides SSL details
ConnectionPool `mapstructure:"connection_pool,omitempty"`
metadata.MetricsBuilderConfig `mapstructure:",squash"`
QuerySampleCollection `mapstructure:"query_sample_collection,omitempty"`
}

type ConnectionPool struct {
Expand Down
7 changes: 6 additions & 1 deletion receiver/postgresqlreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ func TestLoadConfig(t *testing.T) {
expected.Endpoint = "localhost:5432"
expected.Username = "otel"
expected.Password = "${env:POSTGRESQL_PASSWORD}"

//nolint:staticcheck
expected.QuerySampleCollection.Enabled = true
require.Equal(t, expected, cfg)
})

cfg = factory.CreateDefaultConfig()

t.Run("postgresql/pool", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "pool").String())
require.NoError(t, err)
Expand All @@ -151,6 +154,8 @@ func TestLoadConfig(t *testing.T) {
require.Equal(t, expected, cfg)
})

cfg = factory.CreateDefaultConfig()

t.Run("postgresql/all", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "all").String())
require.NoError(t, err)
Expand Down
50 changes: 49 additions & 1 deletion receiver/postgresqlreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scraperhelper"
Expand All @@ -22,7 +23,9 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
)
}

func createDefaultConfig() component.Config {
Expand All @@ -40,6 +43,10 @@ func createDefaultConfig() component.Config {
InsecureSkipVerify: true,
},
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
QuerySampleCollection: QuerySampleCollection{
Enabled: false,
MaxRowsPerQuery: 1000,
},
}
}

Expand Down Expand Up @@ -69,3 +76,44 @@ func createMetricsReceiver(
scraperhelper.AddScraper(metadata.Type, s),
)
}

// createLogsReceiver create a logs receiver based on provided config.
func createLogsReceiver(
_ context.Context,
params receiver.Settings,
receiverCfg component.Config,
logsConsumer consumer.Logs,
) (receiver.Logs, error) {
cfg := receiverCfg.(*Config)

var clientFactory postgreSQLClientFactory
if connectionPoolGate.IsEnabled() {
clientFactory = newPoolClientFactory(cfg)
} else {
clientFactory = newDefaultClientFactory(cfg)
}

ns := newPostgreSQLScraper(params, cfg, clientFactory)

opts := make([]scraperhelper.ControllerOption, 0)

//nolint:staticcheck
if cfg.QuerySampleCollection.Enabled {
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
return ns.scrapeQuerySamples(ctx, cfg.MaxRowsPerQuery)
}, scraper.WithShutdown(ns.shutdown))
if err != nil {
return nil, err
}
opt := scraperhelper.AddFactoryWithConfig(
scraper.NewFactory(metadata.Type, nil,
scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
return s, nil
}, component.StabilityLevelAlpha)), nil)
opts = append(opts, opt)
}

return scraperhelper.NewLogsController(
&cfg.ControllerConfig, params, logsConsumer, opts...,
)
}
7 changes: 7 additions & 0 deletions receiver/postgresqlreceiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion receiver/postgresqlreceiver/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading