forked from valkey-io/valkey-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hash.go
209 lines (189 loc) · 7.12 KB
/
hash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package om
import (
"context"
"github.com/oklog/ulid/v2"
"reflect"
"strconv"
"time"
"github.com/valkey-io/valkey-go"
)
// NewHashRepository creates an HashRepository.
// The prefix parameter is used as valkey key prefix. The entity stored by the repository will be named in the form of `{prefix}:{id}`
// The schema parameter should be a struct with fields tagged with `valkey:",key"` and `valkey:",ver"`
func NewHashRepository[T any](prefix string, schema T, client valkey.Client, opts ...RepositoryOption) Repository[T] {
repo := &HashRepository[T]{
prefix: prefix,
idx: "hashidx:" + prefix,
typ: reflect.TypeOf(schema),
client: client,
}
repo.schema = newSchema(repo.typ)
repo.factory = newHashConvFactory(repo.typ, repo.schema)
for _, opt := range opts {
opt((*HashRepository[any])(repo))
}
return repo
}
var _ Repository[any] = (*HashRepository[any])(nil)
// HashRepository is an OM repository backed by valkey hash.
type HashRepository[T any] struct {
schema schema
typ reflect.Type
client valkey.Client
factory *hashConvFactory
prefix string
idx string
}
// NewEntity returns an empty entity and will have the `valkey:",key"` field be set with ULID automatically.
func (r *HashRepository[T]) NewEntity() (entity *T) {
var v T
reflect.ValueOf(&v).Elem().Field(r.schema.key.idx).Set(reflect.ValueOf(ulid.Make().String()))
return &v
}
// Fetch an entity whose name is `{prefix}:{id}`
func (r *HashRepository[T]) Fetch(ctx context.Context, id string) (v *T, err error) {
record, err := r.client.Do(ctx, r.client.B().Hgetall().Key(key(r.prefix, id)).Build()).AsStrMap()
if err == nil {
v, err = r.fromHash(record)
}
return v, err
}
// FetchCache is like Fetch, but it uses client side caching mechanism.
func (r *HashRepository[T]) FetchCache(ctx context.Context, id string, ttl time.Duration) (v *T, err error) {
record, err := r.client.DoCache(ctx, r.client.B().Hgetall().Key(key(r.prefix, id)).Cache(), ttl).AsStrMap()
if err == nil {
v, err = r.fromHash(record)
}
return v, err
}
func (r *HashRepository[T]) toExec(entity *T) (val reflect.Value, exec valkey.LuaExec) {
val = reflect.ValueOf(entity).Elem()
fields := r.factory.NewConverter(val).ToHash()
keyVal := fields[r.schema.key.name]
verVal := fields[r.schema.ver.name]
extVal := int64(0)
if r.schema.ext != nil {
if ext, ok := val.Field(r.schema.ext.idx).Interface().(time.Time); ok && !ext.IsZero() {
extVal = ext.UnixMilli()
}
}
exec.Keys = []string{key(r.prefix, keyVal)}
if extVal != 0 {
exec.Args = make([]string, 0, len(fields)*2+1)
} else {
exec.Args = make([]string, 0, len(fields)*2)
}
exec.Args = append(exec.Args, r.schema.ver.name, verVal) // keep the ver field be the first pair for the hashSaveScript
delete(fields, r.schema.ver.name)
for k, v := range fields {
exec.Args = append(exec.Args, k, v)
}
if extVal != 0 {
exec.Args = append(exec.Args, strconv.FormatInt(extVal, 10))
}
return
}
// Save the entity under the valkey key of `{prefix}:{id}`.
// It also uses the `valkey:",ver"` field and lua script to perform optimistic locking and prevent lost update.
func (r *HashRepository[T]) Save(ctx context.Context, entity *T) (err error) {
val, exec := r.toExec(entity)
str, err := hashSaveScript.Exec(ctx, r.client, exec.Keys, exec.Args).ToString()
if valkey.IsValkeyNil(err) {
return ErrVersionMismatch
}
if err == nil {
ver, _ := strconv.ParseInt(str, 10, 64)
val.Field(r.schema.ver.idx).SetInt(ver)
}
return err
}
// SaveMulti batches multiple HashRepository.Save at once
func (r *HashRepository[T]) SaveMulti(ctx context.Context, entities ...*T) []error {
errs := make([]error, len(entities))
vals := make([]reflect.Value, len(entities))
exec := make([]valkey.LuaExec, len(entities))
for i, entity := range entities {
vals[i], exec[i] = r.toExec(entity)
}
for i, resp := range hashSaveScript.ExecMulti(ctx, r.client, exec...) {
if str, err := resp.ToString(); err != nil {
if errs[i] = err; valkey.IsValkeyNil(err) {
errs[i] = ErrVersionMismatch
}
} else {
ver, _ := strconv.ParseInt(str, 10, 64)
vals[i].Field(r.schema.ver.idx).SetInt(ver)
}
}
return errs
}
// Remove the entity under the valkey key of `{prefix}:{id}`.
func (r *HashRepository[T]) Remove(ctx context.Context, id string) error {
return r.client.Do(ctx, r.client.B().Del().Key(key(r.prefix, id)).Build()).Error()
}
// CreateIndex uses FT.CREATE from the RediSearch module to create inverted index under the name `hashidx:{prefix}`
// You can use the cmdFn parameter to mutate the index construction command.
func (r *HashRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) valkey.Completed) error {
return r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(r.idx).OnHash().Prefix(1).Prefix(r.prefix+":").Schema())).Error()
}
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop index whose name is `hashidx:{prefix}`
func (r *HashRepository[T]) DropIndex(ctx context.Context) error {
return r.client.Do(ctx, r.client.B().FtDropindex().Index(r.idx).Build()).Error()
}
// Search uses FT.SEARCH from the RediSearch module to search the index whose name is `hashidx:{prefix}`
// It returns three values:
// 1. total count of match results inside the valkey, and note that it might be larger than returned search result.
// 2. search result, and note that its length might smaller than the first return value.
// 3. error if any
// You can use the cmdFn parameter to mutate the search command.
func (r *HashRepository[T]) Search(ctx context.Context, cmdFn func(search FtSearchIndex) valkey.Completed) (n int64, s []*T, err error) {
n, resp, err := r.client.Do(ctx, cmdFn(r.client.B().FtSearch().Index(r.idx))).AsFtSearch()
if err == nil {
s = make([]*T, len(resp))
for i, v := range resp {
if s[i], err = r.fromFields(v.Doc); err != nil {
return 0, nil, err
}
}
}
return n, s, err
}
// Aggregate performs the FT.AGGREGATE and returns a *AggregateCursor for accessing the results
func (r *HashRepository[T]) Aggregate(ctx context.Context, cmdFn func(agg FtAggregateIndex) valkey.Completed) (cursor *AggregateCursor, err error) {
cid, total, resp, err := r.client.Do(ctx, cmdFn(r.client.B().FtAggregate().Index(r.idx))).AsFtAggregateCursor()
if err != nil {
return nil, err
}
return newAggregateCursor(r.idx, r.client, resp, cid, total), nil
}
// IndexName returns the index name used in the FT.CREATE
func (r *HashRepository[T]) IndexName() string {
return r.idx
}
func (r *HashRepository[T]) fromHash(record map[string]string) (*T, error) {
if len(record) == 0 {
return nil, ErrEmptyHashRecord
}
return r.fromFields(record)
}
func (r *HashRepository[T]) fromFields(fields map[string]string) (*T, error) {
var v T
if err := r.factory.NewConverter(reflect.ValueOf(&v).Elem()).FromHash(fields); err != nil {
return nil, err
}
return &v, nil
}
var hashSaveScript = valkey.NewLuaScript(`
local v = redis.call('HGET',KEYS[1],ARGV[1])
if (not v or v == ARGV[2])
then
ARGV[2] = tostring(tonumber(ARGV[2])+1)
local e = (#ARGV % 2 == 1) and table.remove(ARGV) or nil
if redis.call('HSET',KEYS[1],unpack(ARGV))
then
if e then redis.call('PEXPIREAT',KEYS[1],e) end
return ARGV[2]
end
end
return nil
`)