Skip to content

Commit d65307c

Browse files
committed
Added logging to tx rollback failure
Added few comments to clarify commit/rollback flow
1 parent c6641c6 commit d65307c

File tree

6 files changed

+28
-14
lines changed

6 files changed

+28
-14
lines changed

extension/storage/dbstorage/client.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
// SQLite driver
1515
_ "github.com/mattn/go-sqlite3"
1616
"go.opentelemetry.io/collector/extension/xextension/storage"
17+
"go.uber.org/zap"
1718
)
1819

1920
const (
@@ -25,15 +26,16 @@ const (
2526
)
2627

2728
type dbStorageClient struct {
29+
logger *zap.Logger
2830
db *sql.DB
2931
getQuery *sql.Stmt
3032
setQuery *sql.Stmt
3133
deleteQuery *sql.Stmt
3234
}
3335

34-
func newClient(ctx context.Context, driverName string, db *sql.DB, tableName string) (*dbStorageClient, error) {
36+
func newClient(ctx context.Context, logger *zap.Logger, db *sql.DB, driverName string, tableName string) (*dbStorageClient, error) {
3537
createTableSQL := createTable
36-
if driverName == driverSqlite {
38+
if driverName == driverSQLite {
3739
createTableSQL = createTableSqlite
3840
}
3941
var err error
@@ -54,7 +56,7 @@ func newClient(ctx context.Context, driverName string, db *sql.DB, tableName str
5456
if err != nil {
5557
return nil, err
5658
}
57-
return &dbStorageClient{db, selectQuery, setQuery, deleteQuery}, nil
59+
return &dbStorageClient{logger, db, selectQuery, setQuery, deleteQuery}, nil
5860
}
5961

6062
// Get will retrieve data from storage that corresponds to the specified key
@@ -79,8 +81,18 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...*storage.Operation)
7981
if err != nil {
8082
return err
8183
}
82-
//nolint:errcheck
83-
defer tx.Rollback()
84+
85+
// In case of any error we should roll back whole transaction to keep DB in consistent state
86+
// In case of successful commit - tx.Rollback() will be a no-op here as tx is already closed
87+
defer func() {
88+
// We should ignore error related already finished transaction here
89+
// It might happened, for example, if Context was canceled outside of Batch() function
90+
// in this case whole transaction will be rolled back by sql package and we'll receive ErrTxDone here,
91+
// which is actually not an issue because transaction was correctly closed with rollback
92+
if rollbackErr := tx.Rollback(); !errors.Is(rollbackErr, sql.ErrTxDone) {
93+
c.logger.Error("Failed to rollback Batch() transaction", zap.Error(rollbackErr))
94+
}
95+
}()
8496

8597
for _, op := range ops {
8698
switch op.Type {

extension/storage/dbstorage/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"go.opentelemetry.io/collector/extension/xextension/storage"
17+
"go.uber.org/zap"
1718
)
1819

1920
var testTableName = "exporter_otlp_test"
@@ -30,7 +31,7 @@ func Test_newClient(t *testing.T) {
3031
mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName)))
3132
mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName)))
3233

33-
_, err = newClient(context.Background(), driverPostgresql, db, testTableName)
34+
_, err = newClient(context.Background(), zap.L(), db, driverPostgreSQL, testTableName)
3435
assert.NoError(t, err)
3536
})
3637
t.Run("Should return client with Sqlite specific query(s)", func(t *testing.T) {
@@ -44,7 +45,7 @@ func Test_newClient(t *testing.T) {
4445
mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(setQueryText, testTableName)))
4546
mock.ExpectPrepare(regexp.QuoteMeta(fmt.Sprintf(deleteQueryText, testTableName)))
4647

47-
_, err = newClient(context.Background(), driverSqlite, db, testTableName)
48+
_, err = newClient(context.Background(), zap.L(), db, driverSQLite, testTableName)
4849
assert.NoError(t, err)
4950
})
5051
}
@@ -277,6 +278,7 @@ func newTestClient(t *testing.T) (*dbStorageClient, sqlmock.Sqlmock) {
277278
require.NoError(t, err)
278279

279280
return &dbStorageClient{
281+
logger: zap.L(),
280282
db: db,
281283
getQuery: selectQuery,
282284
setQuery: setQuery,

extension/storage/dbstorage/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
)
1010

1111
const (
12-
driverPostgresql = "pgx"
13-
driverSqlite = "sqlite3"
12+
driverPostgreSQL = "pgx"
13+
driverSQLite = "sqlite3"
1414
)
1515

1616
// Config defines configuration for dbstorage extension.
@@ -27,7 +27,7 @@ func (cfg *Config) Validate() error {
2727
return errors.New("missing driver name")
2828
}
2929

30-
if cfg.DriverName != driverPostgresql && cfg.DriverName != driverSqlite {
30+
if cfg.DriverName != driverPostgreSQL && cfg.DriverName != driverSQLite {
3131
return fmt.Errorf("unsupported driver %s", cfg.DriverName)
3232
}
3333

extension/storage/dbstorage/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestConfigValidate(t *testing.T) {
3434
},
3535
{
3636
"Valid",
37-
Config{DriverName: driverSqlite, DataSource: "bar"},
37+
Config{DriverName: driverSQLite, DataSource: "bar"},
3838
nil,
3939
},
4040
}

extension/storage/dbstorage/extension.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (ds *databaseStorage) GetClient(ctx context.Context, kind component.Kind, e
6464
fullName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
6565
}
6666
fullName = strings.ReplaceAll(fullName, " ", "")
67-
return newClient(ctx, ds.driverName, ds.db, fullName)
67+
return newClient(ctx, ds.logger, ds.db, ds.driverName, fullName)
6868
}
6969

7070
func kindString(k component.Kind) string {

extension/storage/dbstorage/extension_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func testExtensionIntegrity(t *testing.T, se storage.Extension) {
120120
func newSqliteTestExtension(t *testing.T) storage.Extension {
121121
f := NewFactory()
122122
cfg := f.CreateDefaultConfig().(*Config)
123-
cfg.DriverName = driverSqlite
123+
cfg.DriverName = driverSQLite
124124
cfg.DataSource = fmt.Sprintf("file:%s/foo.db?_busy_timeout=10000&_journal=WAL&_sync=NORMAL", t.TempDir())
125125

126126
extension, err := f.Create(context.Background(), extensiontest.NewNopSettings(), cfg)
@@ -162,7 +162,7 @@ func newPostgresTestExtension(t *testing.T) storage.Extension {
162162
})
163163
f := NewFactory()
164164
cfg := f.CreateDefaultConfig().(*Config)
165-
cfg.DriverName = driverPostgresql
165+
cfg.DriverName = driverPostgreSQL
166166
cfg.DataSource = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", "127.0.0.1", port.Port(), "root", "passwd", "db")
167167

168168
extension, err := f.Create(context.Background(), extensiontest.NewNopSettings(), cfg)

0 commit comments

Comments
 (0)