Skip to content

Commit 5156ea9

Browse files
committed
always log leader_pid label
1 parent b6b71aa commit 5156ea9

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

internal/component/database_observability/postgres/collector/pg_activity.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,9 @@ func (c *Activity) fetchActivity(ctx context.Context) error {
237237
}
238238
}
239239

240-
// Prepare leader_pid for logging if it exists
241240
leaderPID := ""
242241
if activity.LeaderPID.Valid {
243-
leaderPID = fmt.Sprintf(` leader_pid="%d"`, activity.LeaderPID.Int64)
242+
leaderPID = fmt.Sprintf(`%d`, activity.LeaderPID.Int64)
244243
}
245244

246245
stateDuration := calculateDuration(activity.StateChange, activity.Now)
@@ -292,7 +291,7 @@ func (c *Activity) fetchActivity(ctx context.Context) error {
292291

293292
// Build query sample entry
294293
sampleLabels := fmt.Sprintf(
295-
`clock_timestamp="%s" instance="%s" app="%s" client="%s" backend_type="%s" backend_time="%s" state="%s" pid="%d"%s user="%s" userid="%d" datname="%s" datid="%d" xact_time="%s" xid="%d" xmin="%d" query_time="%s" queryid="%d" query="%s" engine="postgres"`,
294+
`clock_timestamp="%s" instance="%s" app="%s" client="%s" backend_type="%s" backend_time="%s" state="%s" pid="%d" leader_pid="%s" user="%s" userid="%d" datname="%s" datid="%d" xact_time="%s" xid="%d" xmin="%d" query_time="%s" queryid="%d" query="%s" engine="postgres"`,
296295
activity.Now.Format(time.RFC3339Nano),
297296
c.instanceKey,
298297
applicationName,

internal/component/database_observability/postgres/collector/pg_activity_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
6969
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
7070
},
7171
expectedLines: []string{
72-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="123" query="SELECT * FROM users WHERE id = ?" engine="postgres" cpu_time="%s"`,
72+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="123" query="SELECT * FROM users WHERE id = ?" engine="postgres" cpu_time="%s"`,
7373
now.Format(time.RFC3339Nano),
7474
time.Duration(0).String(),
7575
time.Duration(0).String(),
@@ -87,7 +87,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
8787
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
8888
},
8989
expectedLines: []string{
90-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="124" query="SELECT * FROM users WHERE id = 123" engine="postgres" cpu_time="%s"`,
90+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="124" query="SELECT * FROM users WHERE id = 123" engine="postgres" cpu_time="%s"`,
9191
now.Format(time.RFC3339Nano),
9292
time.Duration(0).String(),
9393
time.Duration(0).String(),
@@ -105,7 +105,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
105105
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
106106
},
107107
expectedLines: []string{
108-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="125" query="SELECT * FROM users WHERE id = ? /* comment ..." engine="postgres" cpu_time="%s"`,
108+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="125" query="SELECT * FROM users WHERE id = ? /* comment ..." engine="postgres" cpu_time="%s"`,
109109
now.Format(time.RFC3339Nano),
110110
time.Duration(0).String(),
111111
time.Duration(0).String(),
@@ -123,7 +123,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
123123
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
124124
},
125125
expectedLines: []string{
126-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="126" query="SELECT * FROM users WHERE id = 123 /* comment ..." engine="postgres" cpu_time="%s"`,
126+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="126" query="SELECT * FROM users WHERE id = 123 /* comment ..." engine="postgres" cpu_time="%s"`,
127127
now.Format(time.RFC3339Nano),
128128
time.Duration(0).String(),
129129
time.Duration(0).String(),
@@ -141,7 +141,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
141141
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
142142
},
143143
expectedLines: []string{
144-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="127" query="SELECT u.id, u.name, p.role FROM users u JOIN permissions p ON u.id = p.user_id WHERE u.id IN (?, ?) AND p.role = ?" engine="postgres" cpu_time="%s"`,
144+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="127" query="SELECT u.id, u.name, p.role FROM users u JOIN permissions p ON u.id = p.user_id WHERE u.id IN (?, ?) AND p.role = ?" engine="postgres" cpu_time="%s"`,
145145
now.Format(time.RFC3339Nano),
146146
time.Duration(0).String(),
147147
time.Duration(0).String(),
@@ -159,7 +159,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
159159
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
160160
},
161161
expectedLines: []string{
162-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="128" query="INSERT INTO users (id, name, email) VALUES (?, ?, ?)" engine="postgres" cpu_time="%s"`,
162+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="128" query="INSERT INTO users (id, name, email) VALUES (?, ?, ?)" engine="postgres" cpu_time="%s"`,
163163
now.Format(time.RFC3339Nano),
164164
time.Duration(0).String(),
165165
time.Duration(0).String(),
@@ -178,7 +178,7 @@ func TestActivity_QueryRedaction(t *testing.T) {
178178
{"job": database_observability.JobName, "op": OP_WAIT_EVENT, "instance": "test"},
179179
},
180180
expectedLines: []string{
181-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="waiting" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="125" query="SELECT * FROM users WHERE id = ?" engine="postgres"`,
181+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="waiting" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="125" query="SELECT * FROM users WHERE id = ?" engine="postgres"`,
182182
now.Format(time.RFC3339Nano),
183183
time.Duration(0).String(),
184184
time.Duration(0).String(),
@@ -303,7 +303,7 @@ func TestActivity_FetchActivity(t *testing.T) {
303303
{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "test"},
304304
},
305305
expectedLines: []string{
306-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="500" xmin="400" query_time="%s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="%s"`,
306+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="active" pid="100" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="500" xmin="400" query_time="%s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="%s"`,
307307
now.Format(time.RFC3339Nano),
308308
time.Duration(0).String(),
309309
time.Duration(0).String(),
@@ -374,7 +374,7 @@ func TestActivity_FetchActivity(t *testing.T) {
374374
{"job": database_observability.JobName, "op": OP_WAIT_EVENT, "instance": "test"},
375375
},
376376
expectedLines: []string{
377-
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="waiting" pid="102" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="124" query="UPDATE users SET status = 'active'" engine="postgres"`,
377+
fmt.Sprintf(`level="info" clock_timestamp="%s" instance="test" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="%s" state="waiting" pid="102" leader_pid="" user="testuser" userid="1000" datname="testdb" datid="1" xact_time="%s" xid="0" xmin="0" query_time="%s" queryid="124" query="UPDATE users SET status = 'active'" engine="postgres"`,
378378
now.Format(time.RFC3339Nano),
379379
time.Duration(0).String(),
380380
time.Duration(0).String(),

internal/component/database_observability/postgres/collector/query_tables_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ func TestQueryTables(t *testing.T) {
372372
return collector.Stopped()
373373
}, 5*time.Second, 100*time.Millisecond)
374374

375+
// Give time for goroutines to clean up
376+
time.Sleep(100 * time.Millisecond)
377+
375378
err = mock.ExpectationsWereMet()
376379
require.NoError(t, err)
377380

@@ -442,6 +445,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
442445
return collector.Stopped()
443446
}, 5*time.Second, 100*time.Millisecond)
444447

448+
// Give time for goroutines to clean up
449+
time.Sleep(100 * time.Millisecond)
450+
445451
err = mock.ExpectationsWereMet()
446452
require.NoError(t, err)
447453

@@ -502,6 +508,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
502508
return collector.Stopped()
503509
}, 5*time.Second, 100*time.Millisecond)
504510

511+
// Give time for goroutines to clean up
512+
time.Sleep(100 * time.Millisecond)
513+
505514
err = mock.ExpectationsWereMet()
506515
require.NoError(t, err)
507516

@@ -560,6 +569,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
560569
return collector.Stopped()
561570
}, 5*time.Second, 100*time.Millisecond)
562571

572+
// Give time for goroutines to clean up
573+
time.Sleep(100 * time.Millisecond)
574+
563575
err = mock.ExpectationsWereMet()
564576
require.NoError(t, err)
565577

0 commit comments

Comments
 (0)