Skip to content

Commit

Permalink
api: add support of a batch insert request
Browse files Browse the repository at this point in the history
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW .

Closes #399
  • Loading branch information
dmyger committed Oct 4, 2024
1 parent 592db69 commit b089b44
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 21 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
59 changes: 59 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
if len(arrow) == 0 {
return Arrow{}, fmt.Errorf("no Arrow data")
}
return Arrow{arrow}, nil
}

// ToArrow returns a []byte that contains Arrow raw data.
func (a *Arrow) ToArrow() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, 0, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
if v.IsValid() {
return v.Interface().(Arrow).data, nil
}

return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
130 changes: 130 additions & 0 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package arrow_test

import (
"encoding/hex"
"log"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/tarantool/go-tarantool/v2/test_helpers"
)

var isArrowSupported = false

var server = "127.0.0.1:3013"
var dialer = tarantool.NetDialer{
Address: server,
User: "test",
Password: "test",
}
var space = "testArrow"

var opts = tarantool.Opts{
//! Timeout: 5 * time.Second,
}

func skipIfArrowUnsupported(t *testing.T) {
t.Helper()
if !isArrowSupported {
t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
}
}

// TestInsert uses Arrow sequence from Tarantool's test .
// nolint:lll
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
func TestInsert_invalid(t *testing.T) {
skipIfArrowUnsupported(t)

arrows := []struct {
arrow string
expected string
}{
{
"",
"no Arrow data",
},
{
"00",
"Failed to decode Arrow IPC data",
},
{
"ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000",
"memtx does not support arrow format",
},
}

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

for i, a := range arrows {
t.Run(strconv.Itoa(i), func(t *testing.T) {
data, err := hex.DecodeString(a.arrow)
require.NoError(t, err)

arr, err := arrow.MakeArrow(data)
if err != nil {
require.ErrorContains(t, err, a.expected)
return
}
req := arrow.NewInsertRequest(space, arr)

_, err = conn.Do(req).Get()
require.ErrorContains(t, err, a.expected)
})
}

}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
// is a separate function, see
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
func runTestMain(m *testing.M) int {
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
if err != nil {
log.Fatalf("Failed to extract Tarantool version: %s", err)
}
isArrowSupported = !isLess

if !isArrowSupported {
log.Println("Skipping insert Arrow tests...")
return m.Run()
}

instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
Dialer: dialer,
InitScript: "config.lua",
Listen: server,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 10,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(instance)

if err != nil {
log.Printf("Failed to prepare test Tarantool: %s", err)
return 1
}

return m.Run()
}

func TestMain(m *testing.M) {
code := runTestMain(m)
os.Exit(code)
}
34 changes: 34 additions & 0 deletions arrow/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--? local uuid = require('uuid')
--? local msgpack = require('msgpack')

-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg{
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
}

box.schema.user.create('test', { password = 'test' , if_not_exists = true })
box.schema.user.grant('test', 'execute', 'universe', nil, { if_not_exists = true })

--? local uuid_msgpack_supported = pcall(msgpack.encode, uuid.new())
--? if not uuid_msgpack_supported then
--? error('UUID unsupported, use Tarantool 2.4.1 or newer')
--? end

local s = box.schema.space.create('testArrow', {
id = 524,
if_not_exists = true,
})
s:create_index('primary', {
type = 'tree',
parts = {{ field = 1, type = 'integer' }},
if_not_exists = true
})
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', { if_not_exists = true })

-- Set listen only when every other thing is configured.
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}
93 changes: 93 additions & 0 deletions arrow/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package arrow

import (
"context"
"io"

"github.com/tarantool/go-iproto"
"github.com/tarantool/go-tarantool/v2"
"github.com/vmihailenco/msgpack/v5"
)

// INSERT Arrow request.
//
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
// https://github.com/tarantool/go-replica/issues/30
const iprotoInsertArrowType = iproto.Type(17)

// The data in Arrow format.
//
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
// https://github.com/tarantool/go-replica/issues/30
const iprotoArrowKey = iproto.Key(0x36)

// InsertRequest helps you to create an insert request object for execution
// by a Connection.
type InsertRequest struct {
arrow Arrow
space interface{}
ctx context.Context
}

// NewInsertRequest returns a new empty InsertRequest.
func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
return &InsertRequest{
arrow: arrow,
space: space,
}
}

// Type returns a IPROTO_INSERT_ARROW type for the request.
func (r *InsertRequest) Type() iproto.Type {
return iprotoInsertArrowType
}

// Async returns false to the request return a response.
func (r *InsertRequest) Async() bool {
return false
}

// Ctx returns a context of the request.
func (r *InsertRequest) Ctx() context.Context {
return r.ctx
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
r.ctx = ctx
return r
}

// Arrow sets the arrow for insertion the insert arrow request.
// Note: default value is nil.
func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
r.arrow = arrow
return r
}

// Body fills an msgpack.Encoder with the insert arrow request body.
func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
if err := enc.EncodeMapLen(2); err != nil {
return err
}
if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
return err
}
return enc.Encode(r.arrow)
}

// Response creates a response for the InsertRequest.
func (r *InsertRequest) Response(
header tarantool.Header,
body io.Reader,
) (tarantool.Response, error) {
return tarantool.DecodeBaseResponse(header, body)
}
23 changes: 13 additions & 10 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,11 +855,7 @@ func (req *baseRequest) Ctx() context.Context {

// Response creates a response for the baseRequest.
func (req *baseRequest) Response(header Header, body io.Reader) (Response, error) {
resp, err := createBaseResponse(header, body)
if err != nil {
return nil, err
}
return &resp, nil
return DecodeBaseResponse(header, body)
}

type spaceRequest struct {
Expand All @@ -871,6 +867,17 @@ func (req *spaceRequest) setSpace(space interface{}) {
req.space = space
}

func EncodeSpace(res SchemaResolver, enc *msgpack.Encoder, space interface{}) error {
spaceEnc, err := newSpaceEncoder(res, space)
if err != nil {
return err
}
if err := spaceEnc.Encode(enc); err != nil {
return err
}
return nil
}

type spaceIndexRequest struct {
spaceRequest
index interface{}
Expand Down Expand Up @@ -954,11 +961,7 @@ func (req authRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {

// Response creates a response for the authRequest.
func (req authRequest) Response(header Header, body io.Reader) (Response, error) {
resp, err := createBaseResponse(header, body)
if err != nil {
return nil, err
}
return &resp, nil
return DecodeBaseResponse(header, body)
}

// PingRequest helps you to create an execute request object for execution
Expand Down
6 changes: 6 additions & 0 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
}

// DecodeBaseResponse parse response header and body.
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
resp, err := createBaseResponse(header, body)
return &resp, err
}

// SelectResponse is used for the select requests.
// It might contain a position descriptor of the last selected tuple.
//
Expand Down
Loading

0 comments on commit b089b44

Please sign in to comment.