diff --git a/README.md b/README.md index 831d95f..bd7b290 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ ## Required tools -- moq `go install github.com/matryer/moq@latest` \ No newline at end of file +- moq `go install github.com/matryer/moq@latest` diff --git a/agent.go b/agent.go index aaaf7e4..1617c8a 100644 --- a/agent.go +++ b/agent.go @@ -36,11 +36,12 @@ type Agent struct { lock sync.Mutex state agentState - cfgWatcher ConfigWatcher - connMgr KvClientManager - collections CollectionResolver - retries RetryManager - vbRouter VbucketRouter + cfgWatcher ConfigWatcher + connMgr KvClientManager + collections CollectionResolver + retries RetryManager + vbRouter VbucketRouter + vbuuidConsistency VbucketUuidConsistency httpCfgWatcher *ConfigWatcherHttp memdCfgWatcher *ConfigWatcherMemd @@ -197,6 +198,10 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) { }) agent.vbRouter.UpdateRoutingInfo(agentComponentConfigs.VbucketRoutingInfo) + agent.vbuuidConsistency = newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger.Named("vbuuid-consistency"), + }) + if opts.BucketName == "" { configWatcher, err := NewConfigWatcherHttp( &agentComponentConfigs.ConfigWatcherHttpConfig, @@ -264,6 +269,7 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) { connManager: agent.connMgr, nmvHandler: &agentNmvHandler{agent}, vbs: agent.vbRouter, + vbc: agent.vbuuidConsistency, compression: &CompressionManagerDefault{ disableCompression: !useCompression, compressionMinSize: compressionMinSize, diff --git a/agent_int_test.go b/agent_int_test.go index c3f108e..4745213 100644 --- a/agent_int_test.go +++ b/agent_int_test.go @@ -554,6 +554,50 @@ func TestAgentConnectAfterCreateBucket(t *testing.T) { }, 30*time.Second, 100*time.Millisecond) } +func TestAgentSetWithMetaVBUUID(t *testing.T) { + testutilsint.SkipIfShortTest(t) + + agent := CreateDefaultAgent(t) + + key := uuid.NewString()[:6] + + upsertRes, err := agent.Upsert(context.Background(), &gocbcorex.UpsertOptions{ + Key: []byte(key), + ScopeName: "", + CollectionName: "", + Value: []byte(`{"foo": "bar"}`), + }) + require.NoError(t, err) + + _, err = agent.SetWithMeta(context.Background(), &gocbcorex.SetWithMetaOptions{ + Key: []byte(key), + ScopeName: "", + CollectionName: "", + Value: []byte(`{"foo": "bar"}`), + VBUUID: 12345, + }) + require.ErrorIs(t, err, gocbcorex.ErrVbucketUUIDMismatch) + + var uuidMismatchErr gocbcorex.VbucketUUIDMisMatchError + require.ErrorAs(t, err, &uuidMismatchErr) + + actualUUID := uuidMismatchErr.ActualVbUUID + + _, err = agent.SetWithMeta(context.Background(), &gocbcorex.SetWithMetaOptions{ + Key: []byte(key), + ScopeName: "", + CollectionName: "", + Value: []byte(`{"foo": "bar"}`), + VBUUID: actualUUID, + StoreCas: upsertRes.Cas, + Options: memdx.MetaOpFlagSkipConflictResolution, + }) + require.NoError(t, err) + + err = agent.Close() + require.NoError(t, err) +} + func BenchmarkBasicGet(b *testing.B) { opts := gocbcorex.AgentOptions{ TLSConfig: nil, diff --git a/agent_ops.go b/agent_ops.go index ada2a1c..c620fb7 100644 --- a/agent_ops.go +++ b/agent_ops.go @@ -210,6 +210,10 @@ func (agent *Agent) EnsureBucket(ctx context.Context, opts *EnsureBucketOptions) return agent.mgmt.EnsureBucket(ctx, opts) } +func (agent *Agent) XdcrC2c(ctx context.Context, opts *cbmgmtx.XdcrC2cOptions) error { + return agent.mgmt.XdcrC2c(ctx, opts) +} + func (agent *Agent) Search(ctx context.Context, opts *cbsearchx.QueryOptions) (cbsearchx.QueryResultStream, error) { return agent.search.Query(ctx, opts) } diff --git a/cbmgmtx/mgmt.go b/cbmgmtx/mgmt.go index 76cc6e6..bf73e87 100644 --- a/cbmgmtx/mgmt.go +++ b/cbmgmtx/mgmt.go @@ -1,6 +1,7 @@ package cbmgmtx import ( + "bytes" "context" "encoding/json" "errors" @@ -1316,3 +1317,29 @@ func (h Management) DeleteUser( _ = resp.Body.Close() return nil } + +type XdcrC2cOptions struct { + Payload []byte + OnBehalfOf *cbhttpx.OnBehalfOfInfo +} + +func (h Management) XdcrC2c( + ctx context.Context, + opts *XdcrC2cOptions, +) error { + resp, err := h.Execute( + ctx, + "POST", + "/xdcr/c2cCommunications", + "", opts.OnBehalfOf, bytes.NewReader(opts.Payload)) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + return h.DecodeCommonError(resp) + } + + _ = resp.Body.Close() + return nil +} diff --git a/cbmgmtx/mgmt_int_test.go b/cbmgmtx/mgmt_int_test.go index d070185..7e11585 100644 --- a/cbmgmtx/mgmt_int_test.go +++ b/cbmgmtx/mgmt_int_test.go @@ -306,3 +306,18 @@ func TestHttpMgmtUsers(t *testing.T) { }) require.ErrorIs(t, err, cbmgmtx.ErrUserNotFound) } + +func TestHttpMgmtXdcrC2c(t *testing.T) { + testutilsint.SkipIfShortTest(t) + + ctx := context.Background() + + err := getHttpMgmt().XdcrC2c(ctx, &cbmgmtx.XdcrC2cOptions{ + Payload: []byte(`{"Magic":1,"ReqType":7}`), + }) + + // we expect an error like the following if this request made it to goxdcr: + // {"_":"SourceHeartbeatReq deSerialize err: snappy: corrupt input"} + require.Error(t, err) + assert.Contains(t, err.Error(), "SourceHeartbeatReq") +} diff --git a/collectionresolver.go b/collectionresolver.go index 802a498..ff3864a 100644 --- a/collectionresolver.go +++ b/collectionresolver.go @@ -16,16 +16,48 @@ func OrchestrateMemdCollectionID[RespT any]( ctx context.Context, cr CollectionResolver, scopeName, collectionName string, - fn func(collectionID uint32, manifestID uint64) (RespT, error), + collectionID uint32, + fn func(collectionID uint32) (RespT, error), ) (RespT, error) { - collectionID, manifestRev, err := cr.ResolveCollectionID(ctx, scopeName, collectionName) + if collectionID > 0 && collectionName == "" && scopeName == "" { + // If there's an unknown collection ID error then we'll just propagate it. + return fn(collectionID) + } + + resolvedCid, manifestRev, err := cr.ResolveCollectionID(ctx, scopeName, collectionName) if err != nil { var emptyResp RespT return emptyResp, err } + if collectionID > 0 && resolvedCid != collectionID { + cr.InvalidateCollectionID( + ctx, + scopeName, collectionName, + "", 0) + + newCollectionID, newManifestRev, newResolveErr := + cr.ResolveCollectionID(ctx, scopeName, collectionName) + if newResolveErr != nil { + var emptyResp RespT + return emptyResp, newResolveErr + } + + if newCollectionID != collectionID { + // If we still don't match after resolution, then we can confidently say that we have the latest + // so the callee must have an out of date collection ID. + var emptyResp RespT + return emptyResp, &CollectionIDMismatchError{ + Cause: err, + CollectionID: collectionID, + ServerCollectionID: newCollectionID, + ManifestUid: newManifestRev, + } + } + } + for { - res, err := fn(collectionID, manifestRev) + res, err := fn(resolvedCid) if err != nil { if errors.Is(err, memdx.ErrUnknownCollectionID) { invalidatingEndpoint := "" @@ -59,7 +91,7 @@ func OrchestrateMemdCollectionID[RespT any]( return emptyResp, newResolveErr } - if newCollectionID == collectionID { + if newCollectionID == resolvedCid { // if resolution yielded the same response, this means that our ability // to fetch an updated collection id is compromised, or the server is in // an older state. In both instances, we no longer have a deterministic @@ -73,7 +105,7 @@ func OrchestrateMemdCollectionID[RespT any]( } } - collectionID = newCollectionID + resolvedCid = newCollectionID manifestRev = newManifestRev continue } diff --git a/collectionresolver_test.go b/collectionresolver_test.go index 7e90972..3a1c3f2 100644 --- a/collectionresolver_test.go +++ b/collectionresolver_test.go @@ -27,11 +27,10 @@ func TestOrchestrateMemdCollectionID(t *testing.T) { var called int ctx := context.Background() - res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, func(collectionID uint32, manifestID uint64) (int, error) { + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 0, func(collectionID uint32) (int, error) { called++ assert.Equal(t, cid, collectionID) - assert.Equal(t, rev, manifestID) return 1, nil }) @@ -58,11 +57,10 @@ func TestOrchestrateMemdCollectionIDReturnError(t *testing.T) { var called int expectedErr := errors.New("imanerror") ctx := context.Background() - res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, func(collectionID uint32, manifestID uint64) (int, error) { + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 0, func(collectionID uint32) (int, error) { called++ assert.Equal(t, cid, collectionID) - assert.Equal(t, rev, manifestID) return 0, expectedErr }) @@ -88,7 +86,7 @@ func TestOrchestrateMemdCollectionIDResolverReturnError(t *testing.T) { var called int ctx := context.Background() - res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, func(collectionID uint32, manifestID uint64) (int, error) { + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 0, func(collectionID uint32) (int, error) { called++ return 0, errors.New("shouldnt have reached here") @@ -122,11 +120,10 @@ func TestOrchestrateMemdCollectionIDCollectionNotFoundError(t *testing.T) { var called int ctx := context.Background() - res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, func(collectionID uint32, manifestID uint64) (int, error) { + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 0, func(collectionID uint32) (int, error) { called++ assert.Equal(t, cid, collectionID) - assert.Equal(t, rev, manifestID) return 0, memdx.ErrUnknownCollectionID }) @@ -171,11 +168,10 @@ func TestOrchestrateMemdCollectionIDCollectionNotFoundErrorServerHasOlderManifes var called int ctx := context.Background() - res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, func(collectionID uint32, manifestID uint64) (int, error) { + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 0, func(collectionID uint32) (int, error) { called++ assert.Equal(t, cid, collectionID) - assert.Equal(t, rev, manifestID) return 0, &memdx.ServerErrorWithContext{ Cause: memdx.ServerError{ @@ -196,3 +192,133 @@ func TestOrchestrateMemdCollectionIDCollectionNotFoundErrorServerHasOlderManifes assert.Equal(t, 0, numInvalidateCalls) assert.Zero(t, res) } + +func TestOrchestrateMemdCollectionIDIDSpecified(t *testing.T) { + cid := uint32(5) + mock := &CollectionResolverMock{ + ResolveCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string) (uint32, uint64, error) { + t.Fatalf("Resolve should not have been called") + return 0, 0, nil + }, + } + + var called int + ctx := context.Background() + res, err := OrchestrateMemdCollectionID(ctx, mock, "", "", cid, func(collectionID uint32) (int, error) { + called++ + + assert.Equal(t, cid, collectionID) + + return 1, nil + }) + require.NoError(t, err) + + assert.Equal(t, 1, called) + assert.Equal(t, 1, res) +} + +func TestOrchestrateMemdCollectionIDIDANdName(t *testing.T) { + cid := uint32(5) + rev := uint64(2) + expectedScopeName := "testScope" + expectedCollectionName := "testCol" + mock := &CollectionResolverMock{ + ResolveCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string) (uint32, uint64, error) { + assert.Equal(t, expectedScopeName, scopeName) + assert.Equal(t, expectedCollectionName, collectionName) + + return cid, rev, nil + }, + } + + var called int + ctx := context.Background() + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, cid, func(collectionID uint32) (int, error) { + called++ + + assert.Equal(t, cid, collectionID) + + return 1, nil + }) + require.NoError(t, err) + + assert.Equal(t, 1, called) + assert.Equal(t, 1, res) +} + +func TestOrchestrateMemdCollectionIDIDAndNameIDMismatch(t *testing.T) { + cid := uint32(5) + rev := uint64(2) + var numInvalidateCalls int + expectedScopeName := "testScope" + expectedCollectionName := "testCol" + mock := &CollectionResolverMock{ + ResolveCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string) (uint32, uint64, error) { + assert.Equal(t, expectedScopeName, scopeName) + assert.Equal(t, expectedCollectionName, collectionName) + + return cid, rev, nil + }, + InvalidateCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string, endpoint string, manifestRev uint64) { + assert.Equal(t, expectedScopeName, scopeName) + assert.Equal(t, expectedCollectionName, collectionName) + + numInvalidateCalls++ + }, + } + + var called int + ctx := context.Background() + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, 7, func(collectionID uint32) (int, error) { + called++ + return 0, nil + }) + require.ErrorIs(t, err, ErrCollectionIDMismatch) + + assert.Zero(t, called) + assert.Equal(t, 1, numInvalidateCalls) + assert.Zero(t, res) +} + +func TestOrchestrateMemdCollectionIDIDAndNameIDMismatchReresolve(t *testing.T) { + firstResolvedCid := uint32(5) + secondResolvedCid := uint32(7) + rev := uint64(2) + var numInvalidateCalls int + var numResolvedCalls int + expectedScopeName := "testScope" + expectedCollectionName := "testCol" + mock := &CollectionResolverMock{ + ResolveCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string) (uint32, uint64, error) { + assert.Equal(t, expectedScopeName, scopeName) + assert.Equal(t, expectedCollectionName, collectionName) + + numResolvedCalls++ + + if numResolvedCalls == 1 { + return firstResolvedCid, rev, nil + } + + return secondResolvedCid, rev, nil + }, + InvalidateCollectionIDFunc: func(ctx context.Context, scopeName string, collectionName string, endpoint string, manifestRev uint64) { + assert.Equal(t, expectedScopeName, scopeName) + assert.Equal(t, expectedCollectionName, collectionName) + + numInvalidateCalls++ + }, + } + + var called int + ctx := context.Background() + res, err := OrchestrateMemdCollectionID(ctx, mock, expectedScopeName, expectedCollectionName, secondResolvedCid, func(collectionID uint32) (int, error) { + called++ + return 1, nil + }) + require.NoError(t, err) + + assert.Equal(t, 1, called) + assert.Equal(t, 1, res) + assert.Equal(t, 1, numInvalidateCalls) + assert.Equal(t, 2, numResolvedCalls) +} diff --git a/crud.go b/crud.go index 3ddc1fe..001ae15 100644 --- a/crud.go +++ b/crud.go @@ -24,6 +24,7 @@ type CrudComponent struct { connManager KvClientManager compression CompressionManager vbs VbucketRouter + vbc VbucketUuidConsistency } func OrchestrateSimpleCrud[RespT any]( @@ -34,18 +35,50 @@ func OrchestrateSimpleCrud[RespT any]( ch NotMyVbucketConfigHandler, nkcp KvClientManager, scopeName, collectionName string, + collectionID uint32, key []byte, - fn func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (RespT, error), + fn func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (RespT, error), ) (RespT, error) { return OrchestrateRetries( ctx, rs, func() (RespT, error) { return OrchestrateMemdCollectionID( - ctx, cr, scopeName, collectionName, - func(collectionID uint32, manifestID uint64) (RespT, error) { + ctx, cr, scopeName, collectionName, collectionID, + func(collectionID uint32) (RespT, error) { return OrchestrateMemdRouting(ctx, vb, ch, key, 0, func(endpoint string, vbID uint16) (RespT, error) { return OrchestrateMemdClient(ctx, nkcp, endpoint, func(client KvClient) (RespT, error) { - return fn(collectionID, manifestID, endpoint, vbID, client) + return fn(collectionID, endpoint, vbID, client) + }) + }) + }) + }) +} + +func OrchestrateSimpleCrudMeta[RespT any]( + ctx context.Context, + rs RetryManager, + cr CollectionResolver, + vb VbucketRouter, + ch NotMyVbucketConfigHandler, + nkcp KvClientManager, + vbc VbucketUuidConsistency, + scopeName, collectionName string, + collectionID uint32, + key []byte, + vbuuid uint64, + fn func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (RespT, error), +) (RespT, error) { + return OrchestrateRetries( + ctx, rs, + func() (RespT, error) { + return OrchestrateMemdCollectionID( + ctx, cr, scopeName, collectionName, collectionID, + func(collectionID uint32) (RespT, error) { + return OrchestrateMemdRouting(ctx, vb, ch, key, 0, func(endpoint string, vbID uint16) (RespT, error) { + return OrchestrateMemdClient(ctx, nkcp, endpoint, func(client KvClient) (RespT, error) { + return OrchestrateVBucketConsistency(ctx, vbc, client, vbID, vbuuid, func(client KvClient) (RespT, error) { + return fn(collectionID, endpoint, vbID, client) + }) }) }) }) @@ -56,6 +89,7 @@ type GetOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 OnBehalfOf string } @@ -72,8 +106,8 @@ func (cc *CrudComponent) Get(ctx context.Context, opts *GetOptions) (*GetResult, return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetResult, error) { resp, err := client.Get(ctx, &memdx.GetRequest{ CollectionID: collectionID, Key: opts.Key, @@ -104,6 +138,7 @@ type GetExOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 OnBehalfOf string } @@ -120,8 +155,8 @@ func (cc *CrudComponent) GetEx(ctx context.Context, opts *GetExOptions) (*GetExR return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetExResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetExResult, error) { resp, err := client.GetEx(ctx, &memdx.GetExRequest{ CollectionID: collectionID, Key: opts.Key, @@ -152,6 +187,7 @@ type GetReplicaOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 ReplicaIdx uint32 OnBehalfOf string } @@ -198,8 +234,8 @@ func (cc *CrudComponent) GetReplica(ctx context.Context, opts *GetReplicaOptions ctx, cc.retries, func() (*GetReplicaResult, error) { return OrchestrateMemdCollectionID( - ctx, cc.collections, opts.ScopeName, opts.CollectionName, - func(collectionID uint32, manifestID uint64) (*GetReplicaResult, error) { + ctx, cc.collections, opts.ScopeName, opts.CollectionName, opts.CollectionID, + func(collectionID uint32) (*GetReplicaResult, error) { return OrchestrateMemdRouting(ctx, cc.vbs, cc.nmvHandler, opts.Key, vbServerIdx, func(endpoint string, vbID uint16) (*GetReplicaResult, error) { return OrchestrateMemdClient(ctx, cc.connManager, endpoint, func(client KvClient) (*GetReplicaResult, error) { return fn(collectionID, vbID, client) @@ -214,6 +250,7 @@ type GetAllReplicasOptions struct { BucketName string ScopeName string CollectionName string + CollectionID uint32 OnBehalfOf string } @@ -324,8 +361,8 @@ func (cc *CrudComponent) GetAllReplicas(ctx context.Context, opts *GetAllReplica // This retry orchestrator handles request level retryable errors, errors which impact every replica request, // e.g the collection ID not yet being consistent _, err = OrchestrateRetries(ctx, cc.retries, func() (any, error) { - return OrchestrateMemdCollectionID(ctx, cc.collections, opts.ScopeName, opts.CollectionName, - func(collectionID uint32, manifestID uint64) (any, error) { + return OrchestrateMemdCollectionID(ctx, cc.collections, opts.ScopeName, opts.CollectionName, opts.CollectionID, + func(collectionID uint32) (any, error) { // We are past the point of no return. From here on the request cannot error, e.g a nil error will always be // returned from GetAllReplicas, however individual replica reads can push an error to the channel. @@ -435,6 +472,7 @@ type UpsertOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Flags uint32 Datatype memdx.DatatypeFlag @@ -456,8 +494,8 @@ func (cc *CrudComponent) Upsert(ctx context.Context, opts *UpsertOptions) (*Upse return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*UpsertResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*UpsertResult, error) { value, datatype, err := cc.compression.Compress(client.HasFeature(memdx.HelloFeatureSnappy), opts.Datatype, opts.Value) if err != nil { return nil, err @@ -497,6 +535,7 @@ type DeleteOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Cas uint64 DurabilityLevel memdx.DurabilityLevel OnBehalfOf string @@ -513,8 +552,8 @@ func (cc *CrudComponent) Delete(ctx context.Context, opts *DeleteOptions) (*Dele return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*DeleteResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*DeleteResult, error) { resp, err := client.Delete(ctx, &memdx.DeleteRequest{ CollectionID: collectionID, Key: opts.Key, @@ -544,6 +583,7 @@ type GetAndTouchOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Expiry uint32 OnBehalfOf string } @@ -561,8 +601,8 @@ func (cc *CrudComponent) GetAndTouch(ctx context.Context, opts *GetAndTouchOptio return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetAndTouchResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetAndTouchResult, error) { resp, err := client.GetAndTouch(ctx, &memdx.GetAndTouchRequest{ CollectionID: collectionID, Key: opts.Key, @@ -593,6 +633,7 @@ func (cc *CrudComponent) GetAndTouch(ctx context.Context, opts *GetAndTouchOptio type GetRandomOptions struct { ScopeName string CollectionName string + CollectionID uint32 OnBehalfOf string } @@ -610,8 +651,8 @@ func (cc *CrudComponent) GetRandom(ctx context.Context, opts *GetRandomOptions) return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, nil, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetRandomResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, nil, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetRandomResult, error) { resp, err := client.GetRandom(ctx, &memdx.GetRandomRequest{ CollectionID: collectionID, CrudRequestMeta: memdx.CrudRequestMeta{ @@ -641,6 +682,7 @@ type UnlockOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Cas uint64 OnBehalfOf string } @@ -655,8 +697,8 @@ func (cc *CrudComponent) Unlock(ctx context.Context, opts *UnlockOptions) (*Unlo return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*UnlockResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*UnlockResult, error) { resp, err := client.Unlock(ctx, &memdx.UnlockRequest{ CollectionID: collectionID, Key: opts.Key, @@ -684,6 +726,7 @@ type TouchOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Expiry uint32 OnBehalfOf string } @@ -698,8 +741,8 @@ func (cc *CrudComponent) Touch(ctx context.Context, opts *TouchOptions) (*TouchR return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*TouchResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*TouchResult, error) { resp, err := client.Touch(ctx, &memdx.TouchRequest{ CollectionID: collectionID, Key: opts.Key, @@ -739,8 +782,8 @@ func (cc *CrudComponent) GetAndLock(ctx context.Context, opts *GetAndLockOptions ctx, span := tracer.Start(ctx, "GetAndLock") defer span.End() - return OrchestrateSimpleCrud(ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetAndLockResult, error) { + return OrchestrateSimpleCrud(ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetAndLockResult, error) { resp, err := client.GetAndLock(ctx, &memdx.GetAndLockRequest{ CollectionID: collectionID, LockTime: opts.LockTime, @@ -772,6 +815,7 @@ type AddOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Flags uint32 Value []byte Datatype memdx.DatatypeFlag @@ -791,8 +835,8 @@ func (cc *CrudComponent) Add(ctx context.Context, opts *AddOptions) (*AddResult, return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*AddResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*AddResult, error) { value, datatype, err := cc.compression.Compress(client.HasFeature(memdx.HelloFeatureSnappy), opts.Datatype, opts.Value) if err != nil { return nil, err @@ -830,6 +874,7 @@ type ReplaceOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Flags uint32 Value []byte Datatype memdx.DatatypeFlag @@ -851,8 +896,8 @@ func (cc *CrudComponent) Replace(ctx context.Context, opts *ReplaceOptions) (*Re return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*ReplaceResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*ReplaceResult, error) { value, datatype, err := cc.compression.Compress(client.HasFeature(memdx.HelloFeatureSnappy), opts.Datatype, opts.Value) if err != nil { return nil, err @@ -892,6 +937,7 @@ type AppendOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Cas uint64 DurabilityLevel memdx.DurabilityLevel @@ -909,8 +955,8 @@ func (cc *CrudComponent) Append(ctx context.Context, opts *AppendOptions) (*Appe return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*AppendResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*AppendResult, error) { value, datatype, err := cc.compression.Compress(client.HasFeature(memdx.HelloFeatureSnappy), 0, opts.Value) if err != nil { return nil, err @@ -947,6 +993,7 @@ type PrependOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Cas uint64 DurabilityLevel memdx.DurabilityLevel @@ -964,8 +1011,8 @@ func (cc *CrudComponent) Prepend(ctx context.Context, opts *PrependOptions) (*Pr return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*PrependResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*PrependResult, error) { value, datatype, err := cc.compression.Compress(client.HasFeature(memdx.HelloFeatureSnappy), 0, opts.Value) if err != nil { return nil, err @@ -1002,6 +1049,7 @@ type IncrementOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Initial uint64 Delta uint64 @@ -1022,8 +1070,8 @@ func (cc *CrudComponent) Increment(ctx context.Context, opts *IncrementOptions) return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*IncrementResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*IncrementResult, error) { resp, err := client.Increment(ctx, &memdx.IncrementRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1056,6 +1104,7 @@ type DecrementOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Initial uint64 Delta uint64 @@ -1076,8 +1125,8 @@ func (cc *CrudComponent) Decrement(ctx context.Context, opts *DecrementOptions) return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*DecrementResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*DecrementResult, error) { resp, err := client.Decrement(ctx, &memdx.DecrementRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1110,7 +1159,9 @@ type GetMetaOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 FetchDatatype bool + VBUUID uint64 OnBehalfOf string } @@ -1128,10 +1179,10 @@ func (cc *CrudComponent) GetMeta(ctx context.Context, opts *GetMetaOptions) (*Ge ctx, span := tracer.Start(ctx, "GetMeta") defer span.End() - return OrchestrateSimpleCrud( - ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*GetMetaResult, error) { + return OrchestrateSimpleCrudMeta( + ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, cc.vbc, + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, opts.VBUUID, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*GetMetaResult, error) { resp, err := client.GetMeta(ctx, &memdx.GetMetaRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1167,6 +1218,7 @@ type AddWithMetaOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Flags uint32 Datatype memdx.DatatypeFlag @@ -1175,6 +1227,7 @@ type AddWithMetaOptions struct { RevNo uint64 StoreCas uint64 Options memdx.MetaOpFlag + VBUUID uint64 OnBehalfOf string } @@ -1187,10 +1240,10 @@ func (cc *CrudComponent) AddWithMeta(ctx context.Context, opts *AddWithMetaOptio ctx, span := tracer.Start(ctx, "AddWithMeta") defer span.End() - return OrchestrateSimpleCrud( - ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*AddWithMetaResult, error) { + return OrchestrateSimpleCrudMeta( + ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, cc.vbc, + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, opts.VBUUID, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*AddWithMetaResult, error) { resp, err := client.AddWithMeta(ctx, &memdx.AddWithMetaRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1226,6 +1279,7 @@ type SetWithMetaOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Value []byte Flags uint32 Datatype memdx.DatatypeFlag @@ -1235,6 +1289,7 @@ type SetWithMetaOptions struct { CheckCas uint64 StoreCas uint64 Options memdx.MetaOpFlag + VBUUID uint64 OnBehalfOf string } @@ -1247,10 +1302,10 @@ func (cc *CrudComponent) SetWithMeta(ctx context.Context, opts *SetWithMetaOptio ctx, span := tracer.Start(ctx, "SetWithMeta") defer span.End() - return OrchestrateSimpleCrud( - ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*SetWithMetaResult, error) { + return OrchestrateSimpleCrudMeta( + ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, cc.vbc, + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, opts.VBUUID, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*SetWithMetaResult, error) { resp, err := client.SetWithMeta(ctx, &memdx.SetWithMetaRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1287,6 +1342,7 @@ type DeleteWithMetaOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 CheckCas uint64 Flags uint32 Expiry uint32 @@ -1294,6 +1350,7 @@ type DeleteWithMetaOptions struct { StoreCas uint64 RevNo uint64 Options memdx.MetaOpFlag + VBUUID uint64 OnBehalfOf string } @@ -1306,10 +1363,10 @@ func (cc *CrudComponent) DeleteWithMeta(ctx context.Context, opts *DeleteWithMet ctx, span := tracer.Start(ctx, "DeleteWithMeta") defer span.End() - return OrchestrateSimpleCrud( - ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*DeleteWithMetaResult, error) { + return OrchestrateSimpleCrudMeta( + ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, cc.vbc, + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, opts.VBUUID, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*DeleteWithMetaResult, error) { resp, err := client.DeleteWithMeta(ctx, &memdx.DeleteWithMetaRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1344,6 +1401,7 @@ type LookupInOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Ops []memdx.LookupInOp Flags memdx.SubdocDocFlag OnBehalfOf string @@ -1361,8 +1419,8 @@ func (cc *CrudComponent) LookupIn(ctx context.Context, opts *LookupInOptions) (* return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*LookupInResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*LookupInResult, error) { resp, err := client.LookupIn(ctx, &memdx.LookupInRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1389,6 +1447,7 @@ type MutateInOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Ops []memdx.MutateInOp Flags memdx.SubdocDocFlag Expiry uint32 @@ -1410,8 +1469,8 @@ func (cc *CrudComponent) MutateIn(ctx context.Context, opts *MutateInOptions) (* return OrchestrateSimpleCrud( ctx, cc.retries, cc.collections, cc.vbs, cc.nmvHandler, cc.connManager, - opts.ScopeName, opts.CollectionName, opts.Key, - func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*MutateInResult, error) { + opts.ScopeName, opts.CollectionName, opts.CollectionID, opts.Key, + func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*MutateInResult, error) { resp, err := client.MutateIn(ctx, &memdx.MutateInRequest{ CollectionID: collectionID, Key: opts.Key, @@ -1446,6 +1505,7 @@ type GetOrLookupOptions struct { Key []byte ScopeName string CollectionName string + CollectionID uint32 Project []string WithExpiry bool WithFlags bool diff --git a/crud_rangescan.go b/crud_rangescan.go index 6c59ced..94e9994 100644 --- a/crud_rangescan.go +++ b/crud_rangescan.go @@ -10,6 +10,7 @@ import ( type RangeScanCreateOptions struct { ScopeName string CollectionName string + CollectionID uint32 VbucketID uint16 Scan json.RawMessage @@ -33,38 +34,36 @@ func (cc *CrudComponent) RangeScanCreate(ctx context.Context, opts *RangeScanCre return OrchestrateRetries( ctx, cc.retries, func() (*RangeScanCreateResult, error) { - return OrchestrateMemdCollectionID( - ctx, cc.collections, opts.ScopeName, opts.CollectionName, - func(collectionID uint32, manifestID uint64) (*RangeScanCreateResult, error) { - endpoint, err := cc.vbs.DispatchToVbucket(opts.VbucketID, 0) + return OrchestrateMemdCollectionID(ctx, cc.collections, opts.ScopeName, opts.CollectionName, opts.CollectionID, func(collectionID uint32) (*RangeScanCreateResult, error) { + endpoint, err := cc.vbs.DispatchToVbucket(opts.VbucketID, 0) + if err != nil { + return nil, err + } + return OrchestrateMemdClient(ctx, cc.connManager, endpoint, func(client KvClient) (*RangeScanCreateResult, error) { + res, err := client.RangeScanCreate(ctx, &memdx.RangeScanCreateRequest{ + CollectionID: collectionID, + VbucketID: opts.VbucketID, + Scan: opts.Scan, + KeysOnly: opts.KeysOnly, + Range: opts.Range, + Sampling: opts.Sampling, + Snapshot: opts.Snapshot, + CrudRequestMeta: memdx.CrudRequestMeta{ + OnBehalfOf: opts.OnBehalfOf, + }, + }) if err != nil { return nil, err } - return OrchestrateMemdClient(ctx, cc.connManager, endpoint, func(client KvClient) (*RangeScanCreateResult, error) { - res, err := client.RangeScanCreate(ctx, &memdx.RangeScanCreateRequest{ - CollectionID: collectionID, - VbucketID: opts.VbucketID, - Scan: opts.Scan, - KeysOnly: opts.KeysOnly, - Range: opts.Range, - Sampling: opts.Sampling, - Snapshot: opts.Snapshot, - CrudRequestMeta: memdx.CrudRequestMeta{ - OnBehalfOf: opts.OnBehalfOf, - }, - }) - if err != nil { - return nil, err - } - - return &RangeScanCreateResult{ - ScanUUID: res.ScanUUUID, - client: client, - vbucketID: opts.VbucketID, - parent: cc, - }, nil - }) + + return &RangeScanCreateResult{ + ScanUUID: res.ScanUUUID, + client: client, + vbucketID: opts.VbucketID, + parent: cc, + }, nil }) + }) }) } diff --git a/crud_test.go b/crud_test.go index 8c464d7..a3eccd3 100644 --- a/crud_test.go +++ b/crud_test.go @@ -49,7 +49,7 @@ func TestSimpleCrudCollectionMapOutdatedRetries(t *testing.T) { } var fnCalls int - fn := func(collectionID uint32, manifestID uint64, endpoint string, vbID uint16, client KvClient) (*UpsertResult, error) { + fn := func(collectionID uint32, endpoint string, vbID uint16, client KvClient) (*UpsertResult, error) { if fnCalls == 0 { fnCalls++ return nil, &memdx.ServerErrorWithContext{ @@ -62,18 +62,7 @@ func TestSimpleCrudCollectionMapOutdatedRetries(t *testing.T) { return &UpsertResult{}, nil } - res, err := OrchestrateSimpleCrud[*UpsertResult]( - context.Background(), - rs, - cr, - vb, - nil, - nkcp, - "scope", - "collection", - []byte("somekey"), - fn, - ) + res, err := OrchestrateSimpleCrud[*UpsertResult](context.Background(), rs, cr, vb, nil, nkcp, "scope", "collection", 0, []byte("somekey"), fn) require.NoError(t, err) require.NotNil(t, res) diff --git a/errors.go b/errors.go index b22b380..7f99ed6 100644 --- a/errors.go +++ b/errors.go @@ -12,8 +12,10 @@ var ( ErrInternalServerError = errors.New("internal server error") ErrVbucketMapOutdated = errors.New("the vbucket map is out of date") ErrCollectionManifestOutdated = errors.New("the collection manifest is out of date") + ErrCollectionIDMismatch = errors.New("the provided collection id does not match") ErrServiceNotAvailable = errors.New("service is not available") ErrNoBucketSelected = errors.New("no bucket selected, please select a bucket before performing bucket operations") + ErrVbucketUUIDMismatch = errors.New("the provided vbucket uuid does not match") ) type placeholderError struct { @@ -56,6 +58,22 @@ func (e CollectionManifestOutdatedError) Unwrap() error { return ErrCollectionManifestOutdated } +type CollectionIDMismatchError struct { + Cause error + CollectionID uint32 + ServerCollectionID uint32 + ManifestUid uint64 +} + +func (e CollectionIDMismatchError) Error() string { + return fmt.Sprintf("provided collection id mismatch: provided collection id: %d, sdk collection id: %d, "+ + "manifest uid: %d", e.CollectionID, e.ServerCollectionID, e.ManifestUid) +} + +func (e CollectionIDMismatchError) Unwrap() error { + return ErrCollectionIDMismatch +} + type VbucketMapOutdatedError struct { Cause error } @@ -233,3 +251,17 @@ func (e RetryOrchestrationError) Error() string { func (e RetryOrchestrationError) Unwrap() error { return e.Cause } + +type VbucketUUIDMisMatchError struct { + RequestedVbId uint16 + RequestVbUUID uint64 + ActualVbUUID uint64 +} + +func (e VbucketUUIDMisMatchError) Error() string { + return fmt.Sprintf("invalid vbucket uuid for vbucket %d (requested: %d, actual: %d)", e.RequestedVbId, e.RequestVbUUID, e.ActualVbUUID) +} + +func (e VbucketUUIDMisMatchError) Unwrap() error { + return ErrVbucketUUIDMismatch +} diff --git a/generate-mocks.go b/generate-mocks.go index a82b3cc..882cf88 100644 --- a/generate-mocks.go +++ b/generate-mocks.go @@ -1,10 +1,10 @@ -//go:generate moq -out mock_collectionresolver_test.go . CollectionResolver -//go:generate moq -out mock_vbucketrouter_test.go . VbucketRouter -//go:generate moq -out mock_kvclient_test.go . KvClient MemdxClient -//go:generate moq -out mock_kvclientpool_test.go . KvClientPool -//go:generate moq -out mock_kvclientmanager_test.go . KvClientManager -//go:generate moq -out mock_retrymanager_test.go . RetryManager RetryController -//go:generate moq -out mock_bucketchecker_test.go . BucketChecker -//go:generate moq -out ./cbauthx/mock_authcheck_test.go ./cbauthx AuthCheck +//go:generate moq -fmt goimports -out mock_collectionresolver_test.go . CollectionResolver +//go:generate moq -fmt goimports -out mock_vbucketrouter_test.go . VbucketRouter +//go:generate moq -fmt goimports -out mock_kvclient_test.go . KvClient MemdxClient +//go:generate moq -fmt goimports -out mock_kvclientpool_test.go . KvClientPool +//go:generate moq -fmt goimports -out mock_kvclientmanager_test.go . KvClientManager +//go:generate moq -fmt goimports -out mock_retrymanager_test.go . RetryManager RetryController +//go:generate moq -fmt goimports -out mock_bucketchecker_test.go . BucketChecker +//go:generate moq -fmt goimports -out ./cbauthx/mock_authcheck_test.go ./cbauthx AuthCheck package gocbcorex diff --git a/kvclient.go b/kvclient.go index 5a34364..6a51ebd 100644 --- a/kvclient.go +++ b/kvclient.go @@ -97,6 +97,10 @@ type KvClient interface { RemoteAddr() net.Addr LocalAddr() net.Addr + // Baggage provides a way to store arbitrary state on the client. + Baggage(key any) (any, bool) + AddBaggage(key any, value any) + KvClientOps memdx.Dispatcher } @@ -105,6 +109,8 @@ type kvClient struct { logger *zap.Logger remoteHostname string + baggage sync.Map + pendingOperations uint64 cli MemdxClient telemetry *kvClientTelem @@ -377,6 +383,14 @@ func (c *kvClient) Telemetry() MemdClientTelem { return c.telemetry } +func (c *kvClient) Baggage(key any) (any, bool) { + return c.baggage.Load(key) +} + +func (c *kvClient) AddBaggage(key, value any) { + c.baggage.Store(key, value) +} + func (c *kvClient) handleUnsolicitedPacket(pak *memdx.Packet) { c.logger.Info("unexpected unsolicited packet", zap.String("opaque", strconv.Itoa(int(pak.Opaque))), diff --git a/mgmtcomponent.go b/mgmtcomponent.go index e073bbc..cc59ff0 100644 --- a/mgmtcomponent.go +++ b/mgmtcomponent.go @@ -187,6 +187,10 @@ func (w *MgmtComponent) CheckBucketExists(ctx context.Context, opts *cbmgmtx.Che return OrchestrateSimpleMgmtCall(ctx, w, cbmgmtx.Management.CheckBucketExists, opts) } +func (w *MgmtComponent) XdcrC2c(ctx context.Context, opts *cbmgmtx.XdcrC2cOptions) error { + return OrchestrateNoResMgmtCall(ctx, w, cbmgmtx.Management.XdcrC2c, opts) +} + type EnsureBucketOptions struct { BucketName string BucketUUID string diff --git a/mock_kvclient_test.go b/mock_kvclient_test.go index 2772775..3cee0d1 100644 --- a/mock_kvclient_test.go +++ b/mock_kvclient_test.go @@ -24,12 +24,18 @@ var _ KvClient = &KvClientMock{} // AddFunc: func(ctx context.Context, req *memdx.AddRequest) (*memdx.AddResponse, error) { // panic("mock out the Add method") // }, +// AddBaggageFunc: func(key any, value any) { +// panic("mock out the AddBaggage method") +// }, // AddWithMetaFunc: func(ctx context.Context, req *memdx.AddWithMetaRequest) (*memdx.AddWithMetaResponse, error) { // panic("mock out the AddWithMeta method") // }, // AppendFunc: func(ctx context.Context, req *memdx.AppendRequest) (*memdx.AppendResponse, error) { // panic("mock out the Append method") // }, +// BaggageFunc: func(key any) (any, bool) { +// panic("mock out the Baggage method") +// }, // CloseFunc: func() error { // panic("mock out the Close method") // }, @@ -139,12 +145,18 @@ type KvClientMock struct { // AddFunc mocks the Add method. AddFunc func(ctx context.Context, req *memdx.AddRequest) (*memdx.AddResponse, error) + // AddBaggageFunc mocks the AddBaggage method. + AddBaggageFunc func(key any, value any) + // AddWithMetaFunc mocks the AddWithMeta method. AddWithMetaFunc func(ctx context.Context, req *memdx.AddWithMetaRequest) (*memdx.AddWithMetaResponse, error) // AppendFunc mocks the Append method. AppendFunc func(ctx context.Context, req *memdx.AppendRequest) (*memdx.AppendResponse, error) + // BaggageFunc mocks the Baggage method. + BaggageFunc func(key any) (any, bool) + // CloseFunc mocks the Close method. CloseFunc func() error @@ -253,6 +265,13 @@ type KvClientMock struct { // Req is the req argument value. Req *memdx.AddRequest } + // AddBaggage holds details about calls to the AddBaggage method. + AddBaggage []struct { + // Key is the key argument value. + Key any + // Value is the value argument value. + Value any + } // AddWithMeta holds details about calls to the AddWithMeta method. AddWithMeta []struct { // Ctx is the ctx argument value. @@ -267,6 +286,11 @@ type KvClientMock struct { // Req is the req argument value. Req *memdx.AppendRequest } + // Baggage holds details about calls to the Baggage method. + Baggage []struct { + // Key is the key argument value. + Key any + } // Close holds details about calls to the Close method. Close []struct { } @@ -482,8 +506,10 @@ type KvClientMock struct { } } lockAdd sync.RWMutex + lockAddBaggage sync.RWMutex lockAddWithMeta sync.RWMutex lockAppend sync.RWMutex + lockBaggage sync.RWMutex lockClose sync.RWMutex lockDecrement sync.RWMutex lockDelete sync.RWMutex @@ -555,6 +581,42 @@ func (mock *KvClientMock) AddCalls() []struct { return calls } +// AddBaggage calls AddBaggageFunc. +func (mock *KvClientMock) AddBaggage(key any, value any) { + if mock.AddBaggageFunc == nil { + panic("KvClientMock.AddBaggageFunc: method is nil but KvClient.AddBaggage was just called") + } + callInfo := struct { + Key any + Value any + }{ + Key: key, + Value: value, + } + mock.lockAddBaggage.Lock() + mock.calls.AddBaggage = append(mock.calls.AddBaggage, callInfo) + mock.lockAddBaggage.Unlock() + mock.AddBaggageFunc(key, value) +} + +// AddBaggageCalls gets all the calls that were made to AddBaggage. +// Check the length with: +// +// len(mockedKvClient.AddBaggageCalls()) +func (mock *KvClientMock) AddBaggageCalls() []struct { + Key any + Value any +} { + var calls []struct { + Key any + Value any + } + mock.lockAddBaggage.RLock() + calls = mock.calls.AddBaggage + mock.lockAddBaggage.RUnlock() + return calls +} + // AddWithMeta calls AddWithMetaFunc. func (mock *KvClientMock) AddWithMeta(ctx context.Context, req *memdx.AddWithMetaRequest) (*memdx.AddWithMetaResponse, error) { if mock.AddWithMetaFunc == nil { @@ -627,6 +689,38 @@ func (mock *KvClientMock) AppendCalls() []struct { return calls } +// Baggage calls BaggageFunc. +func (mock *KvClientMock) Baggage(key any) (any, bool) { + if mock.BaggageFunc == nil { + panic("KvClientMock.BaggageFunc: method is nil but KvClient.Baggage was just called") + } + callInfo := struct { + Key any + }{ + Key: key, + } + mock.lockBaggage.Lock() + mock.calls.Baggage = append(mock.calls.Baggage, callInfo) + mock.lockBaggage.Unlock() + return mock.BaggageFunc(key) +} + +// BaggageCalls gets all the calls that were made to Baggage. +// Check the length with: +// +// len(mockedKvClient.BaggageCalls()) +func (mock *KvClientMock) BaggageCalls() []struct { + Key any +} { + var calls []struct { + Key any + } + mock.lockBaggage.RLock() + calls = mock.calls.Baggage + mock.lockBaggage.RUnlock() + return calls +} + // Close calls CloseFunc. func (mock *KvClientMock) Close() error { if mock.CloseFunc == nil { diff --git a/vbucketconsistency.go b/vbucketconsistency.go new file mode 100644 index 0000000..28d2c41 --- /dev/null +++ b/vbucketconsistency.go @@ -0,0 +1,82 @@ +package gocbcorex + +import ( + "context" + "sync" + + "go.uber.org/zap" +) + +type vbuuidBaggageKey struct{} + +type VbucketUuidConsistency interface { + VerifyVBucket(ctx context.Context, client KvClient, vbID uint16, vbUUID uint64) error +} + +type vbucketConsistencyComponentOptions struct { + logger *zap.Logger +} + +type vbucketConsistencyComponent struct { + logger *zap.Logger + + slowLock sync.Mutex +} + +func newVbucketConsistencyComponent(opts *vbucketConsistencyComponentOptions) *vbucketConsistencyComponent { + return &vbucketConsistencyComponent{ + logger: opts.logger, + } +} + +func (vb *vbucketConsistencyComponent) VerifyVBucket(ctx context.Context, client KvClient, vbID uint16, vbUUID uint64) error { + if vbuuidCache, ok := client.Baggage(&vbuuidBaggageKey{}); ok { + vc := vbuuidCache.(*vbucketUUIDCache) + serverVbUUID, err := vc.LookupVbUUID(ctx, vbID, client) + if err != nil { + return err + } + + if serverVbUUID != vbUUID { + return VbucketUUIDMisMatchError{ + RequestedVbId: vbID, + RequestVbUUID: vbUUID, + ActualVbUUID: serverVbUUID, + } + } + + return nil + } + + vb.slowLock.Lock() + if _, ok := client.Baggage(&vbuuidBaggageKey{}); ok { + vb.slowLock.Unlock() + return vb.VerifyVBucket(ctx, client, vbID, vbUUID) + } + + cache := newVBucketUUIDCache(&vbucketUUIDCacheOptions{}) + client.AddBaggage(&vbuuidBaggageKey{}, cache) + vb.slowLock.Unlock() + + return vb.VerifyVBucket(ctx, client, vbID, vbUUID) +} + +func OrchestrateVBucketConsistency[RespT any]( + ctx context.Context, + vb VbucketUuidConsistency, + client KvClient, + vbID uint16, + vbUUID uint64, + fn func(KvClient) (RespT, error), +) (RespT, error) { + if vbUUID == 0 { + return fn(client) + } + + if err := vb.VerifyVBucket(ctx, client, vbID, vbUUID); err != nil { + var emptyResp RespT + return emptyResp, err + } + + return fn(client) +} diff --git a/vbucketconsistency_test.go b/vbucketconsistency_test.go new file mode 100644 index 0000000..b992c98 --- /dev/null +++ b/vbucketconsistency_test.go @@ -0,0 +1,208 @@ +package gocbcorex + +import ( + "context" + "strconv" + "testing" + + "github.com/couchbase/gocbcorex/memdx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestVBucketConsistencyFastCacheMatch(t *testing.T) { + logger, _ := zap.NewDevelopment() + + fastCache := vbucketUUIDCacheFastCache{ + vbuckets: map[uint16]uint64{ + 1: 1234, + 2: 5678, + 3: 91011, + }, + } + cache := &vbucketUUIDCache{} + cache.fastCache.Store(&fastCache) + + kvCli := &KvClientMock{ + BaggageFunc: func(key any) (any, bool) { + return cache, true + }, + } + + vbComp := newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger, + }) + + err := vbComp.VerifyVBucket(context.Background(), kvCli, 1, 1234) + require.NoError(t, err) +} + +func TestVBucketConsistencyFastCacheMismatch(t *testing.T) { + logger, _ := zap.NewDevelopment() + + fastCache := vbucketUUIDCacheFastCache{ + vbuckets: map[uint16]uint64{ + 1: 1234, + 2: 5678, + 3: 91011, + }, + } + cache := &vbucketUUIDCache{} + cache.fastCache.Store(&fastCache) + + kvCli := &KvClientMock{ + BaggageFunc: func(key any) (any, bool) { + return cache, true + }, + } + + vbComp := newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger, + }) + + err := vbComp.VerifyVBucket(context.Background(), kvCli, 1, 5678) + require.ErrorIs(t, err, ErrVbucketUUIDMismatch) +} + +func TestVBucketConsistencyFastCacheUnknownVbucket(t *testing.T) { + logger, _ := zap.NewDevelopment() + + fastCache := vbucketUUIDCacheFastCache{ + vbuckets: map[uint16]uint64{ + 1: 1234, + 2: 5678, + 3: 91011, + }, + } + cache := &vbucketUUIDCache{} + cache.fastCache.Store(&fastCache) + + kvCli := &KvClientMock{ + BaggageFunc: func(key any) (any, bool) { + return cache, true + }, + } + + vbComp := newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger, + }) + + err := vbComp.VerifyVBucket(context.Background(), kvCli, 5, 5678) + require.ErrorIs(t, err, ErrInvalidVbucket) +} + +func TestVBucketConsistencyFastCacheMissing(t *testing.T) { + logger, _ := zap.NewDevelopment() + + vbUUID := 170788223609151 + vbUUID2 := 225299882966157 + baggage := make(map[any]any) + numBaggageCalls := 0 + numStatsCalls := 0 + + kvCli := &KvClientMock{ + BaggageFunc: func(key any) (any, bool) { + numBaggageCalls++ + if numBaggageCalls <= 2 { + return nil, false + } + + v, ok := baggage[key] + + return v, ok + }, + AddBaggageFunc: func(key any, value any) { + baggage[key] = value + }, + StatsFunc: func(ctx context.Context, req *memdx.StatsRequest, + dataCb func(*memdx.StatsDataResponse) error) (*memdx.StatsActionResponse, error) { + numStatsCalls++ + err := dataCb(&memdx.StatsDataResponse{ + Key: "vb_1:uuid", + Value: strconv.Itoa(vbUUID), + }) + if err != nil { + return nil, err + } + err = dataCb(&memdx.StatsDataResponse{ + Key: "vb_2:uuid", + Value: strconv.Itoa(vbUUID2), + }) + if err != nil { + return nil, err + } + + return &memdx.StatsActionResponse{}, nil + }, + } + + vbComp := newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger, + }) + + err := vbComp.VerifyVBucket(context.Background(), kvCli, 1, uint64(vbUUID)) + require.NoError(t, err) + + err = vbComp.VerifyVBucket(context.Background(), kvCli, 2, uint64(vbUUID2)) + require.NoError(t, err) + + // 4 calls, 2 checking baggage on first call, then checking baggage against after creating the cache. + // 1 check on the second call. + assert.Equal(t, 4, numBaggageCalls) + assert.Equal(t, 1, numStatsCalls) +} + +func TestVBucketConsistencyFastCacheMissBaggageRace(t *testing.T) { + logger, _ := zap.NewDevelopment() + + vbUUID := 170788223609151 + vbUUID2 := 225299882966157 + numBaggageCalls := 0 + numAddBaggageCalls := 0 + numStatsCalls := 0 + + fastCache := vbucketUUIDCacheFastCache{ + vbuckets: map[uint16]uint64{ + 1: 170788223609151, + 2: 225299882966157, + 3: 91011, + }, + } + cache := &vbucketUUIDCache{} + cache.fastCache.Store(&fastCache) + + kvCli := &KvClientMock{ + BaggageFunc: func(key any) (any, bool) { + numBaggageCalls++ + if numBaggageCalls == 1 { + return nil, false + } + + return cache, true + }, + AddBaggageFunc: func(key any, value any) { + numAddBaggageCalls++ + }, + StatsFunc: func(ctx context.Context, req *memdx.StatsRequest, + dataCb func(*memdx.StatsDataResponse) error) (*memdx.StatsActionResponse, error) { + numStatsCalls++ + + return &memdx.StatsActionResponse{}, nil + }, + } + + vbComp := newVbucketConsistencyComponent(&vbucketConsistencyComponentOptions{ + logger: logger, + }) + + err := vbComp.VerifyVBucket(context.Background(), kvCli, 1, uint64(vbUUID)) + require.NoError(t, err) + + err = vbComp.VerifyVBucket(context.Background(), kvCli, 2, uint64(vbUUID2)) + require.NoError(t, err) + + assert.Equal(t, 4, numBaggageCalls) + assert.Zero(t, numStatsCalls) + assert.Zero(t, numAddBaggageCalls) +} diff --git a/vbucketuuidcache.go b/vbucketuuidcache.go new file mode 100644 index 0000000..5c91584 --- /dev/null +++ b/vbucketuuidcache.go @@ -0,0 +1,99 @@ +package gocbcorex + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/couchbase/gocbcorex/memdx" +) + +type vbucketUUIDCacheFastCache struct { + vbuckets map[uint16]uint64 +} + +type vbucketUUIDCacheOptions struct { +} + +type vbucketUUIDCache struct { + fastCache atomic.Pointer[vbucketUUIDCacheFastCache] + + slowLock sync.Mutex + slowPendingCh chan struct{} +} + +func newVBucketUUIDCache(opts *vbucketUUIDCacheOptions) *vbucketUUIDCache { + return &vbucketUUIDCache{} +} + +func (vc *vbucketUUIDCache) LookupVbUUID(ctx context.Context, vbID uint16, client KvClient) (uint64, error) { + fastCache := vc.fastCache.Load() + if fastCache != nil { + if vbUUID, ok := fastCache.vbuckets[vbID]; ok { + return vbUUID, nil + } else { + return 0, ErrInvalidVbucket + } + } + + vc.slowLock.Lock() + if vc.slowPendingCh == nil { + vc.slowPendingCh = make(chan struct{}) + vc.slowLock.Unlock() + + vbUUIDs, err := vc.getVBUUIDs(ctx, client) + if err != nil { + vc.slowLock.Lock() + close(vc.slowPendingCh) + vc.slowPendingCh = nil + vc.slowLock.Unlock() + return 0, err + } + + vc.slowLock.Lock() + close(vc.slowPendingCh) + vc.slowPendingCh = nil + + fastCache := &vbucketUUIDCacheFastCache{ + vbuckets: vbUUIDs, + } + vc.fastCache.Store(fastCache) + vc.slowLock.Unlock() + + return vc.LookupVbUUID(ctx, vbID, client) + } + + pendingCh := vc.slowPendingCh + vc.slowLock.Unlock() + + select { + case <-pendingCh: + case <-ctx.Done(): + return 0, ctx.Err() + } + + return vc.LookupVbUUID(ctx, vbID, client) +} + +func (vc *vbucketUUIDCache) getVBUUIDs(ctx context.Context, client KvClient) (map[uint16]uint64, error) { + parser := memdx.VbucketSeqNoStatsParser{} + _, err := client.Stats(ctx, &memdx.StatsRequest{ + UtilsRequestMeta: memdx.UtilsRequestMeta{ + OnBehalfOf: "", + }, + GroupName: parser.GroupName(), + }, func(response *memdx.StatsDataResponse) error { + parser.HandleEntry(response.Key, response.Value) + return nil + }) + if err != nil { + return nil, err + } + + cache := make(map[uint16]uint64, len(parser.Vbuckets)) + for vbID, vbDetails := range parser.Vbuckets { + cache[vbID] = vbDetails.Uuid + } + + return cache, nil +}