Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions memdx/opcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
OpCodeSASLAuth = OpCode(OpCodeTypeCli | 0x21)
OpCodeSASLStep = OpCode(OpCodeTypeCli | 0x22)
OpCodeGetAllVBSeqnos = OpCode(OpCodeTypeCli | 0x48)
OpCodeGetEx = OpCode(OpCodeTypeCli | 0x49)
OpCodeGetExReplica = OpCode(OpCodeTypeCli | 0x4a)
OpCodeDcpOpenConnection = OpCode(OpCodeTypeCli | 0x50)
OpCodeDcpAddStream = OpCode(OpCodeTypeCli | 0x51)
OpCodeDcpCloseStream = OpCode(OpCodeTypeCli | 0x52)
Expand Down
82 changes: 82 additions & 0 deletions memdx/ops_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,88 @@ func (o OpsCrud) Get(d Dispatcher, req *GetRequest, cb func(*GetResponse, error)
})
}

type GetExRequest struct {
CrudRequestMeta
CollectionID uint32
Key []byte
VbucketID uint16
}

func (r GetExRequest) OpName() string { return OpCodeGetEx.String() }

type GetExResponse struct {
CrudResponseMeta
Cas uint64
Flags uint32
Value []byte
Datatype uint8
}

func (o OpsCrud) GetEx(d Dispatcher, req *GetExRequest, cb func(*GetExResponse, error)) (PendingOp, error) {
extFramesBuf := make([]byte, 0, 128)
extFramesBuf, err := o.encodeReqExtFrames(req.OnBehalfOf, 0, 0, false, extFramesBuf)
if err != nil {
return nil, err
}

reqKey, err := o.encodeCollectionAndKey(req.CollectionID, req.Key, nil)
if err != nil {
return nil, err
}

return d.Dispatch(&Packet{
OpCode: OpCodeGetEx,
Key: reqKey,
VbucketID: req.VbucketID,
FramingExtras: extFramesBuf,
}, func(resp *Packet, err error) bool {
if err != nil {
cb(nil, err)
return false
}

decompErr := OpsCore{}.maybeDecompressPacket(resp)
if decompErr != nil {
cb(nil, decompErr)
return false
}

if resp.Status == StatusKeyNotFound {
cb(nil, ErrDocNotFound)
return false
}

if resp.Status != StatusSuccess {
cb(nil, OpsCrud{}.decodeCommonError(resp))
return false
}

if len(resp.Extras) != 4 {
cb(nil, protocolError{"bad extras length"})
return false
}

flags := binary.BigEndian.Uint32(resp.Extras[0:])

serverDuration, err := o.decodeResExtFrames(resp.FramingExtras)
if err != nil {
cb(nil, err)
return false
}

cb(&GetExResponse{
Cas: resp.Cas,
Flags: flags,
Value: resp.Value,
Datatype: resp.Datatype,
CrudResponseMeta: CrudResponseMeta{
ServerDuration: serverDuration,
},
}, nil)
return false
})
}

type GetAndTouchRequest struct {
CrudRequestMeta
CollectionID uint32
Expand Down
104 changes: 104 additions & 0 deletions memdx/ops_crud_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3785,6 +3785,110 @@ func TestOpsCrudCounterNonNumericDoc(t *testing.T) {
require.ErrorIs(t, err, memdx.ErrDeltaBadval)
}

func TestOpsCrudGetEx(t *testing.T) {
testutilsint.SkipIfShortTest(t)
testutilsint.SkipIfOlderServerVersion(t, "8.0.0")

t.Run("Basic", func(t *testing.T) {
key := []byte(uuid.NewString())

cli := createTestClient(t)

_, err := memdx.SyncUnaryCall(memdx.OpsCrud{
CollectionsEnabled: true,
ExtFramesEnabled: true,
}, memdx.OpsCrud.Set, cli, &memdx.SetRequest{
CollectionID: 0,
Key: key,
VbucketID: defaultTestVbucketID,
Value: []byte(`{"key":"value"}`),
})
require.NoError(t, err)

resp, err := memdx.SyncUnaryCall(memdx.OpsCrud{
CollectionsEnabled: true,
ExtFramesEnabled: true,
}, memdx.OpsCrud.GetEx, cli, &memdx.GetExRequest{
CollectionID: 0,
Key: key,
VbucketID: defaultTestVbucketID,
})
require.NoError(t, err)

hasXattrs := resp.Datatype&uint8(memdx.DatatypeFlagXattrs) != 0
require.False(t, hasXattrs)

require.NoError(t, err)
require.Equal(t, []byte(`{"key":"value"}`), resp.Value)
})

t.Run("WithXattrs", func(t *testing.T) {
key := []byte(uuid.NewString())

cli := createTestClient(t)

_, err := memdx.SyncUnaryCall(memdx.OpsCrud{
CollectionsEnabled: true,
ExtFramesEnabled: true,
}, memdx.OpsCrud.Set, cli, &memdx.SetRequest{
CollectionID: 0,
Key: key,
VbucketID: defaultTestVbucketID,
Value: []byte(`{"key":"value"}`),
})
require.NoError(t, err)

_, err = memdx.SyncUnaryCall(memdx.OpsCrud{
CollectionsEnabled: true,
ExtFramesEnabled: true,
}, memdx.OpsCrud.MutateIn, cli, &memdx.MutateInRequest{
CollectionID: 0,
Key: key,
VbucketID: defaultTestVbucketID,
Ops: []memdx.MutateInOp{
{
Op: memdx.MutateInOpTypeDictSet,
Path: []byte("_foo"),
Value: []byte(`{"x":"y"}`),
Flags: memdx.SubdocOpFlagXattrPath,
},
{
Op: memdx.MutateInOpTypeDictSet,
Path: []byte("_bar"),
Value: []byte(`{"y":"z"}`),
Flags: memdx.SubdocOpFlagXattrPath,
},
},
})
require.NoError(t, err)

resp, err := memdx.SyncUnaryCall(memdx.OpsCrud{
CollectionsEnabled: true,
ExtFramesEnabled: true,
}, memdx.OpsCrud.GetEx, cli, &memdx.GetExRequest{
CollectionID: 0,
Key: key,
VbucketID: defaultTestVbucketID,
})
require.NoError(t, err)

hasXattrs := resp.Datatype&uint8(memdx.DatatypeFlagXattrs) != 0
require.True(t, hasXattrs)

xattrBlob, docValue, err := memdx.SplitXattrBlob(resp.Value)
require.NoError(t, err)

xattrs := make(map[string]string)
err = memdx.IterXattrBlobEntries(xattrBlob, func(name, value string) {
xattrs[name] = value
})
require.NoError(t, err)
require.Equal(t, map[string]string{"_foo": `{"x":"y"}`, "_bar": `{"y":"z"}`}, xattrs)

require.Equal(t, []byte(`{"key":"value"}`), docValue)
})
}

type testCrudDispatcher struct {
Pak *memdx.Packet
}
Expand Down
93 changes: 93 additions & 0 deletions memdx/xattrblob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package memdx

import (
"bytes"
"encoding/binary"
)

func SplitXattrBlob(value []byte) ([]byte, []byte, error) {
if len(value) < 4 {
return nil, nil, protocolError{"xattr blob too small"}
}

// The first 4 bytes are the length of the xattr blob
xattrLen := binary.BigEndian.Uint32(value[:4])
return value[4 : 4+xattrLen], value[4+xattrLen:], nil
}

func JoinXattrBlob(xattrEntries, docValue []byte) []byte {
totalLen := 4 + len(xattrEntries) + len(docValue)
buf := make([]byte, totalLen)
binary.BigEndian.PutUint32(buf[:4], uint32(len(xattrEntries)))
copy(buf[4:], xattrEntries)
copy(buf[4+len(xattrEntries):], docValue)
return buf
}

func DecodeXattrBlobEntry(buf []byte) (string, string, int, error) {
if len(buf) < 4 {
return "", "", 0, protocolError{"xattr entry too small"}
}

// The first 4 bytes are the length of the xattr blob
xattrLen := binary.BigEndian.Uint32(buf[:4])
if len(buf) < int(xattrLen)+4 {
return "", "", 0, protocolError{"xattr blob too small"}
}

entryBytes := buf[4 : 4+xattrLen]

nullDelim1Idx := bytes.IndexByte(entryBytes, 0)
if nullDelim1Idx == -1 {
return "", "", 0, protocolError{"xattr entry missing first null delimiter"}
}

entryName := string(entryBytes[:nullDelim1Idx])
entryBytes = entryBytes[nullDelim1Idx+1:]

nullDelim2Idx := bytes.IndexByte(entryBytes, 0)
if nullDelim2Idx == -1 {
return "", "", 0, protocolError{"xattr entry missing second null delimiter"}
}

if nullDelim2Idx != len(entryBytes)-1 {
return "", "", 0, protocolError{"xattr entry has extra data after second null delimiter"}
}

entryValue := string(entryBytes[:nullDelim2Idx])

return entryName, entryValue, int(4 + xattrLen), nil
}

func AppendXattrBlobEntry(buf []byte, name, value string) []byte {
nameBytes := []byte(name)
valueBytes := []byte(value)

entryLen := len(nameBytes) + 1 + len(valueBytes) + 1

entryLenBytes := make([]byte, 4)
binary.BigEndian.PutUint32(entryLenBytes, uint32(entryLen))

buf = append(buf, entryLenBytes...)
buf = append(buf, nameBytes...)
buf = append(buf, 0)
buf = append(buf, valueBytes...)
buf = append(buf, 0)

return buf
}

func IterXattrBlobEntries(buf []byte, cb func(string, string)) error {
for len(buf) > 0 {
name, value, n, err := DecodeXattrBlobEntry(buf)
if err != nil {
return err
}

cb(name, value)

buf = buf[n:]
}

return nil
}
46 changes: 46 additions & 0 deletions memdx/xattrblob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package memdx

import (
"testing"

"github.com/stretchr/testify/require"
)

/*
DOC: {"key":"value"}
_bar: {"y":"z"}
_foo: {"x":"y"}
*/
var TEST_VALUE []byte = []byte{
0, 0, 0, 38, 0, 0, 0, 15, 95, 98, 97, 114, 0, 123, 34, 121,
34, 58, 34, 122, 34, 125, 0, 0, 0, 0, 15, 95, 102, 111, 111, 0,
123, 34, 120, 34, 58, 34, 121, 34, 125, 0, 123, 34, 107, 101, 121, 34,
58, 34, 118, 97, 108, 117, 101, 34, 125}

func TestXattrBlob(t *testing.T) {
xattrBlob, docValue, err := SplitXattrBlob(TEST_VALUE)
require.NoError(t, err)
require.Equal(t, []byte(`{"key":"value"}`), docValue)

remainingBytes := xattrBlob
firstName, firstValue, n, err := DecodeXattrBlobEntry(remainingBytes)
require.NoError(t, err)
require.Equal(t, "_bar", firstName)
require.Equal(t, `{"y":"z"}`, firstValue)

remainingBytes = remainingBytes[n:]
secondName, secondValue, n, err := DecodeXattrBlobEntry(remainingBytes)
require.NoError(t, err)
require.Equal(t, "_foo", secondName)
require.Equal(t, `{"x":"y"}`, secondValue)

remainingBytes = remainingBytes[n:]
require.Empty(t, remainingBytes)

newBlob := make([]byte, 0)
newBlob = AppendXattrBlobEntry(newBlob, "_bar", `{"y":"z"}`)
newBlob = AppendXattrBlobEntry(newBlob, "_foo", `{"x":"y"}`)

newBytes := JoinXattrBlob(newBlob, docValue)
require.Equal(t, TEST_VALUE, newBytes)
}
Loading