Skip to content

Commit ee2d165

Browse files
[extension/dbstorage] Add DB Transactions to dbstorage.Batch() method (#37805)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description According to [Storage API documentation](https://github.com/open-telemetry/opentelemetry-collector/blob/main/extension/xextension/storage/README.md) `Batch` is expected to "execute several operations in a single transaction" Moreover, Persistent Queue in `exporterhelper` is actively using `storage.Batch()` with read+write/write+delete operations in single call, which is really needs to be in a single transaction to avoid accidental data inconsistency in Persistent Queue in case of unexpected service shutdown. For example [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L140) or [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L257) or [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterqueue/persistent_queue.go#L463) As currently supported SQlite and PostgreSQL driver natively support Transaction - this PR implements single transaction inside `storage.Batch()` call Also, I've added bunch of unit tests that were missing to ensure that show Storage API works as expected in `dbstorage` extension <!--Describe what testing was performed and which tests were added.--> #### Testing Respective Unit Tests were added --------- Co-authored-by: Sean Marciniak <[email protected]>
1 parent 589d24a commit ee2d165

File tree

9 files changed

+405
-30
lines changed

9 files changed

+405
-30
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: dbstorageextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add DB Transactions to dbstorage.Batch() method as it is expected by Storage API
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37805]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

extension/storage/dbstorage/client.go

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
// SQLite driver
1515
_ "github.com/mattn/go-sqlite3"
1616
"go.opentelemetry.io/collector/extension/xextension/storage"
17+
"go.uber.org/zap"
1718
)
1819

1920
const (
@@ -25,15 +26,16 @@ const (
2526
)
2627

2728
type dbStorageClient struct {
29+
logger *zap.Logger
2830
db *sql.DB
2931
getQuery *sql.Stmt
3032
setQuery *sql.Stmt
3133
deleteQuery *sql.Stmt
3234
}
3335

34-
func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) {
36+
func newClient(ctx context.Context, logger *zap.Logger, db *sql.DB, driverName string, tableName string) (*dbStorageClient, error) {
3537
createTableSQL := createTable
36-
if driverName == "sqlite" {
38+
if driverName == driverSQLite {
3739
createTableSQL = createTableSqlite
3840
}
3941
var err error
@@ -54,50 +56,52 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str
5456
if err != nil {
5557
return nil, err
5658
}
57-
return &dbStorageClient{db, selectQuery, setQuery, deleteQuery}, nil
59+
return &dbStorageClient{logger, db, selectQuery, setQuery, deleteQuery}, nil
5860
}
5961

6062
// Get will retrieve data from storage that corresponds to the specified key
6163
func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
62-
rows, err := c.getQuery.QueryContext(ctx, key)
63-
if err != nil {
64-
return nil, err
65-
}
66-
if !rows.Next() {
67-
return nil, nil
68-
}
69-
var result []byte
70-
err = rows.Scan(&result)
71-
if err != nil {
72-
return result, err
73-
}
74-
err = rows.Close()
75-
return result, err
64+
return c.get(ctx, key, nil)
7665
}
7766

7867
// Set will store data. The data can be retrieved using the same key
7968
func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error {
80-
_, err := c.setQuery.ExecContext(ctx, key, value, value)
81-
return err
69+
return c.set(ctx, key, value, nil)
8270
}
8371

8472
// Delete will delete data associated with the specified key
8573
func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
86-
_, err := c.deleteQuery.ExecContext(ctx, key)
87-
return err
74+
return c.delete(ctx, key, nil)
8875
}
8976

9077
// Batch executes the specified operations in order. Get operation results are updated in place
9178
func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation) error {
92-
var err error
79+
// Start a new transaction
80+
tx, err := c.db.BeginTx(ctx, nil)
81+
if err != nil {
82+
return err
83+
}
84+
85+
// In case of any error we should roll back whole transaction to keep DB in consistent state
86+
// In case of successful commit - tx.Rollback() will be a no-op here as tx is already closed
87+
defer func() {
88+
// We should ignore error related already finished transaction here
89+
// It might happened, for example, if Context was canceled outside of Batch() function
90+
// in this case whole transaction will be rolled back by sql package and we'll receive ErrTxDone here,
91+
// which is actually not an issue because transaction was correctly closed with rollback
92+
if rollbackErr := tx.Rollback(); !errors.Is(rollbackErr, sql.ErrTxDone) {
93+
c.logger.Error("Failed to rollback Batch() transaction", zap.Error(rollbackErr))
94+
}
95+
}()
96+
9397
for _, op := range ops {
9498
switch op.Type {
9599
case storage.Get:
96-
op.Value, err = c.Get(ctx, op.Key)
100+
op.Value, err = c.get(ctx, op.Key, tx)
97101
case storage.Set:
98-
err = c.Set(ctx, op.Key, op.Value)
102+
err = c.set(ctx, op.Key, op.Value, tx)
99103
case storage.Delete:
100-
err = c.Delete(ctx, op.Key)
104+
err = c.delete(ctx, op.Key, tx)
101105
default:
102106
return errors.New("wrong operation type")
103107
}
@@ -106,7 +110,8 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation)
106110
return err
107111
}
108112
}
109-
return err
113+
114+
return tx.Commit()
110115
}
111116

112117
// Close will close the database
@@ -119,3 +124,39 @@ func (c *dbStorageClient) Close(_ context.Context) error {
119124
}
120125
return c.getQuery.Close()
121126
}
127+
128+
func (c *dbStorageClient) get(ctx context.Context, key string, tx *sql.Tx) ([]byte, error) {
129+
rows, err := c.wrapTx(c.getQuery, tx).QueryContext(ctx, key)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
if !rows.Next() {
135+
return nil, nil
136+
}
137+
138+
var result []byte
139+
if err := rows.Scan(&result); err != nil {
140+
return result, err
141+
}
142+
143+
return result, rows.Close()
144+
}
145+
146+
func (c *dbStorageClient) set(ctx context.Context, key string, value []byte, tx *sql.Tx) error {
147+
_, err := c.wrapTx(c.setQuery, tx).ExecContext(ctx, key, value, value)
148+
return err
149+
}
150+
151+
func (c *dbStorageClient) delete(ctx context.Context, key string, tx *sql.Tx) error {
152+
_, err := c.wrapTx(c.deleteQuery, tx).ExecContext(ctx, key)
153+
return err
154+
}
155+
156+
func (c *dbStorageClient) wrapTx(stmt *sql.Stmt, tx *sql.Tx) *sql.Stmt {
157+
if tx != nil {
158+
return tx.Stmt(stmt)
159+
}
160+
161+
return stmt
162+
}

0 commit comments

Comments
 (0)