-
Notifications
You must be signed in to change notification settings - Fork 0
/
shards.go
76 lines (62 loc) · 1.91 KB
/
shards.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package rscylla
import (
"context"
"encoding/binary"
"sort"
"time"
"github.com/gocql/gocql"
)
// These functions were copied from github.com/scylladb/scylla-cdc-go.
// shardStreams partitions streamIDs into shards according to the virtual node
// index reported by getVnodeIndex.
//
// Streams whose vnode index is -1 (unknown) come first in the result, each in
// a shard of its own and in input order. They are followed by one shard per
// known vnode index, ordered by ascending index; inside such a shard the
// streams keep their input order.
func shardStreams(streamIDs []streamID) [][]streamID {
	const unknownVnode = -1

	var shards [][]streamID
	byVnode := make(map[int64][]streamID)
	for _, id := range streamIDs {
		vnode := getVnodeIndex(id)
		if vnode == unknownVnode {
			// The vnode cannot be determined, so this stream is not grouped
			// with any other one.
			shards = append(shards, []streamID{id})
			continue
		}
		byVnode[vnode] = append(byVnode[vnode], id)
	}

	// Map iteration order is not stable; sort the indexes so that the result
	// order is consistent between calls.
	indexes := make([]int64, 0, len(byVnode))
	for vnode := range byVnode {
		indexes = append(indexes, vnode)
	}
	sort.Slice(indexes, func(a, b int) bool { return indexes[a] < indexes[b] })

	for _, vnode := range indexes {
		shards = append(shards, byVnode[vnode])
	}
	return shards
}
// getVnodeIndex extracts the vnode index from the given stream ID.
//
// Only 16-byte stream IDs are understood. The last 8 bytes are read as a
// big-endian integer whose lowest 4 bits hold the format version and whose
// next 22 bits hold the vnode index. It returns -1 if the stream ID has a
// different length or its version is not 1.
func getVnodeIndex(streamID streamID) int64 {
	const (
		streamIDLen      = 16
		versionBits      = 4
		versionMask      = 1<<versionBits - 1
		supportedVersion = 1
		vnodeIdxMask     = 1<<22 - 1
	)

	if len(streamID) != streamIDLen {
		// Other sizes are not a format we can decode.
		return -1
	}

	low := binary.BigEndian.Uint64(streamID[8:streamIDLen])
	if low&versionMask != supportedVersion {
		// Unrecognized version.
		return -1
	}
	return int64((low >> versionBits) & vnodeIdxMask)
}
// getShards returns all shards of the generation gen. It reads the streams
// column of the row in generationsTableName whose time equals gen, using the
// given session, context and consistency level, and groups the stream IDs
// with shardStreams.
//
// If the query fails, the error is returned unchanged together with a nil
// result.
func getShards(ctx context.Context, session *gocql.Session, consistency gocql.Consistency, gen time.Time) ([][]streamID, error) {
	stmt := "SELECT streams FROM " + generationsTableName + " WHERE time = ?"
	query := session.Query(stmt, gen).WithContext(ctx).Consistency(consistency)

	var streams []streamID
	if err := query.Scan(&streams); err != nil {
		return nil, err
	}
	return shardStreams(streams), nil
}