diff --git a/CHANGELOG.md b/CHANGELOG.md index f0cc0b40a..8057a3058 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support UUID type in msgpack (#90) - Go modules support (#91) - queue-utube handling (#85) +- Add public API with a request object for Select/Update/Upstream (#126) ### Fixed diff --git a/client_tools.go b/client_tools.go index de10a366b..27976a9a7 100644 --- a/client_tools.go +++ b/client_tools.go @@ -67,6 +67,79 @@ func (o Op) EncodeMsgpack(enc *msgpack.Encoder) error { return enc.Encode(o.Arg) } +const ( + appendOperator = "+" + subtractionOperator = "-" + bitwiseAndOperator = "&" + bitwiseOrOperator = "|" + bitwiseXorOperator = "^" + spliceOperator = ":" + insertOperator = "!" + deleteOperator = "#" + assignOperator = "=" +) + +// Operations is a collection of update operations. +type Operations struct { + ops []Op +} + +// NewOperations returns a new empty collection of update operations. +func NewOperations() *Operations { + ops := new(Operations) + return ops +} + +func (ops *Operations) append(op string, field int, arg interface{}) *Operations { + ops.ops = append(ops.ops, Op{op, field, arg}) + return ops +} + +// Add adds an additional operation to the collection of update operations. +func (ops *Operations) Add(field int, arg interface{}) *Operations { + return ops.append(appendOperator, field, arg) +} + +// Subtract adds a subtraction operation to the collection of update operations. +func (ops *Operations) Subtract(field int, arg interface{}) *Operations { + return ops.append(subtractionOperator, field, arg) +} + +// BitwiseAnd adds a bitwise AND operation to the collection of update operations. +func (ops *Operations) BitwiseAnd(field int, arg interface{}) *Operations { + return ops.append(bitwiseAndOperator, field, arg) +} + +// BitwiseOr adds a bitwise OR operation to the collection of update operations. +func (ops *Operations) BitwiseOr(field int, arg interface{}) *Operations { + return ops.append(bitwiseOrOperator, field, arg) +} + +// BitwiseXor adds a bitwise XOR operation to the collection of update operations. +func (ops *Operations) BitwiseXor(field int, arg interface{}) *Operations { + return ops.append(bitwiseXorOperator, field, arg) +} + +// Splice adds a splice operation to the collection of update operations. +func (ops *Operations) Splice(field int, arg interface{}) *Operations { + return ops.append(spliceOperator, field, arg) +} + +// Insert adds an insert operation to the collection of update operations. +func (ops *Operations) Insert(field int, arg interface{}) *Operations { + return ops.append(insertOperator, field, arg) +} + +// Delete adds a delete operation to the collection of update operations. +func (ops *Operations) Delete(field int, arg interface{}) *Operations { + return ops.append(deleteOperator, field, arg) +} + +// Assign adds an assign operation to the collection of update operations. +func (ops *Operations) Assign(field int, arg interface{}) *Operations { + return ops.append(assignOperator, field, arg) +} + type OpSplice struct { Op string Field int diff --git a/connection.go b/connection.go index d8e381364..3f6dd3740 100644 --- a/connection.go +++ b/connection.go @@ -431,7 +431,7 @@ func (conn *Connection) dial() (err error) { func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) { request := &Future{ requestId: 0, - requestCode: AuthRequest, + requestCode: AuthRequestCode, } var packet smallWBuf err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error { @@ -874,6 +874,43 @@ func (conn *Connection) nextRequestId() (requestId uint32) { return atomic.AddUint32(&conn.requestId, 1) } +// Do verifies, sends the request and returns a response. +// +// An error is returned if the request was formed incorrectly, or failure to +// communicate by the connection, or unable to decode the response. +func (conn *Connection) Do(req Request) (*Response, error) { + fut, err := conn.DoAsync(req) + if err != nil { + return nil, err + } + return fut.Get() +} + +// DoTyped verifies, sends the request and fills the typed result. +// +// An error is returned if the request was formed incorrectly, or failure to +// communicate by the connection, or unable to decode the response. +func (conn *Connection) DoTyped(req Request, result interface{}) error { + fut, err := conn.DoAsync(req) + if err != nil { + return err + } + return fut.GetTyped(result) +} + +// DoAsync verifies, sends the request and returns a future. +// +// An error is returned if the request was formed incorrectly, or failure to +// create the future. +func (conn *Connection) DoAsync(req Request) (*Future, error) { + bodyFunc, err := req.BodyFunc(conn.Schema) + if err != nil { + return nil, err + } + future := conn.newFuture(req.Code()) + return future.send(conn, bodyFunc), nil +} + // ConfiguredTimeout returns a timeout from connection config. func (conn *Connection) ConfiguredTimeout() time.Duration { return conn.opts.Timeout diff --git a/const.go b/const.go index 03b00c6b1..27fa0ed84 100644 --- a/const.go +++ b/const.go @@ -1,18 +1,18 @@ package tarantool const ( - SelectRequest = 1 - InsertRequest = 2 - ReplaceRequest = 3 - UpdateRequest = 4 - DeleteRequest = 5 - CallRequest = 6 /* call in 1.6 format */ - AuthRequest = 7 - EvalRequest = 8 - UpsertRequest = 9 - Call17Request = 10 - PingRequest = 64 - SubscribeRequest = 66 + SelectRequestCode = 1 + InsertRequestCode = 2 + ReplaceRequestCode = 3 + UpdateRequestCode = 4 + DeleteRequestCode = 5 + CallRequestCode = 6 /* call in 1.6 format */ + AuthRequestCode = 7 + EvalRequestCode = 8 + UpsertRequestCode = 9 + Call17RequestCode = 10 + PingRequestCode = 64 + SubscribeRequestCode = 66 KeyCode = 0x00 KeySync = 0x01 diff --git a/example_test.go b/example_test.go index 1e8c883ab..c88fe4095 100644 --- a/example_test.go +++ b/example_test.go @@ -103,6 +103,116 @@ func ExampleConnection_SelectAsync() { // Future 2 Data [[18 val 18 bla]] } +func ExampleSelectRequest() { + conn := example_connect() + defer conn.Close() + + req := tarantool.NewSelectRequest(512). + Limit(100). + Key(tarantool.IntKey{1111}) + resp, err := conn.Do(req) + if err != nil { + fmt.Printf("error in do select request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + + req = tarantool.NewSelectRequest("test"). + Index("primary"). + Limit(100). + Key(tarantool.IntKey{1111}) + fut, err := conn.DoAsync(req) + if err != nil { + fmt.Printf("error in do async select request is %v", err) + } + resp, err = fut.Get() + if err != nil { + fmt.Printf("error in do async select request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + // Output: + // response is []interface {}{[]interface {}{0x457, "hello", "world"}} + // response is []interface {}{[]interface {}{0x457, "hello", "world"}} +} + +func ExampleUpdateRequest() { + conn := example_connect() + defer conn.Close() + + req := tarantool.NewUpdateRequest(512). + Key(tarantool.IntKey{1111}). + Operations(tarantool.NewOperations().Assign(1, "bye")) + resp, err := conn.Do(req) + if err != nil { + fmt.Printf("error in do update request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + + req = tarantool.NewUpdateRequest("test"). + Index("primary"). + Key(tarantool.IntKey{1111}). + Operations(tarantool.NewOperations().Assign(1, "hello")) + fut, err := conn.DoAsync(req) + if err != nil { + fmt.Printf("error in do async update request is %v", err) + } + resp, err = fut.Get() + if err != nil { + fmt.Printf("error in do async update request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + // Output: + // response is []interface {}{[]interface {}{0x457, "bye", "world"}} + // response is []interface {}{[]interface {}{0x457, "hello", "world"}} +} + +func ExampleUpsertRequest() { + conn := example_connect() + defer conn.Close() + + var req tarantool.Request + req = tarantool.NewUpsertRequest(512). + Tuple([]interface{}{uint(1113), "first", "first"}). + Operations(tarantool.NewOperations().Assign(1, "updated")) + resp, err := conn.Do(req) + if err != nil { + fmt.Printf("error in do select upsert is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + + req = tarantool.NewUpsertRequest("test"). + Tuple([]interface{}{uint(1113), "second", "second"}). + Operations(tarantool.NewOperations().Assign(2, "updated")) + fut, err := conn.DoAsync(req) + if err != nil { + fmt.Printf("error in do async upsert request is %v", err) + } + resp, err = fut.Get() + if err != nil { + fmt.Printf("error in do async upsert request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + + req = tarantool.NewSelectRequest(512). + Limit(100). + Key(tarantool.IntKey{1113}) + resp, err = conn.Do(req) + if err != nil { + fmt.Printf("error in do select request is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + // Output: + // response is []interface {}{} + // response is []interface {}{} + // response is []interface {}{[]interface {}{0x459, "first", "updated"}} +} + func ExampleConnection_Ping() { conn := example_connect() defer conn.Close() diff --git a/export_test.go b/export_test.go index 931e78c9b..b536bfa19 100644 --- a/export_test.go +++ b/export_test.go @@ -1,5 +1,23 @@ package tarantool -func (schema *Schema) ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) { - return schema.resolveSpaceIndex(s, i) +import ( + "gopkg.in/vmihailenco/msgpack.v2" +) + +// RefImplSelectBody is reference implementation for filling of a select +// request's body. +func RefImplSelectBody(enc *msgpack.Encoder, space, index, offset, limit, iterator uint32, key interface{}) error { + return fillSelect(enc, space, index, offset, limit, iterator, key) +} + +// RefImplUpdateBody is reference implementation for filling of an update +// request's body. +func RefImplUpdateBody(enc *msgpack.Encoder, space, index uint32, key, ops interface{}) error { + return fillUpdate(enc, space, index, key, ops) +} + +// RefImplUpsertBody is reference implementation for filling of an upsert +// request's body. +func RefImplUpsertBody(enc *msgpack.Encoder, space uint32, tuple, ops interface{}) error { + return fillUpsert(enc, space, tuple, ops) } diff --git a/future.go b/future.go new file mode 100644 index 000000000..da28288a0 --- /dev/null +++ b/future.go @@ -0,0 +1,126 @@ +package tarantool + +import ( + "time" + + "gopkg.in/vmihailenco/msgpack.v2" +) + +// Future is a handle for asynchronous request. +type Future struct { + requestId uint32 + requestCode int32 + timeout time.Duration + resp *Response + err error + ready chan struct{} + next *Future +} + +// Get waits for Future to be filled and returns Response and error. +// +// Response will contain deserialized result in Data field. +// It will be []interface{}, so if you want more performace, use GetTyped method. +// +// Note: Response could be equal to nil if ClientError is returned in error. +// +// "error" could be Error, if it is error returned by Tarantool, +// or ClientError, if something bad happens in a client process. +func (fut *Future) Get() (*Response, error) { + fut.wait() + if fut.err != nil { + return fut.resp, fut.err + } + fut.err = fut.resp.decodeBody() + return fut.resp, fut.err +} + +// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. +// It is could be much faster than Get() function. +// +// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions). +func (fut *Future) GetTyped(result interface{}) error { + fut.wait() + if fut.err != nil { + return fut.err + } + fut.err = fut.resp.decodeBodyTyped(result) + return fut.err +} + +var closedChan = make(chan struct{}) + +func init() { + close(closedChan) +} + +// WaitChan returns channel which becomes closed when response arrived or error occured. +func (fut *Future) WaitChan() <-chan struct{} { + if fut.ready == nil { + return closedChan + } + return fut.ready +} + +// Err returns error set on Future. +// It waits for future to be set. +// Note: it doesn't decode body, therefore decoding error are not set here. +func (fut *Future) Err() error { + fut.wait() + return fut.err +} + +func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { + rid := fut.requestId + hl := h.Len() + h.Write([]byte{ + 0xce, 0, 0, 0, 0, // Length. + 0x82, // 2 element map. + KeyCode, byte(fut.requestCode), // Request code. + KeySync, 0xce, + byte(rid >> 24), byte(rid >> 16), + byte(rid >> 8), byte(rid), + }) + + if err = body(enc); err != nil { + return + } + + l := uint32(h.Len() - 5 - hl) + h.b[hl+1] = byte(l >> 24) + h.b[hl+2] = byte(l >> 16) + h.b[hl+3] = byte(l >> 8) + h.b[hl+4] = byte(l) + + return +} + +func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future { + if fut.ready == nil { + return fut + } + conn.putFuture(fut, body) + return fut +} + +func (fut *Future) markReady(conn *Connection) { + close(fut.ready) + if conn.rlimit != nil { + <-conn.rlimit + } +} + +func (fut *Future) fail(conn *Connection, err error) *Future { + if f := conn.fetchFuture(fut.requestId); f == fut { + f.err = err + fut.markReady(conn) + } + return fut +} + +func (fut *Future) wait() { + if fut.ready == nil { + return + } + <-fut.ready +} diff --git a/request.go b/request.go index ff8a7e8a5..f4866d0e4 100644 --- a/request.go +++ b/request.go @@ -2,29 +2,11 @@ package tarantool import ( "errors" - "time" "gopkg.in/vmihailenco/msgpack.v2" ) -// Future is a handle for asynchronous request. -type Future struct { - requestId uint32 - requestCode int32 - timeout time.Duration - resp *Response - err error - ready chan struct{} - next *Future -} - -// Ping sends empty request to Tarantool to check connection. -func (conn *Connection) Ping() (resp *Response, err error) { - future := conn.newFuture(PingRequest) - return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get() -} - -func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error { +func fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error { enc.EncodeUint64(KeySpaceNo) enc.EncodeUint64(uint64(spaceNo)) enc.EncodeUint64(KeyIndexNo) @@ -33,7 +15,7 @@ func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key return enc.Encode(key) } -func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) { +func fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) { enc.EncodeUint64(KeyIterator) enc.EncodeUint64(uint64(iterator)) enc.EncodeUint64(KeyOffset) @@ -42,13 +24,46 @@ func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator ui enc.EncodeUint64(uint64(limit)) } -func (req *Future) fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error { +func fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error { enc.EncodeUint64(KeySpaceNo) enc.EncodeUint64(uint64(spaceNo)) enc.EncodeUint64(KeyTuple) return enc.Encode(tuple) } +func fillSelect(enc *msgpack.Encoder, spaceNo, indexNo, offset, limit, iterator uint32, key interface{}) error { + enc.EncodeMapLen(6) + fillIterator(enc, offset, limit, iterator) + return fillSearch(enc, spaceNo, indexNo, key) +} + +func fillUpdate(enc *msgpack.Encoder, spaceNo, indexNo uint32, key, ops interface{}) error { + enc.EncodeMapLen(4) + if err := fillSearch(enc, spaceNo, indexNo, key); err != nil { + return err + } + enc.EncodeUint64(KeyTuple) + return enc.Encode(ops) +} + +func fillUpsert(enc *msgpack.Encoder, spaceNo uint32, tuple, ops interface{}) error { + enc.EncodeMapLen(3) + enc.EncodeUint64(KeySpaceNo) + enc.EncodeUint64(uint64(spaceNo)) + enc.EncodeUint64(KeyTuple) + if err := enc.Encode(tuple); err != nil { + return err + } + enc.EncodeUint64(KeyDefTuple) + return enc.Encode(ops) +} + +// Ping sends empty request to Tarantool to check connection. +func (conn *Connection) Ping() (resp *Response, err error) { + future := conn.newFuture(PingRequestCode) + return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get() +} + // Select performs select to box space. // // It is equal to conn.SelectAsync(...).Get(). @@ -214,103 +229,88 @@ func (conn *Connection) EvalTyped(expr string, args interface{}, result interfac // SelectAsync sends select request to Tarantool and returns Future. func (conn *Connection) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}) *Future { - future := conn.newFuture(SelectRequest) - spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) + future := conn.newFuture(SelectRequestCode) + spaceNo, indexNo, err := conn.Schema.ResolveSpaceIndex(space, index) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { - enc.EncodeMapLen(6) - future.fillIterator(enc, offset, limit, iterator) - return future.fillSearch(enc, spaceNo, indexNo, key) + return fillSelect(enc, spaceNo, indexNo, offset, limit, iterator, key) }) } // InsertAsync sends insert action to Tarantool and returns Future. // Tarantool will reject Insert when tuple with same primary key exists. func (conn *Connection) InsertAsync(space interface{}, tuple interface{}) *Future { - future := conn.newFuture(InsertRequest) - spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) + future := conn.newFuture(InsertRequestCode) + spaceNo, _, err := conn.Schema.ResolveSpaceIndex(space, nil) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) - return future.fillInsert(enc, spaceNo, tuple) + return fillInsert(enc, spaceNo, tuple) }) } // ReplaceAsync sends "insert or replace" action to Tarantool and returns Future. // If tuple with same primary key exists, it will be replaced. func (conn *Connection) ReplaceAsync(space interface{}, tuple interface{}) *Future { - future := conn.newFuture(ReplaceRequest) - spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) + future := conn.newFuture(ReplaceRequestCode) + spaceNo, _, err := conn.Schema.ResolveSpaceIndex(space, nil) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) - return future.fillInsert(enc, spaceNo, tuple) + return fillInsert(enc, spaceNo, tuple) }) } // DeleteAsync sends deletion action to Tarantool and returns Future. // Future's result will contain array with deleted tuple. func (conn *Connection) DeleteAsync(space, index interface{}, key interface{}) *Future { - future := conn.newFuture(DeleteRequest) - spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) + future := conn.newFuture(DeleteRequestCode) + spaceNo, indexNo, err := conn.Schema.ResolveSpaceIndex(space, index) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(3) - return future.fillSearch(enc, spaceNo, indexNo, key) + return fillSearch(enc, spaceNo, indexNo, key) }) } // Update sends deletion of a tuple by key and returns Future. // Future's result will contain array with updated tuple. func (conn *Connection) UpdateAsync(space, index interface{}, key, ops interface{}) *Future { - future := conn.newFuture(UpdateRequest) - spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) + future := conn.newFuture(UpdateRequestCode) + spaceNo, indexNo, err := conn.Schema.ResolveSpaceIndex(space, index) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { - enc.EncodeMapLen(4) - if err := future.fillSearch(enc, spaceNo, indexNo, key); err != nil { - return err - } - enc.EncodeUint64(KeyTuple) - return enc.Encode(ops) + return fillUpdate(enc, spaceNo, indexNo, key, ops) }) } // UpsertAsync sends "update or insert" action to Tarantool and returns Future. // Future's sesult will not contain any tuple. func (conn *Connection) UpsertAsync(space interface{}, tuple interface{}, ops interface{}) *Future { - future := conn.newFuture(UpsertRequest) - spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) + future := conn.newFuture(UpsertRequestCode) + spaceNo, _, err := conn.Schema.ResolveSpaceIndex(space, nil) if err != nil { return future.fail(conn, err) } return future.send(conn, func(enc *msgpack.Encoder) error { - enc.EncodeMapLen(3) - enc.EncodeUint64(KeySpaceNo) - enc.EncodeUint64(uint64(spaceNo)) - enc.EncodeUint64(KeyTuple) - if err := enc.Encode(tuple); err != nil { - return err - } - enc.EncodeUint64(KeyDefTuple) - return enc.Encode(ops) + return fillUpsert(enc, spaceNo, tuple, ops) }) } // CallAsync sends a call to registered Tarantool function and returns Future. // It uses request code for Tarantool 1.6, so future's result is always array of arrays func (conn *Connection) CallAsync(functionName string, args interface{}) *Future { - future := conn.newFuture(CallRequest) + future := conn.newFuture(CallRequestCode) return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyFunctionName) @@ -324,7 +324,7 @@ func (conn *Connection) CallAsync(functionName string, args interface{}) *Future // It uses request code for Tarantool 1.7, so future's result will not be converted // (though, keep in mind, result is always array) func (conn *Connection) Call17Async(functionName string, args interface{}) *Future { - future := conn.newFuture(Call17Request) + future := conn.newFuture(Call17RequestCode) return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyFunctionName) @@ -336,7 +336,7 @@ func (conn *Connection) Call17Async(functionName string, args interface{}) *Futu // EvalAsync sends a Lua expression for evaluation and returns Future. func (conn *Connection) EvalAsync(expr string, args interface{}) *Future { - future := conn.newFuture(EvalRequest) + future := conn.newFuture(EvalRequestCode) return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyExpression) @@ -346,114 +346,198 @@ func (conn *Connection) EvalAsync(expr string, args interface{}) *Future { }) } -// -// private -// +// Request is an interface that provides the necessary data to create a request +// that will be sent to a tarantool instance. +type Request interface { + // Code returns a IPROTO code for the request. + Code() int32 + // BodyFunc returns a functions that can fill an encoder with + // the request body or it returns an error if unable to create the function. + BodyFunc(resolver SchemaResolver) (func(enc *msgpack.Encoder) error, error) +} -func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { - rid := fut.requestId - hl := h.Len() - h.Write([]byte{ - 0xce, 0, 0, 0, 0, // length - 0x82, // 2 element map - KeyCode, byte(fut.requestCode), // request code - KeySync, 0xce, - byte(rid >> 24), byte(rid >> 16), - byte(rid >> 8), byte(rid), - }) +type baseRequest struct { + requestCode int32 +} - if err = body(enc); err != nil { - return - } +// Code returns a IPROTO code for the request. +func (req *baseRequest) Code() int32 { + return req.requestCode +} - l := uint32(h.Len() - 5 - hl) - h.b[hl+1] = byte(l >> 24) - h.b[hl+2] = byte(l >> 16) - h.b[hl+3] = byte(l >> 8) - h.b[hl+4] = byte(l) +type spaceRequest struct { + baseRequest + space interface{} +} - return +type spaceIndexRequest struct { + spaceRequest + index interface{} } -func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future { - if fut.ready == nil { - return fut - } - conn.putFuture(fut, body) - return fut +// SelectRequest allows you to create a select request object for execution +// by a Connection. +type SelectRequest struct { + spaceIndexRequest + offset, limit, iterator uint32 + key interface{} } -func (fut *Future) markReady(conn *Connection) { - close(fut.ready) - if conn.rlimit != nil { - <-conn.rlimit - } +// NewSelectRequest returns a new empty SelectRequest. +func NewSelectRequest(space interface{}) *SelectRequest { + req := new(SelectRequest) + req.requestCode = SelectRequestCode + req.space = space + return req } -func (fut *Future) fail(conn *Connection, err error) *Future { - if f := conn.fetchFuture(fut.requestId); f == fut { - f.err = err - fut.markReady(conn) - } - return fut +// Index sets the index for the select request. +// Note: default value is 0. +func (req *SelectRequest) Index(index interface{}) *SelectRequest { + req.index = index + return req +} + +// Offset sets the offset for the select request. +// Note: default value is 0. +func (req *SelectRequest) Offset(offset uint32) *SelectRequest { + req.offset = offset + return req +} + +// Limit sets the limit for the select request. +// Note: default value is 0. +func (req *SelectRequest) Limit(limit uint32) *SelectRequest { + req.limit = limit + return req +} + +// Iterator set the iterator for the select request. +// Note: default value is IterEq. +func (req *SelectRequest) Iterator(iterator uint32) *SelectRequest { + req.iterator = iterator + return req +} + +// Key set the key for the select request. +// Note: default value is nil. +func (req *SelectRequest) Key(key interface{}) *SelectRequest { + req.key = key + return req } -func (fut *Future) wait() { - if fut.ready == nil { - return +// BodyFunc returns a function that can create an encoded body of +// the select request. +// It returns an error if the request space or the request index cannot +// be resolved. +func (req *SelectRequest) BodyFunc(res SchemaResolver) (func(enc *msgpack.Encoder) error, error) { + spaceNo, indexNo, err := res.ResolveSpaceIndex(req.space, req.index) + if err != nil { + return nil, err } - <-fut.ready + + return func(enc *msgpack.Encoder) error { + return fillSelect(enc, spaceNo, indexNo, req.offset, req.limit, req.iterator, req.key) + }, nil } -// Get waits for Future to be filled and returns Response and error. -// -// Response will contain deserialized result in Data field. -// It will be []interface{}, so if you want more performace, use GetTyped method. -// -// Note: Response could be equal to nil if ClientError is returned in error. -// -// "error" could be Error, if it is error returned by Tarantool, -// or ClientError, if something bad happens in a client process. -func (fut *Future) Get() (*Response, error) { - fut.wait() - if fut.err != nil { - return fut.resp, fut.err +// UpdateRequest helps you to create an update request object for execution +// by a Connection. +type UpdateRequest struct { + spaceIndexRequest + key interface{} + ops []Op +} + +// NewUpdateRequest returns a new empty UpdateRequest. +func NewUpdateRequest(space interface{}) *UpdateRequest { + req := new(UpdateRequest) + req.requestCode = UpdateRequestCode + req.space = space + return req +} + +// Index sets the index for the update request. +// Note: default value is 0. +func (req *UpdateRequest) Index(index interface{}) *UpdateRequest { + req.index = index + return req +} + +// Key sets the key of tuple for the update request. +// Note: default value is nil. +func (req *UpdateRequest) Key(key interface{}) *UpdateRequest { + req.key = key + return req +} + +// Operations sets operations to be performed on update. +// Note: default value is nil. +func (req *UpdateRequest) Operations(ops *Operations) *UpdateRequest { + if ops != nil { + req.ops = ops.ops } - fut.err = fut.resp.decodeBody() - return fut.resp, fut.err + return req } -// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. -// It is could be much faster than Get() function. -// -// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions) -func (fut *Future) GetTyped(result interface{}) error { - fut.wait() - if fut.err != nil { - return fut.err +// BodyFunc returns a function that can create an encoded body of +// the update request. +// It returns an error if the request space or the request index cannot +// be resolved. +func (req *UpdateRequest) BodyFunc(res SchemaResolver) (func(enc *msgpack.Encoder) error, error) { + spaceNo, indexNo, err := res.ResolveSpaceIndex(req.space, req.index) + if err != nil { + return nil, err } - fut.err = fut.resp.decodeBodyTyped(result) - return fut.err + + return func(enc *msgpack.Encoder) error { + return fillUpdate(enc, spaceNo, indexNo, req.key, req.ops) + }, nil } -var closedChan = make(chan struct{}) +// UpsertRequest helps you to create an upsert request object for execution +// by a Connection. +type UpsertRequest struct { + spaceRequest + tuple interface{} + ops []Op +} -func init() { - close(closedChan) +// NewUpsertRequest returns a new empty UpsertRequest. +func NewUpsertRequest(space interface{}) *UpsertRequest { + req := new(UpsertRequest) + req.requestCode = UpsertRequestCode + req.space = space + return req } -// WaitChan returns channel which becomes closed when response arrived or error occured. -func (fut *Future) WaitChan() <-chan struct{} { - if fut.ready == nil { - return closedChan +// Tuples sets the tuple for insertion or update by the upsert request. +// Note: default value is nil. +func (req *UpsertRequest) Tuple(tuple interface{}) *UpsertRequest { + req.tuple = tuple + return req +} + +// Operations sets operations to be performed on update case by the upsert request. +// Note: default value is nil. +func (req *UpsertRequest) Operations(ops *Operations) *UpsertRequest { + if ops != nil { + req.ops = ops.ops } - return fut.ready + return req } -// Err returns error set on Future. -// It waits for future to be set. -// Note: it doesn't decode body, therefore decoding error are not set here. -func (fut *Future) Err() error { - fut.wait() - return fut.err +// BodyFunc returns a function that can create an encoded body of +// the upsert request. +// It returns an error if the request space or the request index cannot +// be resolved. +func (req *UpsertRequest) BodyFunc(res SchemaResolver) (func(enc *msgpack.Encoder) error, error) { + spaceNo, _, err := res.ResolveSpaceIndex(req.space, nil) + if err != nil { + return nil, err + } + + return func(enc *msgpack.Encoder) error { + return fillUpsert(enc, spaceNo, req.tuple, req.ops) + }, nil } diff --git a/request_test.go b/request_test.go new file mode 100644 index 000000000..9d02f2431 --- /dev/null +++ b/request_test.go @@ -0,0 +1,263 @@ +package tarantool_test + +import ( + "bytes" + "errors" + "testing" + + . "github.com/tarantool/go-tarantool" + "gopkg.in/vmihailenco/msgpack.v2" +) + +const invalidSpaceMsg = "invalid space" +const invalidIndexMsg = "invalid index" + +const invalidSpace = 2 +const invalidIndex = 2 +const validSpace = 1 // Any valid value != default. +const validIndex = 3 // Any valid value != default. +const defaultSpace = 0 // And valid too. +const defaultIndex = 0 // And valid too. + +type ValidSchemeResolver struct { +} + +func (*ValidSchemeResolver) ResolveSpaceIndex(s, i interface{}) (spaceNo, indexNo uint32, err error) { + if s != nil { + spaceNo = uint32(s.(int)) + } else { + spaceNo = defaultSpace + } + if i != nil { + indexNo = uint32(i.(int)) + } else { + indexNo = defaultIndex + } + if spaceNo == invalidSpace { + return 0, 0, errors.New(invalidSpaceMsg) + } + if indexNo == invalidIndex { + return 0, 0, errors.New(invalidIndexMsg) + } + return spaceNo, indexNo, nil +} + +var resolver ValidSchemeResolver + +func assertBodyFuncCall(t testing.TB, requests []Request, errorMsg string) { + t.Helper() + + const errBegin = "An unexpected Request.BodyFunc() " + for _, req := range requests { + _, err := req.BodyFunc(&resolver) + if err != nil && errorMsg != "" && err.Error() != errorMsg { + t.Errorf(errBegin+"error %q expected %q", err.Error(), errorMsg) + } + if err != nil && errorMsg == "" { + t.Errorf(errBegin+"error %q", err.Error()) + } + if err == nil && errorMsg != "" { + t.Errorf(errBegin+"result, expexted error %q", errorMsg) + } + } +} + +func assertBodyEqual(t testing.TB, reference []byte, req Request) { + t.Helper() + + var reqBuf bytes.Buffer + reqEnc := msgpack.NewEncoder(&reqBuf) + + f, err := req.BodyFunc(&resolver) + if err != nil { + t.Errorf("An unexpected Response.BodyFunc() error: %q", err.Error()) + } else { + err = f(reqEnc) + if err != nil { + t.Errorf("An unexpected encode body error: %q", err.Error()) + } + reqBody := reqBuf.Bytes() + if bytes.Compare(reqBody, reference) != 0 { + t.Errorf("Encoded request %v != reference %v", reqBody, reference) + } + } +} + +func getTestOps() ([]Op, *Operations) { + ops := []Op{ + {"+", 1, 2}, + {"-", 3, 4}, + {"&", 5, 6}, + {"|", 7, 8}, + {"^", 9, 1}, + {"^", 9, 1}, // The duplication is for test purposes. + {":", 2, 3}, + {"!", 4, 5}, + {"#", 6, 7}, + {"=", 8, 9}, + } + operations := NewOperations(). + Add(1, 2). + Subtract(3, 4). + BitwiseAnd(5, 6). + BitwiseOr(7, 8). + BitwiseXor(9, 1). + BitwiseXor(9, 1). // The duplication is for test purposes. + Splice(2, 3). + Insert(4, 5). + Delete(6, 7). + Assign(8, 9) + return ops, operations +} + +func TestRequestsValidSpaceAndIndex(t *testing.T) { + requests := []Request{ + NewSelectRequest(validSpace), + NewSelectRequest(validSpace).Index(validIndex), + NewUpdateRequest(validSpace), + NewUpdateRequest(validSpace).Index(validIndex), + NewUpsertRequest(validSpace), + } + + assertBodyFuncCall(t, requests, "") +} + +func TestRequestsInvalidSpace(t *testing.T) { + requests := []Request{ + NewSelectRequest(invalidSpace).Index(validIndex), + NewSelectRequest(invalidSpace), + NewUpdateRequest(invalidSpace).Index(validIndex), + NewUpdateRequest(invalidSpace), + NewUpsertRequest(invalidSpace), + } + + assertBodyFuncCall(t, requests, invalidSpaceMsg) +} + +func TestRequestsInvalidIndex(t *testing.T) { + requests := []Request{ + NewSelectRequest(validSpace).Index(invalidIndex), + NewUpdateRequest(validSpace).Index(invalidIndex), + } + + assertBodyFuncCall(t, requests, invalidIndexMsg) +} + +func TestRequestsCodes(t *testing.T) { + tests := []struct { + req Request + code int32 + }{ + {req: NewSelectRequest(validSpace), code: SelectRequestCode}, + {req: NewUpdateRequest(validSpace), code: UpdateRequestCode}, + {req: NewUpsertRequest(validSpace), code: UpsertRequestCode}, + } + + for _, test := range tests { + if code := test.req.Code(); code != test.code { + t.Errorf("An invalid request code 0x%x, expected 0x%x", code, test.code) + } + } +} + +func TestSelectRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0, IterEq, nil) + if err != nil { + t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error()) + return + } + + req := NewSelectRequest(validSpace) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestSelectRequestSetters(t *testing.T) { + const offset = 4 + const limit = 5 + const iter = IterLt + key := []interface{}{uint(36)} + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplSelectBody(refEnc, validSpace, validIndex, offset, limit, iter, key) + if err != nil { + t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error()) + return + } + + req := NewSelectRequest(validSpace). + Index(validIndex). + Offset(offset). + Limit(limit). + Iterator(iter). + Key(key) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestUpdateRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplUpdateBody(refEnc, validSpace, defaultIndex, nil, nil) + if err != nil { + t.Errorf("An unexpected RefImplUpdateBody() error: %q", err.Error()) + return + } + + req := NewUpdateRequest(validSpace) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestUpdateRequestSetters(t *testing.T) { + key := []interface{}{uint(44)} + refOps, reqOps := getTestOps() + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplUpdateBody(refEnc, validSpace, validIndex, key, refOps) + if err != nil { + t.Errorf("An unexpected RefImplUpdateBody() error: %q", err.Error()) + return + } + + req := NewUpdateRequest(validSpace). + Index(validIndex). + Key(key). + Operations(reqOps) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestUpsertRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplUpsertBody(refEnc, validSpace, nil, nil) + if err != nil { + t.Errorf("An unexpected RefImplUpsertBody() error: %q", err.Error()) + return + } + + req := NewUpsertRequest(validSpace) + assertBodyEqual(t, refBuf.Bytes(), req) +} + +func TestUpsertRequestSetters(t *testing.T) { + tuple := []interface{}{uint(64)} + refOps, reqOps := getTestOps() + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplUpsertBody(refEnc, validSpace, tuple, refOps) + if err != nil { + t.Errorf("An unexpected RefImplUpsertBody() error: %q", err.Error()) + return + } + + req := NewUpsertRequest(validSpace). + Tuple(tuple). + Operations(reqOps) + assertBodyEqual(t, refBuf.Bytes(), req) +} diff --git a/schema.go b/schema.go index 91c0faedd..d373e16ec 100644 --- a/schema.go +++ b/schema.go @@ -4,6 +4,13 @@ import ( "fmt" ) +// SchemaResolver is an interface for resolving schema details. +type SchemaResolver interface { + // ResolveSpaceIndex returns resolved space and index numbers or an + // error if it cannot be resolved. + ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) +} + // Schema contains information about spaces and indexes. type Schema struct { Version uint @@ -175,7 +182,10 @@ func (conn *Connection) loadSchema() (err error) { return nil } -func (schema *Schema) resolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) { +// ResolveSpaceIndex tries to resolve space and index numbers. +// Note: s can be a number, string, or an object of Space type. +// Note: i can be a number, string, or an object of Index type. +func (schema *Schema) ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) { var space *Space var index *Index var ok bool diff --git a/tarantool_test.go b/tarantool_test.go index 5f5078a8b..7810a25e8 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -89,6 +89,37 @@ func BenchmarkClientSerial(b *testing.B) { } } +func BenchmarkClientSerialRequestObject(b *testing.B) { + var err error + + conn, err := Connect(server, opts) + if err != nil { + b.Error(err) + return + } + defer conn.Close() + + _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + if err != nil { + b.Error(err) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + req := NewSelectRequest(spaceNo). + Index(indexNo). + Offset(0). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1111)}) + _, err := conn.Do(req) + if err != nil { + b.Error(err) + } + } +} + func BenchmarkClientSerialTyped(b *testing.B) { var err error @@ -835,11 +866,11 @@ func TestSchema(t *testing.T) { } rSpaceNo, rIndexNo, err = schema.ResolveSpaceIndex("schematest22", "secondary") if err == nil { - t.Errorf("resolveSpaceIndex didn't returned error with not existing space name") + t.Errorf("ResolveSpaceIndex didn't returned error with not existing space name") } rSpaceNo, rIndexNo, err = schema.ResolveSpaceIndex("schematest", "secondary22") if err == nil { - t.Errorf("resolveSpaceIndex didn't returned error with not existing index name") + t.Errorf("ResolveSpaceIndex didn't returned error with not existing index name") } } @@ -936,6 +967,90 @@ func TestClientNamed(t *testing.T) { } } +func TestClientRequestObjects(t *testing.T) { + var ( + req Request + resp *Response + err error + conn *Connection + ) + + conn, err = Connect(server, opts) + if err != nil { + t.Errorf("Failed to connect: %s", err.Error()) + return + } + if conn == nil { + t.Errorf("conn is nil after Connect") + return + } + defer conn.Close() + + // The code prepares data. + for i := 1010; i < 1020; i++ { + resp, err = conn.Replace(spaceName, []interface{}{uint(i), fmt.Sprintf("val %d", i), "bla"}) + if err != nil { + t.Errorf("Failed to Replace: %s", err.Error()) + } + } + + // Update. + req = NewUpdateRequest(spaceName). + Index(indexName). + Key([]interface{}{uint(1010)}). + Operations(NewOperations().Assign(1, "bye").Insert(2, 1)) + resp, err = conn.Do(req) + if err != nil { + t.Errorf("Failed to Update: %s", err.Error()) + } + if resp == nil { + t.Errorf("Response is nil after Update") + } + + // Upsert. + req = NewUpsertRequest(spaceNo). + Tuple([]interface{}{uint(1010), "hi", "hi"}). + Operations(NewOperations().Assign(2, "bye")) + resp, err = conn.Do(req) + if err != nil { + t.Errorf("Failed to Upsert (update): %s", err.Error()) + } + if resp == nil { + t.Errorf("Response is nil after Upsert (update)") + } + + // Select. + req = NewSelectRequest(spaceNo). + Index(indexNo). + Limit(1). + Iterator(IterEq). + Key([]interface{}{uint(1010)}) + resp, err = conn.Do(req) + if err != nil { + t.Errorf("Failed to Select: %s", err.Error()) + } + if resp == nil { + t.Errorf("Response is nil after Select") + return + } + if len(resp.Data) != 1 { + t.Errorf("Response Data len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + t.Errorf("Unexpected body of Select") + } else { + if id, ok := tpl[0].(uint64); !ok || id != 1010 { + t.Errorf("Unexpected body of Select (0) %d, expected %d", tpl[0].(uint64), 1010) + } + if h, ok := tpl[1].(string); !ok || h != "bye" { + t.Errorf("Unexpected body of Select (1) %q, expected %q", tpl[1].(string), "bye") + } + if h, ok := tpl[2].(string); !ok || h != "bye" { + t.Errorf("Unexpected body of Select (2) %q, expected %q", tpl[2].(string), "bye") + } + } +} + func TestComplexStructs(t *testing.T) { var err error var conn *Connection