Skip to content

Commit

Permalink
Add public API with a request object for Select/Update/Upsert
Browse files Browse the repository at this point in the history
This patch provides request types for part of space operations: Select, Update
and Upstream. It allows to create requests step by step. The main idea here
is too provide more extensible approach to create requests.

Part of #126
  • Loading branch information
oleg-jukovec committed Apr 20, 2022
1 parent 7897baf commit 08d91c5
Show file tree
Hide file tree
Showing 11 changed files with 1,011 additions and 158 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Added

- Coveralls support (#149)
- Add public API with a request object for Select/Update/Upstream (#126)
- Reusable testing workflow (integration testing with latest Tarantool) (#123)
- Simple CI based on GitHub actions (#114)
- Support UUID type in msgpack (#90)
Expand Down
74 changes: 74 additions & 0 deletions client_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,80 @@ func (o Op) EncodeMsgpack(enc *msgpack.Encoder) error {
return enc.Encode(o.Arg)
}

// We don't want to make it public.
const (
appendOperator = "+"
subtractionOperator = "-"
bitwiseAndOperator = "&"
bitwiseOrOperator = "|"
bitwiseXorOperator = "^"
spliceOperator = ":"
insertOperator = "!"
deleteOperator = "#"
assignOperator = "="
)

// Operations is a collection of update operations.
type Operations struct {
ops []Op
}

// NewOperations returns a new empty collection of update operations.
func NewOperations() *Operations {
ops := new(Operations)
return ops
}

func (ops *Operations) append(op string, field int, arg interface{}) *Operations {
ops.ops = append(ops.ops, Op{op, field, arg})
return ops
}

// Add adds an additional operation to the collection of update operations.
func (ops *Operations) Add(field int, arg interface{}) *Operations {
return ops.append(appendOperator, field, arg)
}

// Subtract adds a subtraction operation to the collection of update operations.
func (ops *Operations) Subtract(field int, arg interface{}) *Operations {
return ops.append(subtractionOperator, field, arg)
}

// BitwiseAnd adds a bitwise AND operation to the collection of update operations.
func (ops *Operations) BitwiseAnd(field int, arg interface{}) *Operations {
return ops.append(bitwiseAndOperator, field, arg)
}

// BitwiseOr adds a bitwise OR operation to the collection of update operations.
func (ops *Operations) BitwiseOr(field int, arg interface{}) *Operations {
return ops.append(bitwiseOrOperator, field, arg)
}

// BitwiseXor adds a bitwise XOR operation to the collection of update operations.
func (ops *Operations) BitwiseXor(field int, arg interface{}) *Operations {
return ops.append(bitwiseXorOperator, field, arg)
}

// Splice adds a splice operation to the collection of update operations.
func (ops *Operations) Splice(field int, arg interface{}) *Operations {
return ops.append(spliceOperator, field, arg)
}

// Insert adds an insert operation to the collection of update operations.
func (ops *Operations) Insert(field int, arg interface{}) *Operations {
return ops.append(insertOperator, field, arg)
}

// Delete adds a delete operation to the collection of update operations.
func (ops *Operations) Delete(field int, arg interface{}) *Operations {
return ops.append(deleteOperator, field, arg)
}

// Assign adds an assign operation to the collection of update operations.
func (ops *Operations) Assign(field int, arg interface{}) *Operations {
return ops.append(assignOperator, field, arg)
}

type OpSplice struct {
Op string
Field int
Expand Down
39 changes: 38 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (conn *Connection) dial() (err error) {
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
request := &Future{
requestId: 0,
requestCode: AuthRequest,
requestCode: AuthRequestCode,
}
var packet smallWBuf
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
Expand Down Expand Up @@ -873,6 +873,43 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
return atomic.AddUint32(&conn.requestId, 1)
}

// Do verifies, sends the request and returns a response.
//
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) Do(req Request) (*Response, error) {
fut, err := conn.DoAsync(req)
if err != nil {
return nil, err
}
return fut.Get()
}

// DoTyped verifies, sends the request and fills the typed result.
//
// An error is returned if the request was formed incorrectly, or failure to
// communicate by the connection, or unable to decode the response.
func (conn *Connection) DoTyped(req Request, result interface{}) error {
fut, err := conn.DoAsync(req)
if err != nil {
return err
}
return fut.GetTyped(result)
}

// DoAsync verifies, sends the request and returns a future.
//
// An error is returned if the request was formed incorrectly, or failure to
// create the future.
func (conn *Connection) DoAsync(req Request) (*Future, error) {
bodyFunc, err := req.BodyFunc(conn.Schema)
if err != nil {
return nil, err
}
future := conn.newFuture(req.Code())
return future.send(conn, bodyFunc), nil
}

// ConfiguredTimeout returns timeout from connection config
func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
Expand Down
24 changes: 12 additions & 12 deletions const.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package tarantool

const (
SelectRequest = 1
InsertRequest = 2
ReplaceRequest = 3
UpdateRequest = 4
DeleteRequest = 5
CallRequest = 6 /* call in 1.6 format */
AuthRequest = 7
EvalRequest = 8
UpsertRequest = 9
Call17Request = 10
PingRequest = 64
SubscribeRequest = 66
SelectRequestCode = 1
InsertRequestCode = 2
ReplaceRequestCode = 3
UpdateRequestCode = 4
DeleteRequestCode = 5
CallRequestCode = 6 /* call in 1.6 format */
AuthRequestCode = 7
EvalRequestCode = 8
UpsertRequestCode = 9
Call17RequestCode = 10
PingRequestCode = 64
SubscribeRequestCode = 66

KeyCode = 0x00
KeySync = 0x01
Expand Down
125 changes: 125 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,131 @@ func ExampleConnection_SelectTyped() {
// response is [{{} 1111 hello world}]
}

func ExampleSelectRequest() {
var conn *tarantool.Connection
conn, err := example_connect()
if err != nil {
fmt.Printf("error in prepare is %v", err)
return
}

req := tarantool.NewSelectRequest(512).
Limit(100).
Key(tarantool.IntKey{1111})
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewSelectRequest("test").
Index("primary").
Limit(100).
Key(tarantool.IntKey{1111})
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async select request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
}

func ExampleUpdateRequest() {
var conn *tarantool.Connection
conn, err := example_connect()
if err != nil {
fmt.Printf("error in prepare is %v", err)
return
}

req := tarantool.NewUpdateRequest(512).
Key(tarantool.IntKey{1111}).
Operations(tarantool.NewOperations().Assign(1, "bye"))
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do update request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewUpdateRequest("test").
Index("primary").
Key(tarantool.IntKey{1111}).
Operations(tarantool.NewOperations().Assign(1, "hello"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async update request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async update request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{[]interface {}{0x457, "bye", "world"}}
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
}

func ExampleUpsertRequest() {
var (
conn *tarantool.Connection
req tarantool.Request
)

conn, err := example_connect()
if err != nil {
fmt.Printf("error in prepare is %v", err)
return
}

req = tarantool.NewUpsertRequest(512).
Tuple([]interface{}{uint(1113), "first", "first"}).
Operations(tarantool.NewOperations().Assign(1, "updated"))
resp, err := conn.Do(req)
if err != nil {
fmt.Printf("error in do select upsert is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)

req = tarantool.NewUpsertRequest("test").
Tuple([]interface{}{uint(1113), "second", "second"}).
Operations(tarantool.NewOperations().Assign(2, "updated"))
fut, err := conn.DoAsync(req)
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
}
resp, err = fut.Get()
if err != nil {
fmt.Printf("error in do async upsert request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// check results
req = tarantool.NewSelectRequest(512).
Limit(100).
Key(tarantool.IntKey{1113})
resp, err = conn.Do(req)
if err != nil {
fmt.Printf("error in do select request is %v", err)
return
}
fmt.Printf("response is %#v\n", resp.Data)
// Output:
// response is []interface {}{}
// response is []interface {}{}
// response is []interface {}{[]interface {}{0x459, "first", "updated"}}
}

func Example() {
spaceNo := uint32(512)
indexNo := uint32(0)
Expand Down
22 changes: 20 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
package tarantool

func (schema *Schema) ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) {
return schema.resolveSpaceIndex(s, i)
import (
"gopkg.in/vmihailenco/msgpack.v2"
)

// RefImplSelectBody is reference implementation for filling of a select
// request's body.
func RefImplSelectBody(enc *msgpack.Encoder, space, index, offset, limit, iterator uint32, key interface{}) error {
return fillSelect(enc, space, index, offset, limit, iterator, key)
}

// RefImplUpdateBody is reference implementation for filling of an update
// request's body.
func RefImplUpdateBody(enc *msgpack.Encoder, space, index uint32, key, ops interface{}) error {
return fillUpdate(enc, space, index, key, ops)
}

// RefImplUpsertBody is reference implementation for filling of an upsert
// request's body.
func RefImplUpsertBody(enc *msgpack.Encoder, space uint32, tuple, ops interface{}) error {
return fillUpsert(enc, space, tuple, ops)
}
Loading

0 comments on commit 08d91c5

Please sign in to comment.