Skip to content

Commit c57c90e

Browse files
committed
[extension/dbstorage] Add DB Transactions to dbstorage.Batch() method
1 parent 1f8c1ee commit c57c90e

File tree

8 files changed

+390
-28
lines changed

8 files changed

+390
-28
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: dbstorageextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add DB Transactions to dbstorage.Batch() method as it is expected by Storage API
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

extension/storage/dbstorage/client.go

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ import (
99
"errors"
1010
"fmt"
1111

12+
"go.opentelemetry.io/collector/extension/xextension/storage"
13+
1214
// Postgres driver
1315
_ "github.com/jackc/pgx/v5/stdlib"
1416
// SQLite driver
1517
_ "github.com/mattn/go-sqlite3"
16-
"go.opentelemetry.io/collector/extension/xextension/storage"
1718
)
1819

1920
const (
@@ -33,7 +34,7 @@ type dbStorageClient struct {
3334

3435
func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) {
3536
createTableSQL := createTable
36-
if driverName == "sqlite" {
37+
if driverName == driverSqlite {
3738
createTableSQL = createTableSqlite
3839
}
3940
var err error
@@ -59,45 +60,37 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str
5960

6061
// Get will retrieve data from storage that corresponds to the specified key
6162
func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
62-
rows, err := c.getQuery.QueryContext(ctx, key)
63-
if err != nil {
64-
return nil, err
65-
}
66-
if !rows.Next() {
67-
return nil, nil
68-
}
69-
var result []byte
70-
err = rows.Scan(&result)
71-
if err != nil {
72-
return result, err
73-
}
74-
err = rows.Close()
75-
return result, err
63+
return c.get(ctx, key, nil)
7664
}
7765

7866
// Set will store data. The data can be retrieved using the same key
7967
func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error {
80-
_, err := c.setQuery.ExecContext(ctx, key, value, value)
81-
return err
68+
return c.set(ctx, key, value, nil)
8269
}
8370

8471
// Delete will delete data associated with the specified key
8572
func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
86-
_, err := c.deleteQuery.ExecContext(ctx, key)
87-
return err
73+
return c.delete(ctx, key, nil)
8874
}
8975

9076
// Batch executes the specified operations in order. Get operation results are updated in place
9177
func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) error {
92-
var err error
78+
// Start a new transaction
79+
tx, err := c.db.BeginTx(ctx, nil)
80+
if err != nil {
81+
return err
82+
}
83+
//nolint:errcheck
84+
defer tx.Rollback()
85+
9386
for _, op := range ops {
9487
switch op.Type {
9588
case storage.Get:
96-
op.Value, err = c.Get(ctx, op.Key)
89+
op.Value, err = c.get(ctx, op.Key, tx)
9790
case storage.Set:
98-
err = c.Set(ctx, op.Key, op.Value)
91+
err = c.set(ctx, op.Key, op.Value, tx)
9992
case storage.Delete:
100-
err = c.Delete(ctx, op.Key)
93+
err = c.delete(ctx, op.Key, tx)
10194
default:
10295
return errors.New("wrong operation type")
10396
}
@@ -106,7 +99,8 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation)
10699
return err
107100
}
108101
}
109-
return err
102+
103+
return tx.Commit()
110104
}
111105

112106
// Close will close the database
@@ -119,3 +113,39 @@ func (c *dbStorageClient) Close(_ context.Context) error {
119113
}
120114
return c.getQuery.Close()
121115
}
116+
117+
func (c *dbStorageClient) get(ctx context.Context, key string, tx *sql.Tx) ([]byte, error) {
118+
rows, err := c.wrapTx(c.getQuery, tx).QueryContext(ctx, key)
119+
if err != nil {
120+
return nil, err
121+
}
122+
123+
if !rows.Next() {
124+
return nil, nil
125+
}
126+
127+
var result []byte
128+
if err := rows.Scan(&result); err != nil {
129+
return result, err
130+
}
131+
132+
return result, rows.Close()
133+
}
134+
135+
func (c *dbStorageClient) set(ctx context.Context, key string, value []byte, tx *sql.Tx) error {
136+
_, err := c.wrapTx(c.setQuery, tx).ExecContext(ctx, key, value, value)
137+
return err
138+
}
139+
140+
func (c *dbStorageClient) delete(ctx context.Context, key string, tx *sql.Tx) error {
141+
_, err := c.wrapTx(c.deleteQuery, tx).ExecContext(ctx, key)
142+
return err
143+
}
144+
145+
func (c *dbStorageClient) wrapTx(stmt *sql.Stmt, tx *sql.Tx) *sql.Stmt {
146+
if tx != nil {
147+
return tx.Stmt(stmt)
148+
}
149+
150+
return stmt
151+
}

0 commit comments

Comments
 (0)