Skip to content

Commit 543872d

Browse files
committed
add basic query sample collection for postgresqlreceiver
1 parent 3e50dcd commit 543872d

28 files changed

+1245
-9
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: postgresqlreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add top query collection to help end user identify which query were executed in the postgresql database.
11+
12+
13+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
14+
issues: [39311]
15+
16+
# (Optional) One or more lines of additional information to render under the primary note.
17+
# These lines will be padded with 2 spaces and then inserted directly into the document.
18+
# Use pipe (|) for multiline entries.
19+
subtext:
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

receiver/postgresqlreceiver/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
| Status | |
55
| ------------- |-----------|
66
| Stability | [unmaintained]: metrics |
7+
| | [development]: logs |
78
| Distributions | [contrib] |
89
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fpostgresql%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fpostgresql) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fpostgresql%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fpostgresql) |
910
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | \| Seeking more code owners! |
1011
| Emeritus | [@djaglowski](https://www.github.com/djaglowski) |
1112

1213
[unmaintained]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#unmaintained
14+
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
1315
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
1416
<!-- end autogenerated section -->
1517

@@ -50,6 +52,22 @@ The following settings are also optional and nested under `tls` to help configur
5052
- `collection_interval` (default = `10s`): This receiver collects metrics on an interval. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
5153
- `initial_delay` (default = `1s`): defines how long this receiver waits before starting.
5254

55+
### Query Sample Collection
56+
We provide functionality to collect the query sample from postgresql. It will get historical query
57+
from `pg_stat_activity`. To enable it, you will need the following configuration
58+
```
59+
...
60+
query_sample_collection:
61+
enabled: true
62+
...
63+
```
64+
By default, query sample collection is disabled, also note, to use it, you will need
65+
to grant the user you are using `pg_monitor`. Take the example from `testdata/integration/init.sql`
66+
67+
```sql
68+
GRANT pg_monitor TO otelu;
69+
```
70+
5371
### Example Configuration
5472

5573
```yaml
@@ -68,6 +86,8 @@ receivers:
6886
ca_file: /home/otel/authorities.crt
6987
cert_file: /home/otel/mypostgrescert.crt
7088
key_file: /home/otel/mypostgreskey.key
89+
query_sample_collection:
90+
enabled: false
7191
```
7292
7393
The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).

receiver/postgresqlreceiver/client.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"
55

66
import (
7+
"bytes"
78
"context"
89
"database/sql"
10+
_ "embed"
911
"errors"
1012
"fmt"
13+
"html/template"
1114
"net"
1215
"strconv"
1316
"strings"
@@ -17,6 +20,9 @@ import (
1720
"go.opentelemetry.io/collector/config/configtls"
1821
"go.opentelemetry.io/collector/featuregate"
1922
"go.uber.org/multierr"
23+
"go.uber.org/zap"
24+
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
2026
)
2127

2228
const lagMetricsInSecondsFeatureGateID = "postgresqlreceiver.preciselagmetrics"
@@ -58,6 +64,7 @@ type client interface {
5864
getIndexStats(ctx context.Context, database string) (map[indexIdentifer]indexStat, error)
5965
listDatabases(ctx context.Context) ([]string, error)
6066
getVersion(ctx context.Context) (string, error)
67+
getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error)
6168
}
6269

6370
type postgreSQLClient struct {
@@ -735,3 +742,91 @@ func tableKey(database, schema, table string) tableIdentifier {
735742
func indexKey(database, schema, table, index string) indexIdentifer {
736743
return indexIdentifer(fmt.Sprintf("%s|%s|%s|%s", database, schema, table, index))
737744
}
745+
746+
//go:embed templates/querySampleTemplate.tmpl
747+
var querySampleTemplate string
748+
749+
func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, logger *zap.Logger) ([]map[string]any, error) {
750+
tmpl := template.Must(template.New("querySample").Option("missingkey=error").Parse(querySampleTemplate))
751+
buf := bytes.Buffer{}
752+
753+
// TODO: Only get query after the oldest query we got from the previous sample query colelction.
754+
// For instance, if from the last sample query we got queries executed between 8:00 ~ 8:15,
755+
// in this query, we should only gather query after 8:15
756+
if err := tmpl.Execute(&buf, map[string]any{
757+
"limit": limit,
758+
}); err != nil {
759+
logger.Error("failed to execute template", zap.Error(err))
760+
return []map[string]any{}, fmt.Errorf("failed executing template: %w", err)
761+
}
762+
763+
wrappedDb := sqlquery.NewDbClient(sqlquery.DbWrapper{Db: c.client}, buf.String(), logger, sqlquery.TelemetryConfig{})
764+
765+
rows, err := wrappedDb.QueryRows(ctx)
766+
if err != nil {
767+
if !errors.Is(err, sqlquery.ErrNullValueWarning) {
768+
logger.Error("failed getting log rows", zap.Error(err))
769+
return []map[string]any{}, fmt.Errorf("getQuerySamples failed getting log rows: %w", err)
770+
}
771+
// in case the sql returned rows contains null value, we just log a warning and continue
772+
logger.Warn("problems encountered getting log rows", zap.Error(err))
773+
}
774+
775+
errs := make([]error, 0)
776+
finalAttributes := make([]map[string]any, 0)
777+
dbPrefix := "postgresql."
778+
for _, row := range rows {
779+
if row["query"] == "<insufficient privilege>" {
780+
logger.Warn("skipping query sample due to insufficient privileges")
781+
errs = append(errs, errors.New("skipping query sample due to insufficient privileges"))
782+
continue
783+
}
784+
currentAttributes := make(map[string]any)
785+
simpleColumns := []string{
786+
"client_hostname",
787+
"query_start",
788+
"wait_event_type",
789+
"wait_event",
790+
"query_id",
791+
"state",
792+
"application_name",
793+
}
794+
795+
for _, col := range simpleColumns {
796+
currentAttributes[dbPrefix+col] = row[col]
797+
}
798+
799+
clientPort := 0
800+
if row["client_port"] != "" {
801+
clientPort, err = strconv.Atoi(row["client_port"])
802+
if err != nil {
803+
logger.Warn("failed to convert client_port to int", zap.Error(err))
804+
errs = append(errs, err)
805+
}
806+
}
807+
pid := 0
808+
if row["pid"] != "" {
809+
pid, err = strconv.Atoi(row["pid"])
810+
if err != nil {
811+
logger.Warn("failed to convert pid to int", zap.Error(err))
812+
errs = append(errs, err)
813+
}
814+
}
815+
// TODO: check if the query is truncated.
816+
obfuscated, err := obfuscateSQL(row["query"])
817+
if err != nil {
818+
logger.Warn("failed to obfuscate query", zap.String("query", row["query"]))
819+
obfuscated = ""
820+
}
821+
currentAttributes[dbPrefix+"pid"] = pid
822+
currentAttributes["network.peer.port"] = clientPort
823+
currentAttributes["network.peer.address"] = row["client_addrs"]
824+
currentAttributes["db.query.text"] = obfuscated
825+
currentAttributes["db.namespace"] = row["datname"]
826+
currentAttributes["user.name"] = row["usename"]
827+
currentAttributes["db.system.name"] = "postgresql"
828+
finalAttributes = append(finalAttributes, currentAttributes)
829+
}
830+
831+
return finalAttributes, errors.Join(errs...)
832+
}

receiver/postgresqlreceiver/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ const (
2727
ErrHostPort = "invalid config: 'endpoint' must be in the form <host>:<port> no matter what 'transport' is configured"
2828
)
2929

30+
type QuerySampleCollection struct {
31+
Enabled bool `mapstructure:"enabled"`
32+
MaxRowsPerQuery int64 `mapstructure:"max_rows_per_query"`
33+
}
34+
3035
type Config struct {
3136
scraperhelper.ControllerConfig `mapstructure:",squash"`
3237
Username string `mapstructure:"username"`
@@ -37,6 +42,7 @@ type Config struct {
3742
configtls.ClientConfig `mapstructure:"tls,omitempty"` // provides SSL details
3843
ConnectionPool `mapstructure:"connection_pool,omitempty"`
3944
metadata.MetricsBuilderConfig `mapstructure:",squash"`
45+
QuerySampleCollection `mapstructure:"query_sample_collection,omitempty"`
4046
}
4147

4248
type ConnectionPool struct {

receiver/postgresqlreceiver/config_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,13 @@ func TestLoadConfig(t *testing.T) {
129129
expected.Endpoint = "localhost:5432"
130130
expected.Username = "otel"
131131
expected.Password = "${env:POSTGRESQL_PASSWORD}"
132-
132+
//nolint:staticcheck
133+
expected.QuerySampleCollection.Enabled = true
133134
require.Equal(t, expected, cfg)
134135
})
135136

137+
cfg = factory.CreateDefaultConfig()
138+
136139
t.Run("postgresql/pool", func(t *testing.T) {
137140
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "pool").String())
138141
require.NoError(t, err)
@@ -151,6 +154,8 @@ func TestLoadConfig(t *testing.T) {
151154
require.Equal(t, expected, cfg)
152155
})
153156

157+
cfg = factory.CreateDefaultConfig()
158+
154159
t.Run("postgresql/all", func(t *testing.T) {
155160
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "all").String())
156161
require.NoError(t, err)

receiver/postgresqlreceiver/factory.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.opentelemetry.io/collector/config/confignet"
1212
"go.opentelemetry.io/collector/config/configtls"
1313
"go.opentelemetry.io/collector/consumer"
14+
"go.opentelemetry.io/collector/pdata/plog"
1415
"go.opentelemetry.io/collector/receiver"
1516
"go.opentelemetry.io/collector/scraper"
1617
"go.opentelemetry.io/collector/scraper/scraperhelper"
@@ -22,7 +23,9 @@ func NewFactory() receiver.Factory {
2223
return receiver.NewFactory(
2324
metadata.Type,
2425
createDefaultConfig,
25-
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
26+
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
27+
receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
28+
)
2629
}
2730

2831
func createDefaultConfig() component.Config {
@@ -40,6 +43,10 @@ func createDefaultConfig() component.Config {
4043
InsecureSkipVerify: true,
4144
},
4245
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
46+
QuerySampleCollection: QuerySampleCollection{
47+
Enabled: false,
48+
MaxRowsPerQuery: 1000,
49+
},
4350
}
4451
}
4552

@@ -69,3 +76,44 @@ func createMetricsReceiver(
6976
scraperhelper.AddScraper(metadata.Type, s),
7077
)
7178
}
79+
80+
// createLogsReceiver create a logs receiver based on provided config.
81+
func createLogsReceiver(
82+
_ context.Context,
83+
params receiver.Settings,
84+
receiverCfg component.Config,
85+
logsConsumer consumer.Logs,
86+
) (receiver.Logs, error) {
87+
cfg := receiverCfg.(*Config)
88+
89+
var clientFactory postgreSQLClientFactory
90+
if connectionPoolGate.IsEnabled() {
91+
clientFactory = newPoolClientFactory(cfg)
92+
} else {
93+
clientFactory = newDefaultClientFactory(cfg)
94+
}
95+
96+
ns := newPostgreSQLScraper(params, cfg, clientFactory)
97+
98+
opts := make([]scraperhelper.ControllerOption, 0)
99+
100+
//nolint:staticcheck
101+
if cfg.QuerySampleCollection.Enabled {
102+
s, err := scraper.NewLogs(func(ctx context.Context) (plog.Logs, error) {
103+
return ns.scrapeQuerySamples(ctx, cfg.MaxRowsPerQuery)
104+
}, scraper.WithShutdown(ns.shutdown))
105+
if err != nil {
106+
return nil, err
107+
}
108+
opt := scraperhelper.AddFactoryWithConfig(
109+
scraper.NewFactory(metadata.Type, nil,
110+
scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
111+
return s, nil
112+
}, component.StabilityLevelAlpha)), nil)
113+
opts = append(opts, opt)
114+
}
115+
116+
return scraperhelper.NewLogsController(
117+
&cfg.ControllerConfig, params, logsConsumer, opts...,
118+
)
119+
}

receiver/postgresqlreceiver/generated_component_test.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/postgresqlreceiver/generated_package_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)