Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
## Required tools
- moq `go install github.com/matryer/moq@latest`
- moq `go install github.com/matryer/moq@latest`
16 changes: 11 additions & 5 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ type Agent struct {
lock sync.Mutex
state agentState

cfgWatcher ConfigWatcher
connMgr KvClientManager
collections CollectionResolver
retries RetryManager
vbRouter VbucketRouter
cfgWatcher ConfigWatcher
connMgr KvClientManager
collections CollectionResolver
retries RetryManager
vbRouter VbucketRouter
vbuuidConsistency VbucketUuidConsistency

httpCfgWatcher *ConfigWatcherHttp
memdCfgWatcher *ConfigWatcherMemd
Expand Down Expand Up @@ -197,6 +198,10 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
})
agent.vbRouter.UpdateRoutingInfo(agentComponentConfigs.VbucketRoutingInfo)

agent.vbuuidConsistency = newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{
logger: logger.Named("vbuuid-consistency"),
})

if opts.BucketName == "" {
configWatcher, err := NewConfigWatcherHttp(
&agentComponentConfigs.ConfigWatcherHttpConfig,
Expand Down Expand Up @@ -264,6 +269,7 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
connManager: agent.connMgr,
nmvHandler: &agentNmvHandler{agent},
vbs: agent.vbRouter,
vbc: agent.vbuuidConsistency,
compression: &CompressionManagerDefault{
disableCompression: !useCompression,
compressionMinSize: compressionMinSize,
Expand Down
44 changes: 44 additions & 0 deletions agent_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,50 @@ func TestAgentConnectAfterCreateBucket(t *testing.T) {
}, 30*time.Second, 100*time.Millisecond)
}

func TestAgentSetWithMetaVBUUID(t *testing.T) {
testutilsint.SkipIfShortTest(t)

agent := CreateDefaultAgent(t)

key := uuid.NewString()[:6]

upsertRes, err := agent.Upsert(context.Background(), &gocbcorex.UpsertOptions{
Key: []byte(key),
ScopeName: "",
CollectionName: "",
Value: []byte(`{"foo": "bar"}`),
})
require.NoError(t, err)

_, err = agent.SetWithMeta(context.Background(), &gocbcorex.SetWithMetaOptions{
Key: []byte(key),
ScopeName: "",
CollectionName: "",
Value: []byte(`{"foo": "bar"}`),
VBUUID: 12345,
})
require.ErrorIs(t, err, gocbcorex.ErrVbucketUUIDMismatch)

var uuidMismatchErr gocbcorex.VbucketUUIDMisMatchError
require.ErrorAs(t, err, &uuidMismatchErr)

actualUUID := uuidMismatchErr.ActualVbUUID

_, err = agent.SetWithMeta(context.Background(), &gocbcorex.SetWithMetaOptions{
Key: []byte(key),
ScopeName: "",
CollectionName: "",
Value: []byte(`{"foo": "bar"}`),
VBUUID: actualUUID,
StoreCas: upsertRes.Cas,
Options: memdx.MetaOpFlagSkipConflictResolution,
})
require.NoError(t, err)

err = agent.Close()
require.NoError(t, err)
}

func BenchmarkBasicGet(b *testing.B) {
opts := gocbcorex.AgentOptions{
TLSConfig: nil,
Expand Down
4 changes: 4 additions & 0 deletions agent_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (agent *Agent) EnsureBucket(ctx context.Context, opts *EnsureBucketOptions)
return agent.mgmt.EnsureBucket(ctx, opts)
}

func (agent *Agent) XdcrC2c(ctx context.Context, opts *cbmgmtx.XdcrC2cOptions) error {
return agent.mgmt.XdcrC2c(ctx, opts)
}

func (agent *Agent) Search(ctx context.Context, opts *cbsearchx.QueryOptions) (cbsearchx.QueryResultStream, error) {
return agent.search.Query(ctx, opts)
}
Expand Down
27 changes: 27 additions & 0 deletions cbmgmtx/mgmt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cbmgmtx

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -1316,3 +1317,29 @@ func (h Management) DeleteUser(
_ = resp.Body.Close()
return nil
}

type XdcrC2cOptions struct {
Payload []byte
OnBehalfOf *cbhttpx.OnBehalfOfInfo
}

func (h Management) XdcrC2c(
ctx context.Context,
opts *XdcrC2cOptions,
) error {
resp, err := h.Execute(
ctx,
"POST",
"/xdcr/c2cCommunications",
"", opts.OnBehalfOf, bytes.NewReader(opts.Payload))
if err != nil {
return err
}

if resp.StatusCode != 200 {
return h.DecodeCommonError(resp)
}

_ = resp.Body.Close()
return nil
}
15 changes: 15 additions & 0 deletions cbmgmtx/mgmt_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,18 @@ func TestHttpMgmtUsers(t *testing.T) {
})
require.ErrorIs(t, err, cbmgmtx.ErrUserNotFound)
}

func TestHttpMgmtXdcrC2c(t *testing.T) {
testutilsint.SkipIfShortTest(t)

ctx := context.Background()

err := getHttpMgmt().XdcrC2c(ctx, &cbmgmtx.XdcrC2cOptions{
Payload: []byte(`{"Magic":1,"ReqType":7}`),
})

// we expect an error like the following if this request made it to goxdcr:
// {"_":"SourceHeartbeatReq deSerialize err: snappy: corrupt input"}
require.Error(t, err)
assert.Contains(t, err.Error(), "SourceHeartbeatReq")
}
42 changes: 37 additions & 5 deletions collectionresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,48 @@ func OrchestrateMemdCollectionID[RespT any](
ctx context.Context,
cr CollectionResolver,
scopeName, collectionName string,
fn func(collectionID uint32, manifestID uint64) (RespT, error),
collectionID uint32,
fn func(collectionID uint32) (RespT, error),
) (RespT, error) {
collectionID, manifestRev, err := cr.ResolveCollectionID(ctx, scopeName, collectionName)
if collectionID > 0 && collectionName == "" && scopeName == "" {
// If there's an unknown collection ID error then we'll just propagate it.
return fn(collectionID)
}

resolvedCid, manifestRev, err := cr.ResolveCollectionID(ctx, scopeName, collectionName)
if err != nil {
var emptyResp RespT
return emptyResp, err
}

if collectionID > 0 && resolvedCid != collectionID {
cr.InvalidateCollectionID(
ctx,
scopeName, collectionName,
"", 0)

newCollectionID, newManifestRev, newResolveErr :=
cr.ResolveCollectionID(ctx, scopeName, collectionName)
if newResolveErr != nil {
var emptyResp RespT
return emptyResp, newResolveErr
}

if newCollectionID != collectionID {
// If we still don't match after resolution, then we can confidently say that we have the latest
// so the callee must have an out of date collection ID.
var emptyResp RespT
return emptyResp, &CollectionIDMismatchError{
Cause: err,
CollectionID: collectionID,
ServerCollectionID: newCollectionID,
ManifestUid: newManifestRev,
}
}
}

for {
res, err := fn(collectionID, manifestRev)
res, err := fn(resolvedCid)
if err != nil {
if errors.Is(err, memdx.ErrUnknownCollectionID) {
invalidatingEndpoint := ""
Expand Down Expand Up @@ -59,7 +91,7 @@ func OrchestrateMemdCollectionID[RespT any](
return emptyResp, newResolveErr
}

if newCollectionID == collectionID {
if newCollectionID == resolvedCid {
// if resolution yielded the same response, this means that our ability
// to fetch an updated collection id is compromised, or the server is in
// an older state. In both instances, we no longer have a deterministic
Expand All @@ -73,7 +105,7 @@ func OrchestrateMemdCollectionID[RespT any](
}
}

collectionID = newCollectionID
resolvedCid = newCollectionID
manifestRev = newManifestRev
continue
}
Expand Down
Loading
Loading