-
Notifications
You must be signed in to change notification settings - Fork 139
/
Copy pathblip_client_test.go
1673 lines (1430 loc) · 64.3 KB
/
blip_client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2018-Present Couchbase, Inc.
Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/
package rest
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"iter"
"net/http"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/couchbase/go-blip"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// BlipTesterClientOpts holds the options used to configure a BlipTesterClient. The struct is copied
// into the client when it is created, so later changes to the caller's copy are not seen by the client.
type BlipTesterClientOpts struct {
ClientDeltas bool // Support deltas on the client side
// Username is passed as the connecting username of the underlying BlipTester replications.
Username string
// Channels is passed as the channel grants for the connecting user.
Channels []string
// NOTE(review): SendRevocations is not read in the visible part of this file - presumably it opts
// the pull replication into receiving revocation messages; confirm against the caller.
SendRevocations bool
// SupportedBLIPProtocols is the list of BLIP subprotocols offered when connecting.
SupportedBLIPProtocols []string
// SkipCollectionsInitialization is forwarded to the BlipTester spec, and forces the client to use a
// single non-collection-aware collection client even when named collections exist.
SkipCollectionsInitialization bool
// a deltaSrc rev ID for which to reject a delta
rejectDeltasForSrcRev string
// changesEntryCallback is a callback function invoked for each changes entry being received (pull)
changesEntryCallback func(docID, revID string)
// optional Origin header
origin *string
// sendReplacementRevs opts into the replacement rev behaviour in the event that we do not find the requested one.
sendReplacementRevs bool
// revsLimit caps how many known revisions are listed per document in a changes response.
// A nil value falls back to defaultBlipTesterClientRevsLimit.
revsLimit *int // defaults to 20
}
// defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored.
// It is the fallback used when BlipTesterClientOpts.revsLimit is nil.
const defaultBlipTesterClientRevsLimit = 20
// BlipTesterClient is a fully fledged client to emulate CBL behaviour on both push and pull replications through methods on this type.
// It embeds the options it was created with and owns one replicator per direction plus one collection client per replicated collection.
type BlipTesterClient struct {
BlipTesterClientOpts
id uint32 // unique ID for the client
rt *RestTester
pullReplication *BlipTesterReplicator // SG -> CBL replications
pushReplication *BlipTesterReplicator // CBL -> SG replications
// collectionClients holds one client per named collection, indexed by the collection's position in the
// sorted collection list. It is only populated when collections are initialized.
collectionClients []*BlipTesterCollectionClient
// nonCollectionAwareClient is used instead of collectionClients when collection initialization is
// skipped or when no non-default collection exists.
nonCollectionAwareClient *BlipTesterCollectionClient
}
// getProposeChangesForSeq looks up the document stored at the given client sequence and, when one
// exists, returns the proposeChangeBatchEntry describing it. The boolean result is false when no
// document is stored at that sequence. The lookup is performed under the seqLock read lock.
func (btcc *BlipTesterCollectionClient) getProposeChangesForSeq(seq clientSeq) (*proposeChangeBatchEntry, bool) {
	btcc.seqLock.RLock()
	defer btcc.seqLock.RUnlock()

	if storedDoc, found := btcc._seqStore[seq]; found {
		return storedDoc._proposeChangesEntryForDoc(), true
	}
	return nil, false
}
// OneShotChangesSince is an iterator that yields client sequence and document pairs that are newer than the given since value.
//
// The iterator first blocks (on _seqCond) until the client's latest sequence is greater than since, or
// until ctx is done. It then takes a snapshot of the latest sequence and yields every sequence in the
// range that still has a document stored against it. Iteration ends when the snapshot is exhausted or
// when the consumer stops ranging.
//
// NOTE(review): the iteration below starts at since itself (inclusive), not since+1, so a document that
// is still stored at the since sequence is yielded again - confirm whether that is intended given the
// "newer than" wording above.
func (btcc *BlipTesterCollectionClient) OneShotChangesSince(ctx context.Context, since clientSeq) iter.Seq2[clientSeq, *proposeChangeBatchEntry] {
return func(yield func(clientSeq, *proposeChangeBatchEntry) bool) {
// The write lock is held while waiting; presumably _seqCond is built on seqLock so that Wait releases it - verify at the construction site.
btcc.seqLock.Lock()
seqLast := btcc._seqLast
for btcc._seqLast <= since {
// Don't start waiting if the context is already done.
if ctx.Err() != nil {
btcc.seqLock.Unlock()
return
}
// block until new seq
base.DebugfCtx(ctx, base.KeySGTest, "OneShotChangesSince: since=%d, _seqLast=%d - waiting for new sequence", since, btcc._seqLast)
btcc._seqCond.Wait()
// Check to see if we were woken because of Close()
if ctx.Err() != nil {
btcc.seqLock.Unlock()
return
}
// Refresh the snapshot of the latest sequence now that we have been woken.
seqLast = btcc._seqLast
base.DebugfCtx(ctx, base.KeySGTest, "OneShotChangesSince: since=%d, _seqLast=%d - woke up", since, btcc._seqLast)
}
// Release the lock before yielding: getProposeChangesForSeq takes its own read lock per sequence.
btcc.seqLock.Unlock()
base.DebugfCtx(ctx, base.KeySGTest, "OneShotChangesSince: since=%d, _seqLast=%d - iterating", since, seqLast)
for seq := since; seq <= seqLast; seq++ {
change, ok := btcc.getProposeChangesForSeq(seq)
// filter non-latest entries in cases where we haven't pruned _seqStore
if !ok {
continue
}
// yield returns false when the consumer has stopped ranging over the iterator
if !yield(seq, change) {
base.DebugfCtx(ctx, base.KeySGTest, "OneShotChangesSince: since=%d, _seqLast=%d - stopping iteration", since, seqLast)
return
}
}
base.DebugfCtx(ctx, base.KeySGTest, "OneShotChangesSince: since=%d, _seqLast=%d - done", since, seqLast)
}
}
// changesSince starts a goroutine that feeds proposed change entries, starting from the given since
// value, into the returned channel. Each entry sent advances the internal since position to that
// entry's sequence.
//
// The channel is closed when the goroutine exits: after a single pass when continuous is false, or once
// the context is cancelled when continuous is true. The goroutine is tracked by btcc.goroutineWg.
func (btcc *BlipTesterCollectionClient) changesSince(ctx context.Context, since clientSeq, continuous bool) chan *proposeChangeBatchEntry {
	feed := make(chan *proposeChangeBatchEntry)
	btcc.goroutineWg.Add(1)
	go func() {
		// deferred calls run in reverse order: the feed is closed before the wait group is released
		defer btcc.goroutineWg.Done()
		defer close(feed)

		cursor := since
		for ctx.Err() == nil {
			base.DebugfCtx(ctx, base.KeySGTest, "changesSince: sinceVal=%d", cursor)
			for _, entry := range btcc.OneShotChangesSince(ctx, cursor) {
				select {
				case <-ctx.Done():
					return
				case feed <- entry:
					base.DebugfCtx(ctx, base.KeySGTest, "sent doc %q to changes feed", entry.docID)
					cursor = entry.seq
				}
			}
			if continuous {
				continue
			}
			base.DebugfCtx(ctx, base.KeySGTest, "opts.Continuous=false, breaking changes loop")
			return
		}
	}()
	return feed
}
// clientSeq is a sequence number assigned by the test client to the revisions it stores locally.
type clientSeq uint64
// clientDocRev represents a revision of a document stored on this client, including any metadata associated with this specific revision.
type clientDocRev struct {
clientSeq clientSeq // client sequence this revision is stored under
version DocVersion // version (rev ID) of this revision
body []byte // raw document body for this revision
isDelete bool // true when this revision is a deletion
message *blip.Message // rev or norev message associated with this revision when replicated
}
// clientDoc represents a document stored on the client - it may also contain older versions of the document.
// Fields prefixed with an underscore are expected to be accessed while holding BlipTesterCollectionClient.seqLock.
type clientDoc struct {
id string // doc ID
_latestSeq clientSeq // Latest sequence number we have for the doc - the active rev
_latestServerVersion DocVersion // Latest version we know the server had (via push or a pull)
// NOTE: revisions are stored by value, so a revision read from this map must be written back after it is modified.
_revisionsBySeq map[clientSeq]clientDocRev // Full history of doc from client POV
_seqsByVersions map[DocVersion]clientSeq // Lookup from version into revisionsBySeq
}
// newClientDocument builds the client's local representation of a document whose latest sequence is
// newClientSeq. When rev is non-nil it is copied into the document's history and indexed by its
// version; when rev is nil the document starts with an empty history.
func newClientDocument(docID string, newClientSeq clientSeq, rev *clientDocRev) *clientDoc {
	revisions := make(map[clientSeq]clientDocRev)
	versions := make(map[DocVersion]clientSeq)
	if rev != nil {
		revisions[rev.clientSeq] = *rev
		versions[rev.version] = rev.clientSeq
	}
	return &clientDoc{
		id:              docID,
		_latestSeq:      newClientSeq,
		_revisionsBySeq: revisions,
		_seqsByVersions: versions,
	}
}
// _docRevSeqsNewestToOldest collects the client sequence of every stored revision of this document and
// returns them in descending order (newest first). Calling this function requires holding
// BlipTesterCollectionClient.seqLock as read lock.
func (cd *clientDoc) _docRevSeqsNewestToOldest() []clientSeq {
	ordered := make([]clientSeq, len(cd._revisionsBySeq))
	next := 0
	for _, stored := range cd._revisionsBySeq {
		ordered[next] = stored.clientSeq
		next++
	}
	// ascending sort followed by an in-place reversal gives newest-to-oldest
	slices.Sort(ordered)
	slices.Reverse(ordered)
	return ordered
}
// _latestRev fetches the revision stored at the document's latest sequence, failing the test if that
// sequence has no stored revision. The result points at a copy of the stored revision. Calling this
// function requires holding BlipTesterCollectionClient.seqLock as read lock.
func (cd *clientDoc) _latestRev(tb testing.TB) *clientDocRev {
	latest, found := cd._revisionsBySeq[cd._latestSeq]
	require.True(tb, found, "latestSeq %d not found in revisionsBySeq", cd._latestSeq)
	return &latest
}
// _addNewRev records rev as the document's newest revision: it becomes the latest sequence, is stored
// in the revision history and is indexed by its version. Calling this function requires holding
// BlipTesterCollectionClient.seqLock as write lock.
func (cd *clientDoc) _addNewRev(rev clientDocRev) {
	seq := rev.clientSeq
	cd._latestSeq = seq
	cd._revisionsBySeq[seq] = rev
	cd._seqsByVersions[rev.version] = seq
}
// _revisionBySeq looks up the revision stored at seq, failing the test when there is none. The result
// points at a copy of the stored revision. Calling this function requires holding
// BlipTesterCollectionClient.seqLock as read lock.
func (cd *clientDoc) _revisionBySeq(tb testing.TB, seq clientSeq) *clientDocRev {
	stored, found := cd._revisionsBySeq[seq]
	require.True(tb, found, "seq %d not found in revisionsBySeq", seq)
	return &stored
}
// _getRev resolves version to its client sequence and returns the revision stored there. The test
// fails if the version is unknown or if its sequence has no stored revision. The result points at a
// copy of the stored revision. Calling this function requires holding
// BlipTesterCollectionClient.seqLock as read lock.
func (cd *clientDoc) _getRev(tb testing.TB, version DocVersion) *clientDocRev {
	versionSeq, versionKnown := cd._seqsByVersions[version]
	require.True(tb, versionKnown, "version %v not found in seqsByVersions", version)

	stored, revStored := cd._revisionsBySeq[versionSeq]
	require.True(tb, revStored, "seq %d not found in revisionsBySeq", versionSeq)
	return &stored
}
// _pruneVersion drops the given version from the document's history and version index. The test fails
// if the version is unknown, or if it is not older than the document's latest sequence (the latest
// version can not be pruned). Calling this function requires holding
// BlipTesterCollectionClient.seqLock as write lock.
func (cd *clientDoc) _pruneVersion(t testing.TB, version DocVersion) {
	prunedSeq, known := cd._seqsByVersions[version]
	require.True(t, known, "version %v not found in seqsByVersions", version)
	require.Less(t, prunedSeq, cd._latestSeq, "seq %d is the latest seq for doc %q, can not prune latest version", prunedSeq, cd.id)

	delete(cd._revisionsBySeq, prunedSeq)
	delete(cd._seqsByVersions, version)
}
// _proposeChangesEntryForDoc builds the proposeChangeBatchEntry for this document: the latest revision's
// version and deletion flag, the versions of all other stored revisions (newest first) as history, and
// the last version known to be on the server. Calling this function requires holding
// BlipTesterCollectionClient.seqLock as read lock.
func (cd *clientDoc) _proposeChangesEntryForDoc() *proposeChangeBatchEntry {
	current := cd._revisionsBySeq[cd._latestSeq]

	// everything after the newest sequence is history; the newest entry is the revision being proposed
	var ancestors []DocVersion
	if seqs := cd._docRevSeqsNewestToOldest(); len(seqs) > 1 {
		for _, seq := range seqs[1:] {
			ancestors = append(ancestors, cd._revisionsBySeq[seq].version)
		}
	}

	entry := &proposeChangeBatchEntry{
		docID:               cd.id,
		version:             current.version,
		history:             ancestors,
		latestServerVersion: cd._latestServerVersion,
		seq:                 cd._latestSeq,
		isDelete:            current.isDelete,
	}
	return entry
}
// BlipTesterCollectionClient is the per-collection state of a BlipTesterClient: the documents and
// attachments the client has stored locally for one collection, plus the sequence bookkeeping used to
// drive its changes feeds.
type BlipTesterCollectionClient struct {
parent *BlipTesterClient // the client this collection client belongs to
// NOTE(review): ctx and ctxCancel are not used in the visible part of this file - presumably they
// bound the lifetime of this collection client's goroutines; confirm against Close().
ctx context.Context
ctxCancel context.CancelFunc
goroutineWg sync.WaitGroup // tracks goroutines started by this collection client (e.g. changesSince)
collection string // collection name as returned by getCollectionsForBLIP; empty for the non-collection-aware client
collectionIdx int // index of this client in parent.collectionClients
// seqLock protects all _seq fields below, including the _seqStore map
seqLock *sync.RWMutex
// _seqLast is the client's latest assigned sequence number
_seqLast clientSeq
// _seqStore is a sparse map of (client) sequences and the corresponding document
// entries are removed from this map when the sequence no longer represents an active document revision
// the older revisions for a particular document can still be accessed via clientDoc._revisionsBySeq if required
_seqStore map[clientSeq]*clientDoc
// _seqFromDocID used to lookup entry in _seqStore by docID - not a pointer into other map for simplicity
_seqFromDocID map[string]clientSeq
// _seqCond is used to signal when a new sequence has been added to wake up idle "changes" loops
_seqCond *sync.Cond
attachmentsLock sync.RWMutex // lock for _attachments map
_attachments map[string][]byte // Client's local store of _attachments - Map of digest to bytes
}
// _getClientDoc resolves docID to its current client sequence and returns the document stored at that
// sequence. It returns (nil, false) when the doc ID is unknown, and fails the test when the doc ID is
// known but its sequence is missing from _seqStore. Requires BlipTesterCollectionClient.seqLock read
// lock to be held.
func (btcc *BlipTesterCollectionClient) _getClientDoc(docID string) (*clientDoc, bool) {
	seq, known := btcc._seqFromDocID[docID]
	if !known {
		return nil, false
	}
	doc, stored := btcc._seqStore[seq]
	require.True(btcc.TB(), stored, "docID %q found in _seqFromDocID but seq %d not in _seqStore %v", docID, seq, btcc._seqStore)
	return doc, stored
}
// BlipTestClientRunner is for running the blip tester client and its associated methods in test framework.
// Clients must be created through the runner from inside the function passed to Run().
type BlipTestClientRunner struct {
clients map[uint32]*BlipTesterClient // map of created BlipTesterClient's, keyed by client ID
t *testing.T // the parent test that subtests are run under
initialisedInsideRunnerCode bool // flag to check that the BlipTesterClient is being initialised in the correct area (inside the Run() method)
SkipVersionVectorInitialization bool // used to skip the version vector subtest
}
// BlipTesterReplicator is a BlipTester which stores a map of messages keyed by Serial Number.
// A BlipTesterClient owns one replicator for push and one for pull.
type BlipTesterReplicator struct {
bt *BlipTester // underlying BLIP connection to Sync Gateway
id string // "push" or "pull" followed by a UUID generated on creation
messagesLock sync.RWMutex // lock for messages map
messages map[blip.MessageNumber]*blip.Message // Map of blip messages keyed by message number
replicationStats *db.BlipSyncStats // Stats of replications; created in initHandlers when nil
}
// NewBlipTesterClientRunner returns a BlipTestClientRunner bound to the given test, with an empty set
// of registered clients.
func NewBlipTesterClientRunner(t *testing.T) *BlipTestClientRunner {
	runner := new(BlipTestClientRunner)
	runner.t = t
	runner.clients = map[uint32]*BlipTesterClient{}
	return runner
}
// Close closes the replicator's underlying BlipTester and then discards every message stored so far
// by replacing the message map with an empty one.
func (btr *BlipTesterReplicator) Close() {
	btr.bt.Close()

	btr.messagesLock.Lock()
	defer btr.messagesLock.Unlock()
	btr.messages = map[blip.MessageNumber]*blip.Message{}
}
// initHandlers installs the client-side BLIP handlers on this replicator's BLIP context: a default
// handler for unknown profiles, and one handler per supported message profile. Every profile handler
// is wrapped so that the incoming message is stored on this replicator once the handler has finished.
// Replication stats are created here if they have not been set already.
func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
	if btr.replicationStats == nil {
		btr.replicationStats = db.NewBlipSyncStats()
	}

	ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil)

	btr.bt.blipContext.DefaultHandler = btr.defaultHandler()

	// register wraps handle so the message is recorded even if the handler exits early
	register := func(profile string, handle func(*blip.Message)) {
		btr.bt.blipContext.HandlerForProfile[profile] = func(msg *blip.Message) {
			defer btr.storeMessage(msg)
			handle(msg)
		}
	}

	register(db.MessageNoRev, btr.handleNoRev(btc))
	register(db.MessageGetAttachment, btr.handleGetAttachment(btc))
	register(db.MessageRev, btr.handleRev(ctx, btc))
	register(db.MessageProposeChanges, btr.handleProposeChanges(btc))
	register(db.MessageChanges, btr.handleChanges(btc))
	register(db.MessageProveAttachment, btr.handleProveAttachment(ctx, btc))
}
// handleProveAttachment returns the handler for proveAttachment messages received by the blip client.
// The handler requires a non-empty nonce in the message body and a digest property, computes the proof
// for the locally stored attachment with that digest, and sends the proof back as the response body.
// The test fails if the nonce or digest is missing, or if the attachment is not stored locally.
func (btr *BlipTesterReplicator) handleProveAttachment(ctx context.Context, btc *BlipTesterClient) func(*blip.Message) {
	return func(msg *blip.Message) {
		nonce, err := msg.Body()
		require.NoError(btr.TB(), err)
		require.NotEmpty(btr.TB(), nonce, "no nonce sent with proveAttachment")

		digest, hasDigest := msg.Properties[db.ProveAttachmentDigest]
		require.True(btr.TB(), hasDigest, "no digest sent with proveAttachment")

		collectionClient := btc.getCollectionClientFromMessage(msg)
		proof := db.ProveAttachment(ctx, collectionClient.getAttachment(digest), nonce)

		msg.Response().SetBody([]byte(proof))
		btr.replicationStats.ProveAttachment.Add(1)
	}
}
// handleChanges returns the handler for changes messages received by the blip tester client.
//
// For a message that expects a reply, the handler answers with one entry per change:
//   - null when the client already has the offered revision, or when the change's flags have bit 2 set;
//   - an empty array when the client has never seen the document;
//   - otherwise the list of revisions the client knows for the document, capped at the revs limit.
//
// A body of "null" is answered with an empty array. When client deltas are enabled the response also
// carries the "deltas" property.
func (btr *BlipTesterReplicator) handleChanges(btc *BlipTesterClient) func(*blip.Message) {
	revsLimit := base.ValDefault(btc.revsLimit, defaultBlipTesterClientRevsLimit)
	return func(msg *blip.Message) {
		btcc := btc.getCollectionClientFromMessage(msg)

		// Exit early when there's nothing to do
		if msg.NoReply() {
			return
		}

		body, err := msg.Body()
		require.NoError(btr.TB(), err)

		knownRevs := []interface{}{}
		if string(body) != "null" {
			// changes == [[sequence, docID, revID, {deleted}, {size (bytes)}], ...]
			var changes [][]interface{}
			err = base.JSONUnmarshal(body, &changes)
			require.NoError(btr.TB(), err)

			// entries left untouched stay nil, which is marshalled as null
			knownRevs = make([]interface{}, len(changes))
			for i, entry := range changes {
				docID := entry[1].(string)
				revID := entry[2].(string)

				if btc.changesEntryCallback != nil {
					btc.changesEntryCallback(docID, revID)
				}

				if len(entry) > 3 {
					if flags, isNumber := entry[3].(float64); isNumber && int(flags)&2 == 2 {
						continue
					}
				}

				// Build up a list of revisions known to the client for each change
				// The first element of each revision list must be the parent revision of the change
				ancestors := make([]string, 0, revsLimit)
				localVersions := btcc.getAllRevisions(docID)
				if len(localVersions) == 0 {
					knownRevs[i] = []interface{}{} // sending empty array means we've not seen the doc before, but still want it
					continue
				}

				alreadyHave := false
				for _, version := range localVersions {
					if version.RevID == revID {
						alreadyHave = true
						break
					}
					if len(ancestors) >= revsLimit {
						break
					}
					ancestors = append(ancestors, version.RevID)
				}

				switch {
				case alreadyHave:
					knownRevs[i] = nil // Send back null to signal we don't need this change
				case len(ancestors) > 0:
					knownRevs[i] = ancestors
				}
			}
		}

		response := msg.Response()
		if btc.ClientDeltas {
			// Enable deltas from the client side
			response.Properties["deltas"] = "true"
		}

		encoded, err := base.JSONMarshal(knownRevs)
		require.NoError(btr.TB(), err)
		response.SetBody(encoded)
	}
}
// handleProposeChanges returns the handler for proposeChanges messages received by the blip tester
// client. The handler only records the message on the client's pull replication.
func (btr *BlipTesterReplicator) handleProposeChanges(btc *BlipTesterClient) func(msg *blip.Message) {
	record := func(msg *blip.Message) {
		btc.pullReplication.storeMessage(msg)
	}
	return record
}
// handleRev returns the handler for rev messages received by the blip tester client.
//
// The handler stores the incoming revision on the collection client the message is addressed to:
//   - a deletion ("deleted" property set to "1") is stored as-is;
//   - a delta (deltaSrc property set, with client deltas enabled) is patched onto the stored body of
//     the delta source revision before being stored. A delta whose source matches
//     rejectDeltasForSrcRev is rejected with an HTTP 422 error response and nothing is stored;
//   - when the body references attachments, known attachments are verified with proveAttachment
//     (v2 subprotocol only) and missing attachments are fetched with getAttachment. An error from
//     either request is forwarded into the rev response (when a reply is expected) and the revision
//     is not stored.
//
// On success the handler replies with an empty JSON array unless the message was sent noreply.
// The message is always recorded on the client's pull replication when the handler exits.
func (btr *BlipTesterReplicator) handleRev(ctx context.Context, btc *BlipTesterClient) func(msg *blip.Message) {
	return func(msg *blip.Message) {
		defer btc.pullReplication.storeMessage(msg)

		btcc := btc.getCollectionClientFromMessage(msg)

		docID := msg.Properties[db.RevMessageID]
		revID := msg.Properties[db.RevMessageRev]
		deltaSrc := msg.Properties[db.RevMessageDeltaSrc]
		replacedRev := msg.Properties[db.RevMessageReplacedRev]

		body, err := msg.Body()
		require.NoError(btr.TB(), err)

		if msg.Properties[db.RevMessageDeleted] == "1" {
			rev := revOptions{
				newVersion:                DocVersion{RevID: revID},
				body:                      body,
				isDelete:                  true,
				msg:                       msg,
				updateLatestServerVersion: true,
			}
			if replacedRev != "" {
				rev.replacedVersion = &DocVersion{RevID: replacedRev}
			}
			btcc.addRev(docID, rev)

			if !msg.NoReply() {
				response := msg.Response()
				response.SetBody([]byte(`[]`))
			}
			return
		}

		// bodyJSON is unmarshalled into when required (e.g. Delta patching, or attachment processing)
		// Before being marshalled back into bytes for storage in the test client
		var bodyJSON db.Body

		// If deltas are enabled, and we see a deltaSrc property, we'll need to patch it before storing
		if btc.ClientDeltas && deltaSrc != "" {
			if btc.rejectDeltasForSrcRev == deltaSrc {
				require.False(btr.TB(), msg.NoReply(), "expected delta rev message to be sent without noreply flag: %+v", msg)
				response := msg.Response()
				response.SetError("HTTP", http.StatusUnprocessableEntity, "test code intentionally rejected delta")
				// A rejected delta must not be applied or stored, and the error response must not be
				// overwritten by the success reply at the end of the handler.
				return
			}

			// unmarshal body to extract deltaSrc
			var delta db.Body
			err := delta.Unmarshal(body)
			require.NoError(btc.TB(), err)

			old := btcc.getBody(docID, DocVersion{RevID: deltaSrc})
			var oldMap = map[string]interface{}(old)
			err = base.Patch(&oldMap, delta)
			require.NoError(btc.TB(), err)
			bodyJSON = oldMap
		}

		// Fetch any missing attachments (if required) during this rev processing
		if bytes.Contains(body, []byte(db.BodyAttachments)) {
			// We'll need to unmarshal the body in order to do attachment processing
			if bodyJSON == nil {
				err := bodyJSON.Unmarshal(body)
				require.NoError(btr.TB(), err)
			}

			if atts, ok := bodyJSON[db.BodyAttachments]; ok {
				attsMap, ok := atts.(map[string]interface{})
				require.True(btr.TB(), ok, "atts in doc wasn't map[string]interface{}")

				var missingDigests []string
				var knownDigests []string
				btcc.attachmentsLock.RLock()
				for _, attachment := range attsMap {
					attMap, ok := attachment.(map[string]interface{})
					require.True(btr.TB(), ok, "att in doc wasn't map[string]interface{}")
					digest := attMap["digest"].(string)

					if _, found := btcc._attachments[digest]; !found {
						missingDigests = append(missingDigests, digest)
					} else if btr.bt.activeSubprotocol == db.CBMobileReplicationV2 {
						// only v2 clients care about proveAttachments
						knownDigests = append(knownDigests, digest)
					}
				}
				btcc.attachmentsLock.RUnlock()

				for _, digest := range knownDigests {
					attData := btcc.getAttachment(digest)
					nonce, proof, err := db.GenerateProofOfAttachment(ctx, attData)
					require.NoError(btr.TB(), err)

					// if we already have this attachment, _we_ should ask the peer whether _they_ have the attachment
					outrq := blip.NewRequest()
					outrq.SetProfile(db.MessageProveAttachment)
					outrq.Properties[db.ProveAttachmentDigest] = digest
					outrq.SetBody(nonce)

					btcc.sendPullMsg(outrq)

					resp := outrq.Response()
					btc.pullReplication.storeMessage(resp)
					respBody, err := resp.Body()
					require.NoError(btr.TB(), err)

					if resp.Type() == blip.ErrorType {
						// forward error from proveAttachment response into rev response
						if !msg.NoReply() {
							response := msg.Response()
							errorCode, _ := strconv.Atoi(resp.Properties["Error-Code"])
							response.SetError(resp.Properties["Error-Code"], errorCode, string(respBody))
						}
						return
					}

					if string(respBody) != proof {
						// forward error from proveAttachment response into rev response
						if !msg.NoReply() {
							response := msg.Response()
							response.SetError(resp.Properties["Error-Code"], http.StatusForbidden, fmt.Sprintf("Incorrect proof for attachment %s", digest))
						}
						return
					}
				}

				for _, digest := range missingDigests {
					outrq := blip.NewRequest()
					outrq.SetProfile(db.MessageGetAttachment)
					outrq.Properties[db.GetAttachmentDigest] = digest
					if btr.bt.activeSubprotocol >= db.CBMobileReplicationV3 {
						outrq.Properties[db.GetAttachmentID] = docID
					}

					btcc.sendPullMsg(outrq)

					resp := outrq.Response()
					btc.pullReplication.storeMessage(resp)
					respBody, err := resp.Body()
					require.NoError(btr.TB(), err)

					if resp.Type() == blip.ErrorType {
						// forward error from getAttachment response into rev response
						if !msg.NoReply() {
							response := msg.Response()
							errorCode, _ := strconv.Atoi(resp.Properties["Error-Code"])
							response.SetError(resp.Properties["Error-Code"], errorCode, string(respBody))
						}
						// Stop regardless of noreply: the error body must never be stored as attachment data.
						return
					}

					btcc.attachmentsLock.Lock()
					btcc._attachments[digest] = respBody
					btcc.attachmentsLock.Unlock()
				}
			}
		}

		// Re-marshal the body if it was patched or unmarshalled above, so the stored bytes match bodyJSON.
		if bodyJSON != nil {
			body, err = base.JSONMarshal(bodyJSON)
			require.NoError(btr.TB(), err)
		}

		rev := revOptions{
			newVersion:                DocVersion{RevID: revID},
			body:                      body,
			msg:                       msg,
			updateLatestServerVersion: true,
		}
		if replacedRev != "" {
			// store the new sequence for a replaced rev for tests waiting for this specific rev
			rev.replacedVersion = &DocVersion{RevID: replacedRev}
		}
		btcc.addRev(docID, rev)

		if !msg.NoReply() {
			response := msg.Response()
			response.SetBody([]byte(`[]`))
		}
	}
}
// handleGetAttachment returns the handler for getAttachment messages received by the blip tester
// client. The handler replies with the locally stored attachment for the requested digest and bumps
// the GetAttachment replication stat. The test fails if the digest property is missing or the
// attachment is not stored locally.
func (btr *BlipTesterReplicator) handleGetAttachment(btc *BlipTesterClient) func(msg *blip.Message) {
	return func(msg *blip.Message) {
		digest, hasDigest := msg.Properties[db.GetAttachmentDigest]
		require.True(btr.TB(), hasDigest, "couldn't find digest in getAttachment message properties")

		collectionClient := btc.getCollectionClientFromMessage(msg)
		data := collectionClient.getAttachment(digest)

		msg.Response().SetBody(data)
		btr.replicationStats.GetAttachment.Add(1)
	}
}
// handleNoRev returns the handler for norev messages received by the blip tester client. The handler
// records a body-less revision for the document and rev ID named in the message, keeping the norev
// message alongside it.
func (btr *BlipTesterReplicator) handleNoRev(btc *BlipTesterClient) func(msg *blip.Message) {
	return func(msg *blip.Message) {
		collectionClient := btc.getCollectionClientFromMessage(msg)

		docID, revID := msg.Properties[db.NorevMessageId], msg.Properties[db.NorevMessageRev]
		norev := revOptions{
			newVersion: DocVersion{RevID: revID},
			msg:        msg,
		}
		collectionClient.addRev(docID, norev)
	}
}
// defaultHandler returns the handler used for messages whose profile has no registered handler.
// Receiving such a message fails the test immediately.
func (btr *BlipTesterReplicator) defaultHandler() func(msg *blip.Message) {
	return func(msg *blip.Message) {
		failure := fmt.Sprintf("Unknown profile: %s caught by client DefaultHandler - msg: %#v", msg.Profile(), msg)
		require.FailNow(btr.TB(), failure)
	}
}
// TB exposes the testing.TB of the RestTester behind this replicator's BlipTester.
func (btr *BlipTesterReplicator) TB() testing.TB {
	tb := btr.bt.restTester.TB()
	return tb
}
// TB exposes the testing.TB of the RestTester used by this collection client's parent client.
func (btcc *BlipTesterCollectionClient) TB() testing.TB {
	tb := btcc.parent.rt.TB()
	return tb
}
// saveAttachment decodes the given base64 (standard encoding) data and stores it in the client's
// attachment store under its SHA1 digest key, unless an attachment with that digest is already stored.
// It returns the decoded length and the digest. The test fails if the data is not valid base64.
func (btcc *BlipTesterCollectionClient) saveAttachment(base64data string) (dataLength int, digest string) {
	btcc.attachmentsLock.Lock()
	defer btcc.attachmentsLock.Unlock()

	ctx := base.DatabaseLogCtx(base.TestCtx(btcc.parent.rt.TB()), btcc.parent.rt.GetDatabase().Name, nil)

	decoded, err := base64.StdEncoding.DecodeString(base64data)
	require.NoError(btcc.TB(), err)

	digest = db.Sha1DigestKey(decoded)
	_, alreadyStored := btcc._attachments[digest]
	if !alreadyStored {
		btcc._attachments[digest] = decoded
		return len(decoded), digest
	}

	base.InfofCtx(ctx, base.KeySync, "attachment with digest %s already exists", digest)
	return len(decoded), digest
}
// getAttachment reads the attachment stored under digest from the client's attachment store, under the
// attachments read lock. The test fails when no attachment is stored for that digest.
func (btcc *BlipTesterCollectionClient) getAttachment(digest string) (attachment []byte) {
	btcc.attachmentsLock.RLock()
	defer btcc.attachmentsLock.RUnlock()

	data, present := btcc._attachments[digest]
	require.True(btcc.TB(), present, "attachment with digest %s not found", digest)
	return data
}
// updateLastReplicatedRev stores this version as the last version replicated to Sync Gateway, and
// attaches msg to the stored revision for that version when the client has one.
//
// The test fails if docID is not known to the client. If the version has no stored revision, only the
// latest server version is updated.
func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, version DocVersion, msg *blip.Message) {
	btcc.seqLock.Lock()
	defer btcc.seqLock.Unlock()

	doc, ok := btcc._getClientDoc(docID)
	require.True(btcc.TB(), ok, "docID %q not found in _seqFromDocID", docID)
	doc._latestServerVersion = version

	seq, ok := doc._seqsByVersions[version]
	if !ok {
		return
	}
	rev, ok := doc._revisionsBySeq[seq]
	if !ok {
		return
	}
	// _revisionsBySeq holds revisions by value, so the modified copy has to be written back to the map
	// for the message to be retained.
	rev.message = msg
	doc._revisionsBySeq[seq] = rev
}
// getLastReplicatedRev reports the last version of docID known to have been replicated to Sync
// Gateway. ok is false when no version has been recorded yet (the recorded rev ID is empty). The test
// fails if docID is not known to the client.
func (btcc *BlipTesterCollectionClient) getLastReplicatedRev(docID string) (version DocVersion, ok bool) {
	btcc.seqLock.RLock()
	defer btcc.seqLock.RUnlock()

	doc, found := btcc._getClientDoc(docID)
	require.True(btcc.TB(), found, "docID %q not found in _seqFromDocID", docID)

	version = doc._latestServerVersion
	ok = version.RevID != ""
	return version, ok
}
// newBlipTesterReplication creates a BlipTesterReplicator with the given id for the client btc. It
// connects a new BlipTester against the client's RestTester using the client's username, channel
// grants, supported protocols and origin, then installs the client-side message handlers. The test
// fails if the BlipTester can not be created.
func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
	spec := &BlipTesterSpec{
		connectingPassword:            RestTesterDefaultUserPassword,
		connectingUsername:            btc.Username,
		connectingUserChannelGrants:   btc.Channels,
		blipProtocols:                 btc.SupportedBLIPProtocols,
		skipCollectionsInitialization: skipCollectionsInitialization,
		origin:                        btc.origin,
	}
	tester, err := NewBlipTesterFromSpecWithRT(tb, spec, btc.rt)
	require.NoError(tb, err)

	replicator := &BlipTesterReplicator{
		bt:       tester,
		id:       id,
		messages: map[blip.MessageNumber]*blip.Message{},
	}
	replicator.initHandlers(btc)
	return replicator
}
// getCollectionsForBLIP lists the collections of the RestTester's database as sorted
// scope/collection names joined with base.ScopeCollectionSeparator. The default collection is left
// out, so a database with only the default collection yields an empty result and exercises "legacy"
// blip mode.
func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
	var names []string
	for _, c := range rt.GetDatabase().CollectionByID {
		if !base.IsDefaultCollection(c.ScopeName, c.Name) {
			qualified := strings.Join([]string{c.ScopeName, c.Name}, base.ScopeCollectionSeparator)
			names = append(names, qualified)
		}
	}
	slices.Sort(names)
	return names
}
// NewBlipTesterClientOptsWithRT creates a BlipTesterClient against the given RestTester, registers it
// with the runner and sets up its push and pull replications. A nil opts is treated as the zero-value
// options. It must be called from inside the function passed to Run(); the test fails otherwise.
func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
	require.True(btcRunner.TB(), btcRunner.initialisedInsideRunnerCode, "must call BlipTestClientRunner.NewBlipTesterClientRunner from inside BlipTestClientRunner.Run() method")

	var effectiveOpts BlipTesterClientOpts
	if opts != nil {
		effectiveOpts = *opts
	}

	clientUUID, err := uuid.NewRandom()
	require.NoError(btcRunner.TB(), err)

	client = &BlipTesterClient{
		BlipTesterClientOpts: effectiveOpts,
		rt:                   rt,
		id:                   clientUUID.ID(),
	}
	btcRunner.clients[client.id] = client
	client.createBlipTesterReplications()
	return client
}
// TB exposes the testing.TB of the RestTester this client runs against.
func (btc *BlipTesterClient) TB() testing.TB {
	tb := btc.rt.TB()
	return tb
}
// Close tears down the client's push and pull replications, then closes every collection client,
// including the non-collection-aware client when one exists.
func (btc *BlipTesterClient) Close() {
	btc.tearDownBlipClientReplications()

	clients := btc.collectionClients
	for i := range clients {
		clients[i].Close()
	}

	if legacy := btc.nonCollectionAwareClient; legacy != nil {
		legacy.Close()
	}
}
// TB exposes the runner's parent test as a testing.TB.
func (btcRunner *BlipTestClientRunner) TB() testing.TB {
	var tb testing.TB = btcRunner.t
	return tb
}
// Run is the main entry point for running the blip tester client and its associated methods in test
// framework and should be used instead of t.Run. It runs test as a "revTree" subtest with the V3
// replication subprotocol. Client creation through the runner is only permitted while Run is
// executing.
func (btcRunner *BlipTestClientRunner) Run(test func(t *testing.T, SupportedBLIPProtocols []string)) {
	btcRunner.initialisedInsideRunnerCode = true
	// reset to protect against someone creating a new client after Run() is run
	defer func() {
		btcRunner.initialisedInsideRunnerCode = false
	}()

	revTree := func(t *testing.T) {
		protocols := []string{db.CBMobileReplicationV3.SubprotocolString()}
		test(t, protocols)
	}
	btcRunner.t.Run("revTree", revTree)
}
// tearDownBlipClientReplications closes the replications for the client: the pull replication first, then the push
// replication.
func (btc *BlipTesterClient) tearDownBlipClientReplications() {
	btc.pullReplication.Close()
	btc.pushReplication.Close()
}
// createBlipTesterReplications creates the push and pull replications for the client and the collection clients that
// go with them. Both replications share one random UUID suffix in their names. When collections initialization is
// skipped, or the database has no non-default collections, a single non collection aware client is created; otherwise
// one collection client is created per collection, indexed in the order returned by getCollectionsForBLIP.
func (btc *BlipTesterClient) createBlipTesterReplications() {
	replicationUUID, err := uuid.NewRandom()
	require.NoError(btc.TB(), err)
	suffix := replicationUUID.String()
	skipCollections := btc.BlipTesterClientOpts.SkipCollectionsInitialization

	btc.pushReplication = newBlipTesterReplication(btc.TB(), "push"+suffix, btc, skipCollections)
	btc.pullReplication = newBlipTesterReplication(btc.TB(), "pull"+suffix, btc, skipCollections)

	collectionNames := getCollectionsForBLIP(btc.TB(), btc.rt)
	if skipCollections || len(collectionNames) == 0 {
		btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(btc)
	} else {
		btc.collectionClients = make([]*BlipTesterCollectionClient, len(collectionNames))
		for idx, name := range collectionNames {
			btc.initCollectionReplication(name, idx)
		}
	}

	// Flag both underlying blip testers so that they avoid closing the RestTester.
	btc.pullReplication.bt.avoidRestTesterClose = true
	btc.pushReplication.bt.avoidRestTesterClose = true
}
// initCollectionReplication creates a BlipTesterCollectionClient for the named collection and stores it in
// btc.collectionClients at collectionIdx. The slice must already be large enough to hold that index.
func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) {
	collectionClient := NewBlipTesterCollectionClient(btc)
	collectionClient.collection, collectionClient.collectionIdx = collection, collectionIdx

	btc.collectionClients[collectionIdx] = collectionClient
}
// waitForReplicationMessage waits on the push replication for the message with the given serial number and returns
// it. For the default collection the serial number is used as is; for any other collection the message waited for
// is serialNumber + 1.
// NOTE(review): the offset presumably accounts for an extra message sent when collections are in use - confirm.
func (btc *BlipTesterClient) waitForReplicationMessage(collection *db.DatabaseCollection, serialNumber blip.MessageNumber) *blip.Message {
	target := serialNumber
	if !base.IsDefaultCollection(collection.ScopeName, collection.Name) {
		target = serialNumber + 1
	}
	return btc.pushReplication.WaitForMessage(target)
}
// getMostRecentChangesMessage returns the changes message with the highest serial number received on the pull
// replication, ignoring changes messages whose body is the JSON literal null. This represents the latest set of
// changes. It returns nil when there is no such message, and fails the test if a message body cannot be read.
func (btc *BlipTesterClient) getMostRecentChangesMessage() *blip.Message {
	var (
		latest       *blip.Message
		latestSerial uint32
	)
	for _, msg := range btc.pullReplication.GetMessages() {
		// Only changes messages are of interest.
		if msg.Properties["Profile"] != db.MessageChanges {
			continue
		}
		body, err := msg.Body()
		require.NoError(btc.TB(), err)
		// A "null" body carries no changes, so it never counts as the latest set.
		if string(body) == "null" {
			continue
		}
		if serial := uint32(msg.SerialNumber()); serial > latestSerial {
			latestSerial, latest = serial, msg
		}
	}
	return latest
}
// SingleCollection returns the only collection client of the client registered under clientID. If that client has a
// non collection aware client, it is returned. Otherwise the client must have exactly one collection client, or the
// test fails fatally.
func (btcRunner *BlipTestClientRunner) SingleCollection(clientID uint32) *BlipTesterCollectionClient {
	client := btcRunner.clients[clientID]
	if legacyClient := client.nonCollectionAwareClient; legacyClient != nil {
		return legacyClient
	}
	require.Equal(client.TB(), 1, len(client.collectionClients))
	return client.collectionClients[0]
}
// Collection returns the collection client with the given scope-qualified name for the client registered under
// clientID. The name "_default._default" resolves to the non collection aware client when one exists. If no
// collection client matches, the test fails fatally.
func (btcRunner *BlipTestClientRunner) Collection(clientID uint32, collectionName string) *BlipTesterCollectionClient {
	client := btcRunner.clients[clientID]
	if collectionName == "_default._default" && client.nonCollectionAwareClient != nil {
		return client.nonCollectionAwareClient
	}

	candidates := client.collectionClients
	for i := range candidates {
		if candidates[i].collection == collectionName {
			return candidates[i]
		}
	}

	require.FailNow(client.TB(), fmt.Sprintf("Could not find collection %s in BlipTesterClient", collectionName))
	return nil
}
// BlipTesterPushOptions configures a push replication started with StartPushWithOpts.
type BlipTesterPushOptions struct {
	// Continuous is passed through to changesSince when iterating over the changes to push.
	Continuous bool
	// Since is the sequence to start pushing from, in the string form accepted by db.ParsePlainSequenceID.
	Since string
	// TODO: Not Implemented
	// Channels string
	// DocIDs []string
	// changesBatchSize int
}
// StartPush will begin a continuous push replication since 0 between the client and server. It is shorthand for
// StartPushWithOpts with Continuous set to true and Since set to "0".
func (btcc *BlipTesterCollectionClient) StartPush() {
	btcc.StartPushWithOpts(BlipTesterPushOptions{Continuous: true, Since: "0"})
}
// changesBatchSize is the number of changes a push replication accumulates before sending them as one batch.
// TODO: CBG-4401 Implement opts.changesBatchSize and raise default batch to ~20-200 to match real CBL client
const changesBatchSize = 1
// proposeChangeBatchEntry is a single change queued by a push replication; entries are collected into batches before
// being sent to the server.
type proposeChangeBatchEntry struct {
	// docID is the document ID written into the proposeChanges request for this change.
	docID string
	// version is the version being pushed; its RevID is written next to docID in the proposeChanges request.
	version DocVersion
	// seq is presumably the client-side sequence of this change - TODO confirm.
	seq clientSeq
	// history is the list of versions whose RevIDs historyStr joins with commas.
	history []DocVersion
	// latestServerVersion is presumably the newest version the server is known to have - TODO confirm.
	latestServerVersion DocVersion
	// isDelete presumably marks the change as a deletion - TODO confirm.
	isDelete bool
}
// historyStr returns the RevIDs of the entry's history as a single comma-separated string, in slice order. An empty
// history yields an empty string.
func (e proposeChangeBatchEntry) historyStr() string {
	revIDs := make([]string, 0, len(e.history))
	for _, past := range e.history {
		revIDs = append(revIDs, past.RevID)
	}
	return strings.Join(revIDs, ",")
}
// StartPushWithOpts will begin a push replication with the given options between the client and server. opts.Since
// is parsed as a plain sequence ID and the test fails if it is invalid. The push itself runs in a background goroutine
// tracked by btcc.goroutineWg: it iterates over changesSince and sends the accumulated changes with sendRevisions each
// time changesBatchSize entries have been collected.
// NOTE(review): entries left in a partially filled batch when the iteration ends are not sent; with the current
// changesBatchSize of 1 this cannot happen. See the CBG-4401 TODO below.
func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOptions) {
	ctx := btcc.ctx
	parsedSince, err := db.ParsePlainSequenceID(opts.Since)
	require.NoError(btcc.TB(), err)
	startSeq := clientSeq(parsedSince.SafeSequence())

	pushLoop := func() {
		defer btcc.goroutineWg.Done()
		// TODO: CBG-4401 wire up opts.changesBatchSize and implement a flush timeout for when the client doesn't fill the batch
		pending := make([]proposeChangeBatchEntry, 0, changesBatchSize)
		base.DebugfCtx(ctx, base.KeySGTest, "Starting push replication iteration with since=%v", startSeq)
		for change := range btcc.changesSince(btcc.ctx, startSeq, opts.Continuous) {
			pending = append(pending, *change)
			if len(pending) < changesBatchSize {
				continue
			}
			base.DebugfCtx(ctx, base.KeySGTest, "Sending batch of %d changes", len(pending))
			btcc.sendRevisions(ctx, pending)
			// Reuse the backing array for the next batch.
			pending = pending[:0]
		}
	}

	btcc.goroutineWg.Add(1)
	go pushLoop()
}
// sendProposeChanges sends a proposeChanges request to the server for the given changes, waits for a response and returns it.
func (btcc *BlipTesterCollectionClient) sendProposeChanges(ctx context.Context, changesBatch []proposeChangeBatchEntry) *blip.Message {
proposeChangesRequest := blip.NewRequest()
proposeChangesRequest.SetProfile(db.MessageProposeChanges)
proposeChangesRequestBody := bytes.NewBufferString(`[`)
for i, change := range changesBatch {
if i > 0 {
proposeChangesRequestBody.WriteString(",")
}
proposeChangesRequestBody.WriteString(fmt.Sprintf(`["%s","%s"`, change.docID, change.version.RevID))