From 5fd30425ca3cef76df236d029a2b5905ba44f824 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Tue, 19 Jul 2022 18:27:42 +0300 Subject: [PATCH 1/2] code health: place `fill*` functions in the beginning of the files File `requests.go` has `fill*` functions as well as `prepare.go` file. We should sync with `requests.go` and place `fill*` functions in the beginning of `prepare.go` file. Follows up #117 Part of #101 --- prepared.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/prepared.go b/prepared.go index 6a41538ed..0f9303344 100644 --- a/prepared.go +++ b/prepared.go @@ -20,6 +20,26 @@ type Prepared struct { Conn *Connection } +func fillPrepare(enc *msgpack.Encoder, expr string) error { + enc.EncodeMapLen(1) + enc.EncodeUint64(KeySQLText) + return enc.EncodeString(expr) +} + +func fillUnprepare(enc *msgpack.Encoder, stmt Prepared) error { + enc.EncodeMapLen(1) + enc.EncodeUint64(KeyStmtID) + return enc.EncodeUint64(uint64(stmt.StatementID)) +} + +func fillExecutePrepared(enc *msgpack.Encoder, stmt Prepared, args interface{}) error { + enc.EncodeMapLen(2) + enc.EncodeUint64(KeyStmtID) + enc.EncodeUint64(uint64(stmt.StatementID)) + enc.EncodeUint64(KeySQLBind) + return encodeSQLBind(enc, args) +} + // NewPreparedFromResponse constructs a Prepared object. func NewPreparedFromResponse(conn *Connection, resp *Response) (*Prepared, error) { if resp == nil { @@ -150,23 +170,3 @@ func (req *ExecutePreparedRequest) Context(ctx context.Context) *ExecutePrepared req.ctx = ctx return req } - -func fillPrepare(enc *msgpack.Encoder, expr string) error { - enc.EncodeMapLen(1) - enc.EncodeUint64(KeySQLText) - return enc.EncodeString(expr) -} - -func fillUnprepare(enc *msgpack.Encoder, stmt Prepared) error { - enc.EncodeMapLen(1) - enc.EncodeUint64(KeyStmtID) - return enc.EncodeUint64(uint64(stmt.StatementID)) -} - -func fillExecutePrepared(enc *msgpack.Encoder, stmt Prepared, args interface{}) error { - enc.EncodeMapLen(2) - enc.EncodeUint64(KeyStmtID) - enc.EncodeUint64(uint64(stmt.StatementID)) - enc.EncodeUint64(KeySQLBind) - return encodeSQLBind(enc, args) -} From cc38aedc529726be1ec927bae05e8ab609774a2e Mon Sep 17 00:00:00 2001 From: AnaNek Date: Fri, 10 Jun 2022 13:26:57 +0300 Subject: [PATCH 2/2] streams: interactive transactions and support The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()` method and new request objects to work with stream, `BeginRequest` - start transaction via iproto stream; `CommitRequest` - commit transaction; `RollbackRequest` - rollback transaction. Closes #101 --- CHANGELOG.md | 1 + config.lua | 1 + connection.go | 47 ++- connection_pool/config.lua | 1 + connection_pool/connection_pool.go | 14 + connection_pool/connection_pool_test.go | 314 ++++++++++++++++++- connection_pool/example_test.go | 261 ++++++++++++++++ connector.go | 1 + const.go | 6 + example_test.go | 230 ++++++++++++++ export_test.go | 18 ++ multi/config.lua | 7 + multi/multi.go | 9 + multi/multi_test.go | 253 +++++++++++++++- request_test.go | 65 ++++ stream.go | 202 +++++++++++++ tarantool_test.go | 384 +++++++++++++++++++++++- test_helpers/main.go | 5 + test_helpers/utils.go | 30 ++ 19 files changed, 1816 insertions(+), 33 deletions(-) create mode 100644 stream.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a840c13c..97b6e07d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support datetime type in msgpack (#118) - Prepared SQL statements (#117) - Context support for request objects (#48) +- Streams and interactive transactions support (#101) ### Changed diff --git a/config.lua b/config.lua index abea45742..9ec12f7d5 100644 --- a/config.lua +++ b/config.lua @@ -2,6 +2,7 @@ -- able to send requests until everything is configured. box.cfg{ work_dir = os.getenv("TEST_TNT_WORK_DIR"), + memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil, } box.once("init", function() diff --git a/connection.go b/connection.go index e5595b5b1..a1e485ba8 100644 --- a/connection.go +++ b/connection.go @@ -20,6 +20,7 @@ import ( ) const requestsMap = 128 +const ignoreStreamId = 0 const ( connDisconnected = 0 connConnected = 1 @@ -143,6 +144,8 @@ type Connection struct { state uint32 dec *msgpack.Decoder lenbuf [PacketLengthBytes]byte + + lastStreamId uint64 } var _ = Connector(&Connection{}) // Check compatibility with connector interface. @@ -528,16 +531,27 @@ func (conn *Connection) dial() (err error) { return } -func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) { +func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, + req Request, streamId uint64, res SchemaResolver) (err error) { hl := h.Len() - h.Write([]byte{ + + hMapLen := byte(0x82) // 2 element map. + if streamId != ignoreStreamId { + hMapLen = byte(0x83) // 3 element map. + } + hBytes := []byte{ 0xce, 0, 0, 0, 0, // Length. - 0x82, // 2 element map. + hMapLen, KeyCode, byte(req.Code()), // Request code. KeySync, 0xce, byte(reqid >> 24), byte(reqid >> 16), byte(reqid >> 8), byte(reqid), - }) + } + if streamId != ignoreStreamId { + hBytes = append(hBytes, KeyStreamId, byte(streamId)) + } + + h.Write(hBytes) if err = req.Body(res, enc); err != nil { return @@ -555,7 +569,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) { var packet smallWBuf req := newAuthRequest(conn.opts.User, string(scramble)) - err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema) + err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, ignoreStreamId, conn.Schema) if err != nil { return errors.New("auth: pack error " + err.Error()) @@ -869,7 +883,7 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { } } -func (conn *Connection) send(req Request) *Future { +func (conn *Connection) send(req Request, streamId uint64) *Future { fut := conn.newFuture(req.Ctx()) if fut.ready == nil { return fut @@ -882,14 +896,14 @@ func (conn *Connection) send(req Request) *Future { default: } } - conn.putFuture(fut, req) + conn.putFuture(fut, req, streamId) if req.Ctx() != nil { go conn.contextWatchdog(fut, req.Ctx()) } return fut } -func (conn *Connection) putFuture(fut *Future, req Request) { +func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.bufmut.Lock() @@ -906,7 +920,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) { } blen := shard.buf.Len() reqid := fut.requestId - if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil { + if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil { shard.buf.Trunc(blen) shard.bufmut.Unlock() if f := conn.fetchFuture(reqid); f == fut { @@ -1095,7 +1109,7 @@ func (conn *Connection) Do(req Request) *Future { default: } } - return conn.send(req) + return conn.send(req, ignoreStreamId) } // ConfiguredTimeout returns a timeout from connection config. @@ -1121,3 +1135,16 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) { } return NewPreparedFromResponse(conn, resp) } + +// NewStream creates new Stream object for connection. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. +// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. +// Since 1.7.0 +func (conn *Connection) NewStream() (*Stream, error) { + next := atomic.AddUint64(&conn.lastStreamId, 1) + return &Stream{ + Id: next, + Conn: conn, + }, nil +} diff --git a/connection_pool/config.lua b/connection_pool/config.lua index fb3859297..4df392ba8 100644 --- a/connection_pool/config.lua +++ b/connection_pool/config.lua @@ -2,6 +2,7 @@ -- able to send requests until everything is configured. box.cfg{ work_dir = os.getenv("TEST_TNT_WORK_DIR"), + memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil, } box.once("init", function() diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index ad2e936cc..1b080fc1f 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -544,6 +544,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant return conn.Do(req) } +// NewStream creates new Stream object for connection selected +// by userMode from connPool. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. +// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. +// Since 1.7.0 +func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + return conn.NewStream() +} + // // private // diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 2e462e4eb..4dc3ac12c 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -1369,6 +1369,300 @@ func TestDoWithStrangerConn(t *testing.T) { } } +func TestStream_Commit(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"commit_key", "commit_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned") + + // Connect to servers[2] to check if tuple + // was inserted outside of stream on RW instance + // before transaction commit + conn := test_helpers.ConnectWithValidation(t, servers[2], connOpts) + defer conn.Close() + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"commit_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "commit_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "commit_value", value, "unexpected body of Select (1)") + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Commit") + require.NotNilf(t, resp, "response is nil after Commit") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Commit: wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "commit_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "commit_value", value, "unexpected body of Select (1)") +} + +func TestStream_Rollback(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"rollback_key", "rollback_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned") + + // Connect to servers[2] to check if tuple + // was not inserted outside of stream on RW instance + conn := test_helpers.ConnectWithValidation(t, servers[2], connOpts) + defer conn.Close() + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"rollback_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)") + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Rollback") + require.NotNilf(t, resp, "response is nil after Rollback") + require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Rollback: wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") +} + +func TestStream_TxnIsolationLevel(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + txnIsolationLevels := []tarantool.TxnIsolationLevel{ + tarantool.DefaultIsolationLevel, + tarantool.ReadCommittedLevel, + tarantool.ReadConfirmedLevel, + tarantool.BestEffortLevel, + } + + test_helpers.SkipIfStreamsUnsupported(t) + + roles := []bool{true, true, false, true, true} + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + stream, err := connPool.NewStream(connection_pool.PreferRW) + require.Nilf(t, err, "failed to create stream") + require.NotNilf(t, connPool, "stream is nil after NewStream") + + // Connect to servers[2] to check if tuple + // was not inserted outside of stream on RW instance + conn := test_helpers.ConnectWithValidation(t, servers[2], connOpts) + defer conn.Close() + + for _, level := range txnIsolationLevels { + // Begin transaction + req = tarantool.NewBeginRequest().TxnIsolation(level).Timeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, tarantool.OkCode, resp.Code, "wrong code returned") + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"level_key", "level_value"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, tarantool.OkCode, resp.Code, "wrong code returned") + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"level_key"}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "level_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "level_value", value, "unexpected body of Select (1)") + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Rollback") + require.NotNilf(t, resp, "response is nil after Rollback") + require.Equalf(t, tarantool.OkCode, resp.Code, "wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + test_helpers.DeleteRecordByKey(t, conn, spaceNo, indexNo, []interface{}{"level_key"}) + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body @@ -1383,15 +1677,21 @@ func runTestMain(m *testing.M) int { "work_dir1", "work_dir2", "work_dir3", "work_dir4", "work_dir5"} - var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{ - InitScript: initScript, - User: connOpts.User, - Pass: connOpts.Pass, - WaitStart: waitStart, - ConnectRetry: connectRetry, - RetryTimeout: retryTimeout, + InitScript: initScript, + User: connOpts.User, + Pass: connOpts.Pass, + WaitStart: waitStart, + ConnectRetry: connectRetry, + RetryTimeout: retryTimeout, + MemtxUseMvccEngine: !isStreamUnsupported, }) if err != nil { diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go index 08995d03e..7f6c34700 100644 --- a/connection_pool/example_test.go +++ b/connection_pool/example_test.go @@ -2,6 +2,7 @@ package connection_pool_test import ( "fmt" + "time" "github.com/tarantool/go-tarantool" "github.com/tarantool/go-tarantool/connection_pool" @@ -573,3 +574,263 @@ func ExampleConnectionPool_NewPrepared() { fmt.Printf("Failed to prepare") } } + +func ExampleCommitRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + // example pool has only one rw instance + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"example_commit_key", "example_commit_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"example_commit_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream before commit: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Commit: %s", err.Error()) + return + } + fmt.Printf("Commit transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after commit: response is %#v\n", resp.Data) +} + +func ExampleRollbackRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + // example pool has only one rw instance + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"example_rollback_key", "example_rollback_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"example_rollback_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + +func ExampleBeginRequest_TxnIsolation() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + // example pool has only one rw instance + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + return + } + defer pool.Close() + + stream, err := pool.NewStream(connection_pool.RW) + if err != nil { + fmt.Println(err) + return + } + + // Begin transaction + req = tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadConfirmedLevel). + Timeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{"isolation_level_key", "isolation_level_value"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{"isolation_level_key"}) + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + // example pool has only one rw instance + resp, err = pool.Do(selectReq, connection_pool.RW).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} diff --git a/connector.go b/connector.go index 3084b9124..d6c44c8dd 100644 --- a/connector.go +++ b/connector.go @@ -45,6 +45,7 @@ type Connector interface { ExecuteAsync(expr string, args interface{}) *Future NewPrepared(expr string) (*Prepared, error) + NewStream() (*Stream, error) Do(req Request) (fut *Future) } diff --git a/const.go b/const.go index 3d0d7424f..4a3cb6833 100644 --- a/const.go +++ b/const.go @@ -13,11 +13,15 @@ const ( Call17RequestCode = 10 /* call in >= 1.7 format */ ExecuteRequestCode = 11 PrepareRequestCode = 13 + BeginRequestCode = 14 + CommitRequestCode = 15 + RollbackRequestCode = 16 PingRequestCode = 64 SubscribeRequestCode = 66 KeyCode = 0x00 KeySync = 0x01 + KeyStreamId = 0x0a KeySpaceNo = 0x10 KeyIndexNo = 0x11 KeyLimit = 0x12 @@ -37,6 +41,8 @@ const ( KeySQLBind = 0x41 KeySQLInfo = 0x42 KeyStmtID = 0x43 + KeyTimeout = 0x56 + KeyTxnIsolation = 0x59 KeyFieldName = 0x00 KeyFieldType = 0x01 diff --git a/example_test.go b/example_test.go index cd4c7874c..df7dad770 100644 --- a/example_test.go +++ b/example_test.go @@ -228,6 +228,236 @@ func ExampleUpsertRequest() { // response is []interface {}{[]interface {}{0x459, "first", "updated"}} } +func ExampleCommitRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream, _ := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "commit_hello", "commit_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream before commit: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Commit: %s", err.Error()) + return + } + fmt.Printf("Commit transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after commit: response is %#v\n", resp.Data) +} + +func ExampleRollbackRequest() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream, _ := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(2001), "rollback_hello", "rollback_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(2001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + +func ExampleBeginRequest_TxnIsolation() { + var req tarantool.Request + var resp *tarantool.Response + var err error + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, _ := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil || isLess { + return + } + + conn := example_connect() + defer conn.Close() + + stream, _ := conn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadConfirmedLevel). + Timeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Begin: %s", err.Error()) + return + } + fmt.Printf("Begin transaction: response is %#v\n", resp.Code) + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(2001), "rollback_hello", "rollback_world"}) + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Insert: %s", err.Error()) + return + } + fmt.Printf("Insert in stream: response is %#v\n", resp.Code) + + // Select not related to the transaction + // while transaction is not commited + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(2001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select out of stream: response is %#v\n", resp.Data) + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select in stream: response is %#v\n", resp.Data) + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + fmt.Printf("Failed to Rollback: %s", err.Error()) + return + } + fmt.Printf("Rollback transaction: response is %#v\n", resp.Code) + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + fmt.Printf("Failed to Select: %s", err.Error()) + return + } + fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) +} + func ExampleFuture_GetIterator() { conn := example_connect() defer conn.Close() diff --git a/export_test.go b/export_test.go index 315f444de..0492c1a5b 100644 --- a/export_test.go +++ b/export_test.go @@ -93,3 +93,21 @@ func RefImplExecutePreparedBody(enc *msgpack.Encoder, stmt Prepared, args interf func RefImplUnprepareBody(enc *msgpack.Encoder, stmt Prepared) error { return fillUnprepare(enc, stmt) } + +// RefImplBeginBody is reference implementation for filling of an begin +// request's body. +func RefImplBeginBody(enc *msgpack.Encoder, txnIsolation TxnIsolationLevel, timeout time.Duration) error { + return fillBegin(enc, txnIsolation, timeout) +} + +// RefImplCommitBody is reference implementation for filling of an commit +// request's body. +func RefImplCommitBody(enc *msgpack.Encoder) error { + return fillCommit(enc) +} + +// RefImplRollbackBody is reference implementation for filling of an rollback +// request's body. +func RefImplRollbackBody(enc *msgpack.Encoder) error { + return fillRollback(enc) +} diff --git a/multi/config.lua b/multi/config.lua index 5d75da513..0f032b204 100644 --- a/multi/config.lua +++ b/multi/config.lua @@ -4,6 +4,7 @@ local nodes_load = require("config_load_nodes") -- able to send requests until everything is configured. box.cfg{ work_dir = os.getenv("TEST_TNT_WORK_DIR"), + memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil, } -- Function to call for getting address list, part of tarantool/multi API. @@ -11,6 +12,12 @@ local get_cluster_nodes = nodes_load.get_cluster_nodes rawset(_G, 'get_cluster_nodes', get_cluster_nodes) box.once("init", function() + local s = box.schema.space.create('test', { + id = 517, + if_not_exists = true, + }) + s:create_index('primary', {type = 'tree', parts = {1, 'uint'}, if_not_exists = true}) + box.schema.user.create('test', { password = 'test' }) box.schema.user.grant('test', 'read,write,execute', 'universe') diff --git a/multi/multi.go b/multi/multi.go index 03531a817..67f450c5c 100644 --- a/multi/multi.go +++ b/multi/multi.go @@ -498,6 +498,15 @@ func (connMulti *ConnectionMulti) NewPrepared(expr string) (*tarantool.Prepared, return connMulti.getCurrentConnection().NewPrepared(expr) } +// NewStream creates new Stream object for connection. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. +// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. +// Since 1.7.0 +func (connMulti *ConnectionMulti) NewStream() (*tarantool.Stream, error) { + return connMulti.getCurrentConnection().NewStream() +} + // Do sends the request and returns a future. func (connMulti *ConnectionMulti) Do(req tarantool.Request) *tarantool.Future { if connectedReq, ok := req.(tarantool.ConnectedRequest); ok { diff --git a/multi/multi_test.go b/multi/multi_test.go index 628a2ab28..b4cdf18af 100644 --- a/multi/multi_test.go +++ b/multi/multi_test.go @@ -16,6 +16,9 @@ import ( var server1 = "127.0.0.1:3013" var server2 = "127.0.0.1:3014" +var spaceNo = uint32(517) +var spaceName = "test" +var indexNo = uint32(0) var connOpts = tarantool.Opts{ Timeout: 500 * time.Millisecond, User: "test", @@ -318,6 +321,233 @@ func TestDoWithStrangerConn(t *testing.T) { } } +func TestStream_Commit(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + multiConn, err := Connect([]string{server1, server2}, connOpts) + if err != nil { + t.Fatalf("Failed to connect: %s", err.Error()) + } + if multiConn == nil { + t.Fatalf("conn is nil after Connect") + } + defer multiConn.Close() + + stream, _ := multiConn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer test_helpers.DeleteRecordByKey(t, multiConn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(1001)}) + resp, err = multiConn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Commit transaction + req = tarantool.NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Commit: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Fatalf("Failed to Commit: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = multiConn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } +} + +func TestStream_Rollback(t *testing.T) { + var req tarantool.Request + var resp *tarantool.Response + var err error + + test_helpers.SkipIfStreamsUnsupported(t) + + multiConn, err := Connect([]string{server1, server2}, connOpts) + if err != nil { + t.Fatalf("Failed to connect: %s", err.Error()) + } + if multiConn == nil { + t.Fatalf("conn is nil after Connect") + } + defer multiConn.Close() + + stream, _ := multiConn.NewStream() + + // Begin transaction + req = tarantool.NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = tarantool.NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer test_helpers.DeleteRecordByKey(t, multiConn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := tarantool.NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(tarantool.IterEq). + Key([]interface{}{uint(1001)}) + resp, err = multiConn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Rollback transaction + req = tarantool.NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Rollback: %s", err.Error()) + } + if resp.Code != tarantool.OkCode { + t.Fatalf("Failed to Rollback: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = multiConn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body @@ -329,15 +559,22 @@ func runTestMain(m *testing.M) int { var connectRetry uint = 3 retryTimeout := 500 * time.Millisecond + // Tarantool supports streams and interactive transactions since version 2.10.0 + isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } + inst1, err := test_helpers.StartTarantool(test_helpers.StartOpts{ - InitScript: initScript, - Listen: server1, - WorkDir: "work_dir1", - User: connOpts.User, - Pass: connOpts.Pass, - WaitStart: waitStart, - ConnectRetry: connectRetry, - RetryTimeout: retryTimeout, + InitScript: initScript, + Listen: server1, + WorkDir: "work_dir1", + User: connOpts.User, + Pass: connOpts.Pass, + WaitStart: waitStart, + ConnectRetry: connectRetry, + RetryTimeout: retryTimeout, + MemtxUseMvccEngine: !isStreamUnsupported, }) defer test_helpers.StopTarantoolWithCleanup(inst1) diff --git a/request_test.go b/request_test.go index 7c1805155..b1a558b59 100644 --- a/request_test.go +++ b/request_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "testing" + "time" "github.com/stretchr/testify/assert" @@ -22,6 +23,11 @@ const validExpr = "any string" // We don't check the value here. const defaultSpace = 0 // And valid too. const defaultIndex = 0 // And valid too. +const defaultIsolationLevel = DefaultIsolationLevel +const defaultTimeout = 0 + +const validTimeout = 500 * time.Millisecond + var validStmt *Prepared = &Prepared{StatementID: 1, Conn: &Connection{}} type ValidSchemeResolver struct { @@ -175,6 +181,9 @@ func TestRequestsCodes(t *testing.T) { {req: NewPrepareRequest(validExpr), code: PrepareRequestCode}, {req: NewUnprepareRequest(validStmt), code: PrepareRequestCode}, {req: NewExecutePreparedRequest(validStmt), code: ExecuteRequestCode}, + {req: NewBeginRequest(), code: BeginRequestCode}, + {req: NewCommitRequest(), code: CommitRequestCode}, + {req: NewRollbackRequest(), code: RollbackRequestCode}, } for _, test := range tests { @@ -585,3 +594,59 @@ func TestExecutePreparedRequestDefaultValues(t *testing.T) { assert.Equal(t, req.Conn(), validStmt.Conn) assertBodyEqual(t, refBuf.Bytes(), req) } + +func TestBeginRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplBeginBody(refEnc, defaultIsolationLevel, defaultTimeout) + if err != nil { + t.Errorf("An unexpected RefImplBeginBody() error: %q", err.Error()) + return + } + + req := NewBeginRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestBeginRequestSetters(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplBeginBody(refEnc, ReadConfirmedLevel, validTimeout) + if err != nil { + t.Errorf("An unexpected RefImplBeginBody() error: %q", err.Error()) + return + } + + req := NewBeginRequest().TxnIsolation(ReadConfirmedLevel).Timeout(validTimeout) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestCommitRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplCommitBody(refEnc) + if err != nil { + t.Errorf("An unexpected RefImplCommitBody() error: %q", err.Error()) + return + } + + req := NewCommitRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestRollbackRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplRollbackBody(refEnc) + if err != nil { + t.Errorf("An unexpected RefImplRollbackBody() error: %q", err.Error()) + return + } + + req := NewRollbackRequest() + assertBodyEqual(t, refBuf.Bytes(), req) +} diff --git a/stream.go b/stream.go new file mode 100644 index 000000000..fdfe8408c --- /dev/null +++ b/stream.go @@ -0,0 +1,202 @@ +package tarantool + +import ( + "context" + "fmt" + "time" + + "gopkg.in/vmihailenco/msgpack.v2" +) + +type TxnIsolationLevel uint + +const ( + // By default, the isolation level of Tarantool is serializable. + DefaultIsolationLevel TxnIsolationLevel = 0 + // The ReadCommittedLevel isolation level makes visible all transactions + // that started commit (stream.Do(NewCommitRequest()) was called). + ReadCommittedLevel TxnIsolationLevel = 1 + // The ReadConfirmedLevel isolation level makes visible all transactions + // that finished the commit (stream.Do(NewCommitRequest()) was returned). + ReadConfirmedLevel TxnIsolationLevel = 2 + // If the BestEffortLevel (serializable) isolation level becomes unreachable, + // the transaction is marked as «conflicted» and can no longer be committed. + BestEffortLevel TxnIsolationLevel = 3 +) + +type Stream struct { + Id uint64 + Conn *Connection +} + +func fillBegin(enc *msgpack.Encoder, txnIsolation TxnIsolationLevel, timeout time.Duration) error { + hasTimeout := timeout > 0 + hasIsolationLevel := txnIsolation != DefaultIsolationLevel + mapLen := 0 + if hasTimeout { + mapLen += 1 + } + if hasIsolationLevel { + mapLen += 1 + } + + err := enc.EncodeMapLen(mapLen) + if err != nil { + return err + } + + if hasTimeout { + err = enc.EncodeUint64(KeyTimeout) + if err != nil { + return err + } + + err = enc.Encode(timeout.Seconds()) + if err != nil { + return err + } + } + + if hasIsolationLevel { + err = enc.EncodeUint(KeyTxnIsolation) + if err != nil { + return err + } + + err = enc.Encode(txnIsolation) + if err != nil { + return err + } + } + + return err +} + +func fillCommit(enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) +} + +func fillRollback(enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) +} + +// BeginRequest helps you to create a begin request object for execution +// by a Stream. +// Begin request can not be processed out of stream. +type BeginRequest struct { + baseRequest + txnIsolation TxnIsolationLevel + timeout time.Duration +} + +// NewBeginRequest returns a new BeginRequest. +func NewBeginRequest() *BeginRequest { + req := new(BeginRequest) + req.requestCode = BeginRequestCode + req.txnIsolation = DefaultIsolationLevel + return req +} + +// TxnIsolation sets the the transaction isolation level for transaction manager. +// By default, the isolation level of Tarantool is serializable. +func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequest { + req.txnIsolation = txnIsolation + return req +} + +// WithTimeout allows to set up a timeout for call BeginRequest. +func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest { + req.timeout = timeout + return req +} + +// Body fills an encoder with the begin request body. +func (req *BeginRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillBegin(enc, req.txnIsolation, req.timeout) +} + +// Context sets a passed context to the request. +// +// Pay attention that when using context with request objects, +// the timeout option for Connection does not affect the lifetime +// of the request. For those purposes use context.WithTimeout() as +// the root context. +func (req *BeginRequest) Context(ctx context.Context) *BeginRequest { + req.ctx = ctx + return req +} + +// CommitRequest helps you to create a commit request object for execution +// by a Stream. +// Commit request can not be processed out of stream. +type CommitRequest struct { + baseRequest +} + +// NewCommitRequest returns a new CommitRequest. +func NewCommitRequest() *CommitRequest { + req := new(CommitRequest) + req.requestCode = CommitRequestCode + return req +} + +// Body fills an encoder with the commit request body. +func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillCommit(enc) +} + +// Context sets a passed context to the request. +// +// Pay attention that when using context with request objects, +// the timeout option for Connection does not affect the lifetime +// of the request. For those purposes use context.WithTimeout() as +// the root context. +func (req *CommitRequest) Context(ctx context.Context) *CommitRequest { + req.ctx = ctx + return req +} + +// RollbackRequest helps you to create a rollback request object for execution +// by a Stream. +// Rollback request can not be processed out of stream. +type RollbackRequest struct { + baseRequest +} + +// NewRollbackRequest returns a new RollbackRequest. +func NewRollbackRequest() *RollbackRequest { + req := new(RollbackRequest) + req.requestCode = RollbackRequestCode + return req +} + +// Body fills an encoder with the rollback request body. +func (req *RollbackRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillRollback(enc) +} + +// Context sets a passed context to the request. +// +// Pay attention that when using context with request objects, +// the timeout option for Connection does not affect the lifetime +// of the request. For those purposes use context.WithTimeout() as +// the root context. +func (req *RollbackRequest) Context(ctx context.Context) *RollbackRequest { + req.ctx = ctx + return req +} + +// Do verifies, sends the request and returns a future. +// +// An error is returned if the request was formed incorrectly, or failure to +// create the future. +func (s *Stream) Do(req Request) *Future { + if connectedReq, ok := req.(ConnectedRequest); ok { + if connectedReq.Conn() != s.Conn { + fut := NewFuture() + fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool")) + return fut + } + } + return s.Conn.send(req, s.Id) +} diff --git a/tarantool_test.go b/tarantool_test.go index f5360ba6b..de4062c9e 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -2319,21 +2319,389 @@ func TestComplexStructs(t *testing.T) { } } +func TestStream_Commit(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream, _ := conn.NewStream() + + // Begin transaction + req = NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer test_helpers.DeleteRecordByKey(t, conn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Commit transaction + req = NewCommitRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Commit: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Commit: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } +} + +func TestStream_Rollback(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream, _ := conn.NewStream() + + // Begin transaction + req = NewBeginRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Begin: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) + } + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Insert: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) + } + defer test_helpers.DeleteRecordByKey(t, conn, spaceNo, indexNo, []interface{}{uint(1001)}) + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select in stream + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 1 { + t.Fatalf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Fatalf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1001 { + t.Fatalf("Unexpected body of Select (0)") + } + if h, ok := tpl[1].(string); !ok || h != "hello2" { + t.Fatalf("Unexpected body of Select (1)") + } + if h, ok := tpl[2].(string); !ok || h != "world2" { + t.Fatalf("Unexpected body of Select (2)") + } + } + + // Rollback transaction + req = NewRollbackRequest() + resp, err = stream.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Rollback: %s", err.Error()) + } + if resp.Code != OkCode { + t.Fatalf("Failed to Rollback: wrong code returned %d", resp.Code) + } + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + if err != nil { + t.Fatalf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } + if len(resp.Data) != 0 { + t.Fatalf("Response Data len != 0") + } +} + +func TestStream_TxnIsolationLevel(t *testing.T) { + var req Request + var resp *Response + var err error + var conn *Connection + + txnIsolationLevels := []TxnIsolationLevel{ + DefaultIsolationLevel, + ReadCommittedLevel, + ReadConfirmedLevel, + BestEffortLevel, + } + + test_helpers.SkipIfStreamsUnsupported(t) + + conn = test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + stream, _ := conn.NewStream() + + for _, level := range txnIsolationLevels { + // Begin transaction + req = NewBeginRequest().TxnIsolation(level).Timeout(500 * time.Millisecond) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Begin") + require.NotNilf(t, resp, "response is nil after Begin") + require.Equalf(t, OkCode, resp.Code, "wrong code returned") + + // Insert in stream + req = NewInsertRequest(spaceName). + Tuple([]interface{}{uint(1001), "hello2", "world2"}) + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, OkCode, resp.Code, "wrong code returned") + + // Select not related to the transaction + // while transaction is not committed + // result of select is empty + selectReq := NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1001)}) + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select in stream + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 3, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(uint64) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, uint64(1001), key, "unexpected body of Select (0)") + + value1, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "hello2", value1, "unexpected body of Select (1)") + + value2, ok := tpl[2].(string) + require.Truef(t, ok, "unexpected body of Select (2)") + require.Equalf(t, "world2", value2, "unexpected body of Select (2)") + + // Rollback transaction + req = NewRollbackRequest() + resp, err = stream.Do(req).Get() + require.Nilf(t, err, "failed to Rollback") + require.NotNilf(t, resp, "response is nil after Rollback") + require.Equalf(t, OkCode, resp.Code, "wrong code returned") + + // Select outside of transaction + resp, err = conn.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + // Select inside of stream after rollback + resp, err = stream.Do(selectReq).Get() + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Data len != 0") + + test_helpers.DeleteRecordByKey(t, conn, spaceNo, indexNo, []interface{}{uint(1001)}) + } +} + +func TestStream_DoWithStrangerConn(t *testing.T) { + expectedErr := fmt.Errorf("the passed connected request " + + "doesn't belong to the current connection or connection pool") + + conn := &Connection{} + stream, _ := conn.NewStream() + req := test_helpers.NewStrangerRequest() + + _, err := stream.Do(req).Get() + if err == nil { + t.Fatalf("nil error has been caught") + } + if err.Error() != expectedErr.Error() { + t.Fatalf("Unexpected error has been caught: %s", err.Error()) + } +} + +func TestStream_DoWithClosedConn(t *testing.T) { + expectedErr := fmt.Errorf("using closed connection") + + test_helpers.SkipIfStreamsUnsupported(t) + + conn := test_helpers.ConnectWithValidation(t, server, opts) + + stream, _ := conn.NewStream() + conn.Close() + + // Begin transaction + req := NewBeginRequest() + _, err := stream.Do(req).Get() + if err == nil { + t.Fatalf("nil error has been caught") + } + if !strings.Contains(err.Error(), expectedErr.Error()) { + t.Fatalf("Unexpected error has been caught: %s", err.Error()) + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body // is a separate function, see // https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls func runTestMain(m *testing.M) int { + // Tarantool supports streams and interactive transactions since version 2.10.0 + isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } + inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{ - InitScript: "config.lua", - Listen: server, - WorkDir: "work_dir", - User: opts.User, - Pass: opts.Pass, - WaitStart: 100 * time.Millisecond, - ConnectRetry: 3, - RetryTimeout: 500 * time.Millisecond, + InitScript: "config.lua", + Listen: server, + WorkDir: "work_dir", + User: opts.User, + Pass: opts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, + MemtxUseMvccEngine: !isStreamUnsupported, }) defer test_helpers.StopTarantoolWithCleanup(inst) diff --git a/test_helpers/main.go b/test_helpers/main.go index 5c9d5135e..cdc3c343d 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -72,6 +72,10 @@ type StartOpts struct { // RetryTimeout is a time between tarantool ping retries. RetryTimeout time.Duration + + // MemtxUseMvccEngine is flag to enable transactional + // manager if set to true. + MemtxUseMvccEngine bool } // TarantoolInstance is a data for instance graceful shutdown and cleanup. @@ -190,6 +194,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) { os.Environ(), fmt.Sprintf("TEST_TNT_WORK_DIR=%s", startOpts.WorkDir), fmt.Sprintf("TEST_TNT_LISTEN=%s", startOpts.Listen), + fmt.Sprintf("TEST_TNT_MEMTX_USE_MVCC_ENGINE=%t", startOpts.MemtxUseMvccEngine), ) // Clean up existing work_dir. diff --git a/test_helpers/utils.go b/test_helpers/utils.go index e07f34bf8..c936e90b3 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -24,6 +24,22 @@ func ConnectWithValidation(t testing.TB, return conn } +func DeleteRecordByKey(t *testing.T, conn tarantool.Connector, + space interface{}, index interface{}, key []interface{}) { + t.Helper() + + req := tarantool.NewDeleteRequest(space). + Index(index). + Key(key) + resp, err := conn.Do(req).Get() + if err != nil { + t.Fatalf("Failed to Delete: %s", err.Error()) + } + if resp == nil { + t.Fatalf("Response is nil after Select") + } +} + func SkipIfSQLUnsupported(t testing.TB) { t.Helper() @@ -36,3 +52,17 @@ func SkipIfSQLUnsupported(t testing.TB) { t.Skip() } } + +func SkipIfStreamsUnsupported(t *testing.T) { + t.Helper() + + // Tarantool supports streams and interactive transactions since version 2.10.0 + isLess, err := IsTarantoolVersionLess(2, 10, 0) + if err != nil { + t.Fatalf("Could not check the Tarantool version") + } + + if isLess { + t.Skip("Skipping test for Tarantool without streams support") + } +}