Skip to content

Commit

Permalink
Add public API with a request object for Select/Update/Upsert
Browse files Browse the repository at this point in the history
This patch provides request types for part of space operations: Select, Update
and Upstream. It allows to create requests step by step. The main idea here
is too provide more extensible approach to create requests.

Part of #126
  • Loading branch information
oleg-jukovec committed Apr 22, 2022
1 parent cb9f156 commit 3ce324b
Show file tree
Hide file tree
Showing 11 changed files with 995 additions and 158 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support UUID type in msgpack (#90)
- Go modules support (#91)
- queue-utube handling (#85)
- Add public API with a request object for Select/Update/Upstream (#126)

### Fixed

Expand Down
73 changes: 73 additions & 0 deletions client_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,79 @@ func (o Op) EncodeMsgpack(enc *msgpack.Encoder) error {
return enc.Encode(o.Arg)
}

const (
appendOperator = "+"
subtractionOperator = "-"
bitwiseAndOperator = "&"
bitwiseOrOperator = "|"
bitwiseXorOperator = "^"
spliceOperator = ":"
insertOperator = "!"
deleteOperator = "#"
assignOperator = "="
)

// Operations is a collection of update operations.
type Operations struct {
ops []Op
}

// NewOperations returns a new empty collection of update operations.
func NewOperations() *Operations {
ops := new(Operations)
return ops
}

func (ops *Operations) append(op string, field int, arg interface{}) *Operations {
ops.ops = append(ops.ops, Op{op, field, arg})
return ops
}

// Add adds an additional operation to the collection of update operations.
func (ops *Operations) Add(field int, arg interface{}) *Operations {
return ops.append(appendOperator, field, arg)
}

// Subtract adds a subtraction operation to the collection of update operations.
func (ops *Operations) Subtract(field int, arg interface{}) *Operations {
return ops.append(subtractionOperator, field, arg)
}

// BitwiseAnd adds a bitwise AND operation to the collection of update operations.
func (ops *Operations) BitwiseAnd(field int, arg interface{}) *Operations {
return ops.append(bitwiseAndOperator, field, arg)
}

// BitwiseOr adds a bitwise OR operation to the collection of update operations.
func (ops *Operations) BitwiseOr(field int, arg interface{}) *Operations {
return ops.append(bitwiseOrOperator, field, arg)
}

// BitwiseXor adds a bitwise XOR operation to the collection of update operations.
func (ops *Operations) BitwiseXor(field int, arg interface{}) *Operations {
return ops.append(bitwiseXorOperator, field, arg)
}

// Splice adds a splice operation to the collection of update operations.
func (ops *Operations) Splice(field int, arg interface{}) *Operations {
return ops.append(spliceOperator, field, arg)
}

// Insert adds an insert operation to the collection of update operations.
func (ops *Operations) Insert(field int, arg interface{}) *Operations {
return ops.append(insertOperator, field, arg)
}

// Delete adds a delete operation to the collection of update operations.
func (ops *Operations) Delete(field int, arg interface{}) *Operations {
return ops.append(deleteOperator, field, arg)
}

// Assign adds an assign operation to the collection of update operations.
func (ops *Operations) Assign(field int, arg interface{}) *Operations {
return ops.append(assignOperator, field, arg)
}

type OpSplice struct {
Op string
Field int
Expand Down
39 changes: 38 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (conn *Connection) dial() (err error) {
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
request := &Future{
requestId: 0,
requestCode: AuthRequest,
requestCode: AuthRequestCode,
}
var packet smallWBuf
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
Expand Down Expand Up @@ -874,6 +874,43 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
return atomic.AddUint32(&conn.requestId, 1)
}

// Do verifies, sends the request and returns a response.
//
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) Do(req Request) (*Response, error) {
fut, err := conn.DoAsync(req)
if err != nil {
return nil, err
}
return fut.Get()
}

// DoTyped verifies, sends the request and fills the typed result.
//
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) DoTyped(req Request, result interface{}) error {
fut, err := conn.DoAsync(req)
if err != nil {
return err
}
return fut.GetTyped(result)
}

// DoAsync verifies, sends the request and returns a future.
//
// An error is returned if the request was formed incorrectly, or failure to
// create the future.
func (conn *Connection) DoAsync(req Request) (*Future, error) {
bodyFunc, err := req.BodyFunc(conn.Schema)
if err != nil {
return nil, err
}
future := conn.newFuture(req.Code())
return future.send(conn, bodyFunc), nil
}

// ConfiguredTimeout returns a timeout from connection config.
func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
Expand Down
24 changes: 12 additions & 12 deletions const.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package tarantool

const (
SelectRequest = 1
InsertRequest = 2
ReplaceRequest = 3
UpdateRequest = 4
DeleteRequest = 5
CallRequest = 6 /* call in 1.6 format */
AuthRequest = 7
EvalRequest = 8
UpsertRequest = 9
Call17Request = 10
PingRequest = 64
SubscribeRequest = 66
SelectRequestCode = 1
InsertRequestCode = 2
ReplaceRequestCode = 3
UpdateRequestCode = 4
DeleteRequestCode = 5
CallRequestCode = 6 /* call in 1.6 format */
AuthRequestCode = 7
EvalRequestCode = 8
UpsertRequestCode = 9
Call17RequestCode = 10
PingRequestCode = 64
SubscribeRequestCode = 66

KeyCode = 0x00
KeySync = 0x01
Expand Down
110 changes: 110 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,116 @@ func ExampleConnection_SelectAsync() {
// Future 2 Data [[18 val 18 bla]]
}

func ExampleSelectRequest() {
conn := example_connect()
defer conn.Close()

req := tarantool.NewSelectRequest(512).
Limit(100).
Key(tarantool.IntKey{1111})
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewSelectRequest("test").
Index("primary").
Limit(100).
Key(tarantool.IntKey{1111})
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async select request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
}

func ExampleUpdateRequest() {
conn := example_connect()
defer conn.Close()

req := tarantool.NewUpdateRequest(512).
Key(tarantool.IntKey{1111}).
Operations(tarantool.NewOperations().Assign(1, "bye"))
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do update request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewUpdateRequest("test").
Index("primary").
Key(tarantool.IntKey{1111}).
Operations(tarantool.NewOperations().Assign(1, "hello"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async update request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async update request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{[]interface {}{0x457, "bye", "world"}}
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
}

func ExampleUpsertRequest() {
conn := example_connect()
defer conn.Close()

var req tarantool.Request
req = tarantool.NewUpsertRequest(512).
Tuple([]interface{}{uint(1113), "first", "first"}).
Operations(tarantool.NewOperations().Assign(1, "updated"))
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do select upsert is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewUpsertRequest("test").
Tuple([]interface{}{uint(1113), "second", "second"}).
Operations(tarantool.NewOperations().Assign(2, "updated"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewSelectRequest(512).
Limit(100).
Key(tarantool.IntKey{1113})
resp, err = conn.Do(req)
if err != nil {
fmt.Printf("error in do select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{}
// response is []interface {}{}
// response is []interface {}{[]interface {}{0x459, "first", "updated"}}
}

func ExampleConnection_Ping() {
conn := example_connect()
defer conn.Close()
Expand Down
22 changes: 20 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
package tarantool

func (schema *Schema) ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) {
return schema.resolveSpaceIndex(s, i)
import (
"gopkg.in/vmihailenco/msgpack.v2"
)

// RefImplSelectBody is reference implementation for filling of a select
// request's body.
func RefImplSelectBody(enc *msgpack.Encoder, space, index, offset, limit, iterator uint32, key interface{}) error {
return fillSelect(enc, space, index, offset, limit, iterator, key)
}

// RefImplUpdateBody is reference implementation for filling of an update
// request's body.
func RefImplUpdateBody(enc *msgpack.Encoder, space, index uint32, key, ops interface{}) error {
return fillUpdate(enc, space, index, key, ops)
}

// RefImplUpsertBody is reference implementation for filling of an upsert
// request's body.
func RefImplUpsertBody(enc *msgpack.Encoder, space uint32, tuple, ops interface{}) error {
return fillUpsert(enc, space, tuple, ops)
}
Loading

0 comments on commit 3ce324b

Please sign in to comment.