From 1f88b6f9d9025c3323db1ce07c06da5caff2f88d Mon Sep 17 00:00:00 2001 From: Dmitriy Gertsog Date: Wed, 2 Oct 2024 17:45:02 +0300 Subject: [PATCH] api: add support of a batch insert request Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW . Closes #399 --- CHANGELOG.md | 7 ++- arrow/arrow.go | 59 +++++++++++++++++++ arrow/arrow_test.go | 130 ++++++++++++++++++++++++++++++++++++++++++ arrow/config.lua | 36 ++++++++++++ arrow/example_test.go | 62 ++++++++++++++++++++ arrow/request.go | 93 ++++++++++++++++++++++++++++++ request.go | 23 ++++---- request_test.go | 83 ++++++++++++++++++++++++++- response.go | 6 ++ test_helpers/main.go | 14 ++++- watch.go | 6 +- 11 files changed, 496 insertions(+), 23 deletions(-) create mode 100644 arrow/arrow.go create mode 100644 arrow/arrow_test.go create mode 100644 arrow/config.lua create mode 100644 arrow/example_test.go create mode 100644 arrow/request.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a2f9983e..b0eb0f9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ## [Unreleased] ### Added -- Add err log to `ConnectionPool.Add()` in case, when unable to establish - connection and ctx is not canceled; - also added logs for error case of `ConnectionPool.tryConnect()` calls in +- Add err log to `ConnectionPool.Add()` in case, when unable to establish + connection and ctx is not canceled; + also added logs for error case of `ConnectionPool.tryConnect()` calls in `ConnectionPool.controller()` and `ConnectionPool.reconnect()` - Methods that are implemented but not included in the pooler interface (#395). - Implemented stringer methods for pool.Role (#405). +- Support the IPROTO_INSERT_ARROW request (#399). 
### Changed
diff --git a/arrow/arrow.go b/arrow/arrow.go
new file mode 100644
index 00000000..c0b12060
--- /dev/null
+++ b/arrow/arrow.go
@@ -0,0 +1,59 @@
+package arrow
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/vmihailenco/msgpack/v5"
+)
+
+// arrowExtId is the MessagePack extension type of Arrow data.
+const arrowExtId = 8
+
+// Arrow struct wraps a raw arrow data buffer.
+type Arrow struct {
+	data []byte
+}
+
+// MakeArrow returns a new arrow.Arrow object that wraps
+// a raw Arrow data buffer. It returns an error if the buffer is empty.
+func MakeArrow(arrow []byte) (Arrow, error) {
+	if len(arrow) == 0 {
+		return Arrow{}, fmt.Errorf("no Arrow data")
+	}
+	return Arrow{arrow}, nil
+}
+
+// ToArrow returns a []byte that contains Arrow raw data.
+func (a *Arrow) ToArrow() []byte {
+	return a.data
+}
+
+// arrowDecoder reads extLen bytes of raw Arrow data from d into v.
+func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
+	// The buffer length (not just its capacity) must be extLen: Read fills
+	// at most len(buf) bytes, so a zero-length buffer would never be filled.
+	arrow := Arrow{data: make([]byte, extLen)}
+	n, err := d.Buffered().Read(arrow.data)
+	if err != nil {
+		return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
+	}
+	if n < extLen || n != len(arrow.data) {
+		return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
+	}
+	v.Set(reflect.ValueOf(arrow))
+	return nil
+}
+
+// arrowEncoder returns the raw Arrow bytes held by v as the extension payload.
+func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
+	if v.IsValid() {
+		return v.Interface().(Arrow).data, nil
+	}
+	return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
+}
+
+func init() {
+	msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
+	msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
+}
diff --git a/arrow/arrow_test.go b/arrow/arrow_test.go
new file mode 100644
index 00000000..09096a2c
--- /dev/null
+++ b/arrow/arrow_test.go
@@ -0,0 +1,130 @@
+package arrow_test
+
+import (
+	"encoding/hex"
+	"log"
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tarantool/go-tarantool/v2"
+	"github.com/tarantool/go-tarantool/v2/arrow"
+	"github.com/tarantool/go-tarantool/v2/test_helpers"
+)
+
+var isArrowSupported = false
+
+var server = "127.0.0.1:3013"
+var dialer = tarantool.NetDialer{
+	Address:  server,
+	User:     "test",
+	Password: "test",
+}
+var space = "testArrow"
+
+var opts = tarantool.Opts{
+	Timeout: 5 * time.Second,
+}
+
+func skipIfArrowUnsupported(t *testing.T) {
+	t.Helper()
+	if !isArrowSupported {
+		t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
+	}
+}
+
+// TestInsert_invalid uses an Arrow sequence from Tarantool's test.
+// nolint:lll
+// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
+func TestInsert_invalid(t *testing.T) {
+	skipIfArrowUnsupported(t)
+
+	arrows := []struct {
+		arrow    string
+		expected string
+	}{
+		{
+			"",
+			"no Arrow data",
+		},
+		{
+			"00",
+			"Failed to decode Arrow IPC data",
+		},
+		{
+			"ffffffff70000000040000009effffff0400010004000000" +
+				"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
+				"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
+				"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
+				"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
+				"01000000000000003400000008000000000000000200000000000000000000000000" +
+				"00000000000000000000000000000800000000000000000000000100000001000000" +
+				"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
+				"00000000000000000000",
+			"memtx does not support arrow format",
+		},
+	}
+
+	conn := test_helpers.ConnectWithValidation(t, dialer, opts)
+	defer conn.Close()
+
+	for i, a := range arrows {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			data, err := hex.DecodeString(a.arrow)
+			require.NoError(t, err)
+
+			arr, err := arrow.MakeArrow(data)
+			if err != nil {
+				require.ErrorContains(t, err, a.expected)
+				return
+			}
+			req := arrow.NewInsertRequest(space, arr)
+
+			_, err = conn.Do(req).Get()
+			require.ErrorContains(t, err, a.expected)
+		})
+	}
+
+}
+
+// runTestMain is the body of the TestMain function
+// (see https://pkg.go.dev/testing#hdr-Main).
+// Using defer with os.Exit does not work, so the TestMain body
+// is a separate function, see
+// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
+func runTestMain(m *testing.M) int {
+	isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
+	if err != nil {
+		log.Fatalf("Failed to extract Tarantool version: %s", err)
+	}
+	isArrowSupported = !isLess
+
+	if !isArrowSupported {
+		log.Println("Skipping insert Arrow tests...")
+		return m.Run()
+	}
+
+	instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
+		Dialer:       dialer,
+		InitScript:   "config.lua",
+		Listen:       server,
+		WaitStart:    100 * time.Millisecond,
+		ConnectRetry: 10,
+		RetryTimeout: 500 * time.Millisecond,
+	})
+	defer test_helpers.StopTarantoolWithCleanup(instance)
+
+	if err != nil {
+		log.Printf("Failed to prepare test Tarantool: %s", err)
+		return 1
+	}
+
+	return m.Run()
+}
+
+func TestMain(m *testing.M) {
+	code := runTestMain(m)
+	os.Exit(code)
+}
diff --git a/arrow/config.lua b/arrow/config.lua
new file mode 100644
index 00000000..b75ecd00
--- /dev/null
+++ b/arrow/config.lua
@@ -0,0 +1,36 @@
+-- Do not set listen for now so connector won't be
+-- able to send requests until everything is configured.
+box.cfg {
+    work_dir = os.getenv("TEST_TNT_WORK_DIR")
+}
+
+box.schema.user.create('test', {
+    password = 'test',
+    if_not_exists = true
+})
+box.schema.user.grant('test', 'execute', 'universe', nil, {
+    if_not_exists = true
+})
+
+local s = box.schema.space.create('testArrow', {
+    id = 524,
+    if_not_exists = true
+})
+s:create_index('primary', {
+    type = 'tree',
+    parts = {{
+        field = 1,
+        type = 'integer'
+    }},
+    if_not_exists = true
+})
+s:truncate()
+
+box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
+    if_not_exists = true
+})
+
+-- Set listen only when every other thing is configured.
+box.cfg {
+    listen = os.getenv("TEST_TNT_LISTEN")
+}
diff --git a/arrow/example_test.go b/arrow/example_test.go
new file mode 100644
index 00000000..910a4fd4
--- /dev/null
+++ b/arrow/example_test.go
@@ -0,0 +1,62 @@
+// Run Tarantool instance before example execution:
+//
+// Terminal 1:
+// $ cd arrow
+// $ TEST_TNT_LISTEN=3013 TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config.lua
+//
+// Terminal 2:
+// $ go test -v example_test.go
+package arrow_test
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"log"
+	"strings"
+	"time"
+
+	"github.com/tarantool/go-tarantool/v2"
+	"github.com/tarantool/go-tarantool/v2/arrow"
+)
+
+var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
+	"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
+	"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
+	"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
+	"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
+	"01000000000000003400000008000000000000000200000000000000000000000000" +
+	"00000000000000000000000000000800000000000000000000000100000001000000" +
+	"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
+	"00000000000000000000")
+
+func Example() {
+	dialer := tarantool.NetDialer{
+		Address:  "127.0.0.1:3013",
+		User:     "test",
+		Password: "test",
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
+	cancel()
+	if err != nil {
+		log.Fatalf("Failed to connect: %s", err)
+	}
+	defer client.Close()
+
+	arr, err := arrow.MakeArrow(arrowBinData)
+	if err != nil {
+		log.Fatalf("Failed prepare Arrow data: %s", err)
+	}
+
+	spaceNo := uint32(524)
+	req := arrow.NewInsertRequest(spaceNo, arr)
+	_, err = client.Do(req).Get()
+	if err != nil {
+		msg := strings.Split(err.Error(), "(")[0]
+		fmt.Printf("Failed insert Arrow: %s\n", msg)
+	}
+
+	//! Output:
+	// Failed insert Arrow: memtx does not support arrow format
+}
diff --git a/arrow/request.go b/arrow/request.go
new file mode 100644
index 00000000..4025940a
--- /dev/null
+++ b/arrow/request.go
@@ -0,0 +1,93 @@
+package arrow
+
+import (
+	"context"
+	"io"
+
+	"github.com/tarantool/go-iproto"
+	"github.com/tarantool/go-tarantool/v2"
+	"github.com/vmihailenco/msgpack/v5"
+)
+
+// iprotoInsertArrowType is the INSERT Arrow request type.
+//
+// FIXME: replace with iproto.IPROTO_INSERT_ARROW when it is released in iproto.
+// https://github.com/tarantool/go-replica/issues/30
+const iprotoInsertArrowType = iproto.Type(17)
+
+// iprotoArrowKey is the request body key of the data in Arrow format.
+//
+// FIXME: replace with iproto.IPROTO_ARROW when it is released in iproto.
+// https://github.com/tarantool/go-replica/issues/30
+const iprotoArrowKey = iproto.Key(0x36)
+
+// InsertRequest helps you to create an insert request object for execution
+// by a Connection.
+type InsertRequest struct {
+	arrow Arrow
+	space interface{}
+	ctx   context.Context
+}
+
+// NewInsertRequest returns a new InsertRequest for the given space and Arrow data.
+func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
+	return &InsertRequest{
+		arrow: arrow,
+		space: space,
+	}
+}
+
+// Type returns an IPROTO_INSERT_ARROW type for the request.
+func (r *InsertRequest) Type() iproto.Type {
+	return iprotoInsertArrowType
+}
+
+// Async returns false, so a response to the request is expected.
+func (r *InsertRequest) Async() bool {
+	return false
+}
+
+// Ctx returns a context of the request.
+func (r *InsertRequest) Ctx() context.Context {
+	return r.ctx
+}
+
+// Context sets a passed context to the request.
+//
+// Pay attention that when using context with request objects,
+// the timeout option for Connection does not affect the lifetime
+// of the request. For those purposes use context.WithTimeout() as
+// the root context.
+func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
+	r.ctx = ctx
+	return r
+}
+
+// Arrow sets the Arrow data to be inserted by the request.
+// Note: it replaces the data passed to NewInsertRequest.
+func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
+	r.arrow = arrow
+	return r
+}
+
+// Body fills an msgpack.Encoder with the insert arrow request body.
+func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
+	if err := enc.EncodeMapLen(2); err != nil {
+		return err
+	}
+	if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
+		return err
+	}
+	if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
+		return err
+	}
+	return enc.Encode(r.arrow)
+}
+
+// Response creates a response for the InsertRequest.
+func (r *InsertRequest) Response(
+	header tarantool.Header,
+	body io.Reader,
+) (tarantool.Response, error) {
+	return tarantool.DecodeBaseResponse(header, body)
+}
diff --git a/request.go b/request.go
index 8dbe250b..21ed0eba 100644
--- a/request.go
+++ b/request.go
@@ -855,11 +855,7 @@ func (req *baseRequest) Ctx() context.Context {
 
 // Response creates a response for the baseRequest.
 func (req *baseRequest) Response(header Header, body io.Reader) (Response, error) {
-	resp, err := createBaseResponse(header, body)
-	if err != nil {
-		return nil, err
-	}
-	return &resp, nil
+	return DecodeBaseResponse(header, body)
 }
 
 type spaceRequest struct {
@@ -871,6 +867,17 @@ func (req *spaceRequest) setSpace(space interface{}) {
 	req.space = space
 }
 
+func EncodeSpace(res SchemaResolver, enc *msgpack.Encoder, space interface{}) error {
+	spaceEnc, err := newSpaceEncoder(res, space)
+	if err != nil {
+		return err
+	}
+	if err := spaceEnc.Encode(enc); err != nil {
+		return err
+	}
+	return nil
+}
+
 type spaceIndexRequest struct {
 	spaceRequest
 	index interface{}
@@ -954,11 +961,7 @@ func (req authRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
 
 // Response creates a response for the authRequest.
func (req authRequest) Response(header Header, body io.Reader) (Response, error) {
-	resp, err := createBaseResponse(header, body)
-	if err != nil {
-		return nil, err
-	}
-	return &resp, nil
+	return DecodeBaseResponse(header, body)
 }
 
 // PingRequest helps you to create an execute request object for execution
diff --git a/request_test.go b/request_test.go
index 84ba23ef..8ced455c 100644
--- a/request_test.go
+++ b/request_test.go
@@ -9,10 +9,10 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/tarantool/go-iproto"
-	"github.com/vmihailenco/msgpack/v5"
-
 	. "github.com/tarantool/go-tarantool/v2"
+	"github.com/vmihailenco/msgpack/v5"
 )
 
 const invalidSpaceMsg = "invalid space"
@@ -1063,3 +1063,82 @@ func TestResponseDecodeTyped(t *testing.T) {
 		assert.Equal(t, []byte{'v', '2'}, decoded)
 	}
 }
+
+type stubSchemeResolver struct {
+	space interface{}
+}
+
+func (r stubSchemeResolver) ResolveSpace(s interface{}) (uint32, error) {
+	if id, ok := r.space.(uint32); ok {
+		return id, nil
+	}
+	if _, ok := r.space.(string); ok {
+		return 0, nil
+	}
+	return 0, fmt.Errorf("stub error message: %v", r.space)
+}
+
+func (stubSchemeResolver) ResolveIndex(i interface{}, spaceNo uint32) (uint32, error) {
+	return 0, nil
+}
+
+func (r stubSchemeResolver) NamesUseSupported() bool {
+	_, ok := r.space.(string)
+	return ok
+}
+
+func TestEncodeSpace(t *testing.T) {
+	tests := []struct {
+		name string
+		res  stubSchemeResolver
+		err  string
+		out  []byte
+	}{
+		{
+			name: "string space",
+			res:  stubSchemeResolver{"test"},
+			out:  []byte{0x5E, 0xA4, 0x74, 0x65, 0x73, 0x74},
+		},
+		{
+			name: "empty string",
+			res:  stubSchemeResolver{""},
+			out:  []byte{0x5E, 0xA0},
+		},
+		{
+			name: "numeric 524",
+			res:  stubSchemeResolver{uint32(524)},
+			out:  []byte{0x10, 0xCD, 0x02, 0x0C},
+		},
+		{
+			name: "numeric zero",
+			res:  stubSchemeResolver{uint32(0)},
+			out:  []byte{0x10, 0x00},
+		},
+		{
+			name: "numeric max value",
+			res:  stubSchemeResolver{^uint32(0)},
+			out:  []byte{0x10, 0xCE, 0xFF, 0xFF, 0xFF, 0xFF},
+		},
+		{
+			name: "resolve error",
+			res:  stubSchemeResolver{false},
+			err:  "stub error message",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var buf bytes.Buffer
+			enc := msgpack.NewEncoder(&buf)
+
+			err := EncodeSpace(tt.res, enc, tt.res.space)
+			if tt.err != "" {
+				require.ErrorContains(t, err, tt.err)
+				return
+			}
+
+			require.NoError(t, err)
+
+			require.Equal(t, tt.out, buf.Bytes())
+		})
+	}
+}
diff --git a/response.go b/response.go
index 2c287f8b..90a02a1c 100644
--- a/response.go
+++ b/response.go
@@ -45,6 +45,12 @@ func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
 	return baseResponse{header: header, buf: smallBuf{b: data}}, nil
 }
 
+// DecodeBaseResponse creates a base Response from a response header and body.
+func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
+	resp, err := createBaseResponse(header, body)
+	return &resp, err
+}
+
 // SelectResponse is used for the select requests.
 // It might contain a position descriptor of the last selected tuple.
 //
diff --git a/test_helpers/main.go b/test_helpers/main.go
index 178f6992..d986e314 100644
--- a/test_helpers/main.go
+++ b/test_helpers/main.go
@@ -120,13 +120,22 @@ func atoiUint64(str string) (uint64, error) {
 	return res, nil
 }
 
+// getTarantoolExec returns the Tarantool executable to run: the value of
+// TARANTOOL_BIN if it is set and non-empty, otherwise "tarantool".
+func getTarantoolExec() string {
+	if tarBin := os.Getenv("TARANTOOL_BIN"); tarBin != "" {
+		return tarBin
+	}
+	return "tarantool"
+}
+
 // IsTarantoolVersionLess checks if tarantool version is less
 // than passed . Returns error if failed
 // to extract version.
func IsTarantoolVersionLess(majorMin uint64, minorMin uint64, patchMin uint64) (bool, error) { var major, minor, patch uint64 - out, err := exec.Command("tarantool", "--version").Output() + out, err := exec.Command(getTarantoolExec(), "--version").Output() if err != nil { return true, err @@ -202,8 +211,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) { return inst, err } } - - inst.Cmd = exec.Command("tarantool", startOpts.InitScript) + inst.Cmd = exec.Command(getTarantoolExec(), startOpts.InitScript) inst.Cmd.Env = append( os.Environ(), diff --git a/watch.go b/watch.go index c147b039..9f127313 100644 --- a/watch.go +++ b/watch.go @@ -58,11 +58,7 @@ func (req *BroadcastRequest) Async() bool { // Response creates a response for a BroadcastRequest. func (req *BroadcastRequest) Response(header Header, body io.Reader) (Response, error) { - resp, err := createBaseResponse(header, body) - if err != nil { - return nil, err - } - return &resp, nil + return DecodeBaseResponse(header, body) } // watchRequest subscribes to the updates of a specified key defined on the