Skip to content

Commit d0ab8ab

Browse files
committed
api: add support of a batch insert request
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW . Closes #399
1 parent 592db69 commit d0ab8ab

File tree

4 files changed

+108
-3
lines changed

4 files changed

+108
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
99
## [Unreleased]
1010

1111
### Added
12-
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13-
connection and ctx is not canceled;
14-
also added logs for error case of `ConnectionPool.tryConnect()` calls in
12+
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13+
connection and ctx is not canceled;
14+
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
1717
- Implemented stringer methods for pool.Role (#405).
18+
- Support the IPROTO_INSERT_ARROW request (#399).
1819

1920
### Changed
2021

arrow/arrow.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package arrow
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Arrow MessagePack extension type
11+
const arrowExtId = 8
12+
13+
// Arrow struct wraps a raw arrow data buffer.
14+
type Arrow struct {
15+
data []byte
16+
}
17+
18+
// MakeArrow returns a new arrow.Arrow object that contains
19+
// wrapped a raw arrow data buffer.
20+
func MakeArrow(arrow []byte) (Arrow, error) {
21+
if len(arrow) == 0 {
22+
return Arrow{}, fmt.Errorf("no Arrow data")
23+
}
24+
return Arrow{arrow}, nil
25+
}
26+
27+
// ToArrow returns a []byte that contains Arrow raw data.
28+
func (a *Arrow) ToArrow() []byte {
29+
return a.data
30+
}
31+
32+
func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
33+
arrow := Arrow{
34+
data: make([]byte, 0, extLen),
35+
}
36+
n, err := d.Buffered().Read(arrow.data)
37+
if err != nil {
38+
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
39+
}
40+
if n < extLen || n != len(arrow.data) {
41+
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
42+
}
43+
44+
v.Set(reflect.ValueOf(arrow))
45+
return nil
46+
}
47+
48+
func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
49+
if v.IsValid() {
50+
return v.Interface().(Arrow).data, nil
51+
}
52+
53+
return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
54+
}
55+
56+
func init() {
57+
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
58+
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
59+
}

arrow/arrow_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package arrow_test

request.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,6 +1156,50 @@ func (req *InsertRequest) Context(ctx context.Context) *InsertRequest {
11561156
return req
11571157
}
11581158

1159+
// InsertArrowRequest helps you to create an insert request object for execution
1160+
// by a Connection.
1161+
type InsertArrowRequest struct {
1162+
spaceRequest
1163+
arrow interface{}
1164+
}
1165+
1166+
// NewInsertArrowRequest returns a new empty InsertArrowRequest.
1167+
func NewInsertArrowRequest(space interface{}) *InsertArrowRequest {
1168+
req := new(InsertArrowRequest)
1169+
req.rtype = iproto.IPROTO_INSERT
1170+
req.setSpace(space)
1171+
req.arrow = []interface{}{}
1172+
return req
1173+
}
1174+
1175+
// Arrow sets the arrow for insertion the insert arrow request.
1176+
// Note: default value is nil.
1177+
func (req *InsertArrowRequest) Arrow(arrow interface{}) *InsertArrowRequest {
1178+
req.arrow = arrow
1179+
return req
1180+
}
1181+
1182+
// Body fills an msgpack.Encoder with the insert arrow request body.
1183+
func (req *InsertArrowRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
1184+
spaceEnc, err := newSpaceEncoder(res, req.space)
1185+
if err != nil {
1186+
return err
1187+
}
1188+
1189+
return fillInsert(enc, spaceEnc, req.arrow)
1190+
}
1191+
1192+
// Context sets a passed context to the request.
1193+
//
1194+
// Pay attention that when using context with request objects,
1195+
// the timeout option for Connection does not affect the lifetime
1196+
// of the request. For those purposes use context.WithTimeout() as
1197+
// the root context.
1198+
func (req *InsertArrowRequest) Context(ctx context.Context) *InsertArrowRequest {
1199+
req.ctx = ctx
1200+
return req
1201+
}
1202+
11591203
// ReplaceRequest helps you to create a replace request object for execution
11601204
// by a Connection.
11611205
type ReplaceRequest struct {

0 commit comments

Comments
 (0)