-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch.go
458 lines (385 loc) · 12.2 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package veracity
// Watch for log changes, relying on the blob last idtimestamps to do so
// efficiently.
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/datatrails/go-datatrails-common/azblob"
"github.com/datatrails/go-datatrails-common/logger"
"github.com/datatrails/go-datatrails-merklelog/massifs"
"github.com/datatrails/go-datatrails-merklelog/massifs/snowflakeid"
"github.com/datatrails/go-datatrails-merklelog/massifs/watcher"
// "github.com/datatrails/go-datatrails-common/azblob"
"github.com/urfave/cli/v2"
)
// Command line flag names understood by the watch command.
const (
	flagCount    = "count"
	flagHorizon  = "horizon"
	flagIDSince  = "idsince"
	flagInterval = "interval"
	flagLatest   = "latest"
	flagSince    = "since"
)

// Identity and blob tag related values.
const (
	currentEpoch = uint8(1) // good until the end of the first unix epoch
	tenantPrefix = "tenant/"
	// sealIDNotFound stands in for a seal id when a tenant has no collated seal.
	sealIDNotFound = "NOT-FOUND"
)

// Polling, horizon and parsing limits.
const (
	// maxPollCount is the maximum number of times to poll for *some* activity.
	// Polling always terminates as soon as the first activity is detected.
	maxPollCount = 15

	// maxHorizon is the largest supported horizon. More than this over flows
	// the epoch, which is half the length of the unix time epoch.
	maxHorizon = 100000 * time.Hour

	horizonAliasMax  = "max"    // short hand for the largest supported duration
	sinceAliasLatest = "latest" // short hand for obtaining the latest change for all watched tenants

	// rangeDurationParseErrorSubString prefixes time.ParseDuration errors.
	rangeDurationParseErrorSubString = "time: invalid duration "

	threeSeconds = 3 * time.Second
)
// ErrNoChanges is returned by WatchForChanges when every permitted poll
// completes without finding any log activity.
var ErrNoChanges = errors.New("no changes found")
// WatchConfig holds the options for a watch run: the embedded
// watcher.WatchConfig (which supplies Interval, Horizon, Since and IDSince)
// plus the veracity specific settings below.
type WatchConfig struct {
watcher.WatchConfig
// WatchTenants restricts reporting to these tenants. Keys have the "tenant/"
// prefix stripped (see NewWatchConfig). A nil map means every tenant is reported.
WatchTenants map[string]bool
// WatchCount is the maximum number of polls; NewWatchConfig clamps it to
// the range [1, maxPollCount].
WatchCount int
// ReaderURL is prefixed to the massif and seal blob paths in reported URLs.
ReaderURL string
// Latest is set by --latest: report the most recent change for each tenant
// regardless of how long ago it happened.
Latest bool
}
// watchReporter is the output sink used by WatchForChanges. Abstracting it
// lets unit tests capture what would otherwise be written to the console.
//
// Logf receives diagnostic messages; Outf receives the command's results.
// Both take a printf style format string and arguments.
type watchReporter interface {
	Logf(format string, args ...any)
	Outf(format string, args ...any)
}
// defaultReporter is the production watchReporter. Diagnostics go to the
// configured logger, when there is one, and results are printed to stdout.
type defaultReporter struct {
	log logger.Logger
}

// Logf forwards the message to the logger at Info level. It does nothing when
// no logger has been configured.
func (r defaultReporter) Logf(message string, args ...any) {
	if r.log != nil {
		r.log.Infof(message, args...)
	}
}

// Outf prints the formatted message to stdout using fmt.Printf. message is a
// format string, so callers should pass untrusted text as an argument instead.
func (r defaultReporter) Outf(message string, args ...any) {
	_, _ = fmt.Printf(message, args...)
}
// NewLogWatcherCmd returns the "watch" CLI command, which discovers recently
// active logs.
//
// Its action configures logging, builds a WatchConfig from the command line
// (see NewWatchConfig), configures a blob reader with cfgReader and then runs
// WatchForChanges, which prints any activity found as JSON.
func NewLogWatcherCmd() *cli.Command {
	return &cli.Command{Name: "watch",
		Usage: `discover recently active logs
Provide --horizon OR provide either of --since or --idsince
horizon is always inferred from the since arguments if they are provided
`,
		Flags: []cli.Flag{
			&cli.BoolFlag{
				Name:  flagLatest,
				Usage: `find the latest changes for each requested tenant (no matter how long ago they occurred). This is mutually exclusive with --since, --idsince and --horizon.`,
				Value: false,
			},
			&cli.TimestampFlag{
				Name:   flagSince,
				Usage:  "RFC 3339 time stamp, only logs with changes after this are considered, defaults to now. idsince takes precedence if also supplied.",
				Layout: time.RFC3339,
			},
			&cli.StringFlag{
				Name: flagIDSince, Aliases: []string{"s"},
				Usage: "Start time as an idtimestamp. Start time defaults to now. All results are >= this hex string. If provided, it is used exactly as is. Takes precedence over since",
			},
			&cli.StringFlag{
				Name:    flagHorizon,
				Aliases: []string{"z"},
				Value:   "24h",
				Usage:   "Infer since as now - horizon. Use the alias --horizon=max to force the highest supported value. Otherwise, the format is {number}{units} eg 1h to only see things in the last hour. If watching (count=0), since is re-calculated every interval",
			},
			&cli.DurationFlag{
				Name: flagInterval, Aliases: []string{"d"},
				Value: threeSeconds,
				Usage: "The default polling interval is once every three seconds, setting the interval to zero disables polling",
			},
			&cli.IntFlag{
				Name: flagCount, Usage: fmt.Sprintf(
					"Number of intervals to poll. Polling is terminated once the first activity is seen or after %d attempts regardless", maxPollCount),
				Value: 1,
			},
		},
		Action: func(cCtx *cli.Context) error {
			cmd := &CmdCtx{}
			ctx := context.Background()

			if err := cfgLogging(cmd, cCtx); err != nil {
				return err
			}
			reporter := &defaultReporter{log: cmd.log}

			cfg, err := NewWatchConfig(cCtx, cmd)
			if err != nil {
				return err
			}

			// NOTE(review): presumably forces cfgReader onto the production
			// data URL when --data-url is not supplied; confirm in cfgReader.
			forceProdURL := cCtx.String("data-url") == ""
			reader, err := cfgReader(cmd, cCtx, forceProdURL)
			if err != nil {
				return err
			}

			// NewWatchConfig copied cmd.readerURL before cfgReader ran. If
			// cfgReader is what populates readerURL, that copy is empty and the
			// reported massif/seal URLs would lack their base, so pick up the
			// configured value now. NOTE(review): confirm against cfgReader.
			if cfg.ReaderURL == "" {
				cfg.ReaderURL = cmd.readerURL
			}

			return WatchForChanges(ctx, cfg, reader, reporter)
		},
	}
}
// checkCompatibleFlags rejects --latest when it is combined with any of
// --horizon, --since or --idsince. When --latest is not set, nothing is
// checked and nil is returned.
func checkCompatibleFlags(cCtx cliContext) error {
	if cCtx.IsSet(flagLatest) {
		excluded := []string{flagHorizon, flagSince, flagIDSince}
		for i := range excluded {
			if cCtx.IsSet(excluded[i]) {
				return fmt.Errorf("the %s flag is mutualy exclusive with %s", flagLatest, strings.Join(excluded, ", "))
			}
		}
	}
	return nil
}
// cliContext is the subset of *cli.Context that NewWatchConfig and
// checkCompatibleFlags read flag values through. Presumably it exists so
// those functions can be driven by a test double instead of a real
// urfave/cli context.
type cliContext interface {
	IsSet(name string) bool
	Bool(name string) bool
	String(name string) string
	Int(name string) int
	Duration(name string) time.Duration
	Timestamp(name string) *time.Time
}
// parseHorizon parses the --horizon command line value into a duration.
//
// The alias "max" (horizonAliasMax) yields maxHorizon. Any other value is
// parsed with time.ParseDuration and must lie within [0, maxHorizon].
//
// Every failure is reported as an error with a duration of 0; nothing is
// silently coerced to the maximum. Parse errors whose text starts with
// rangeDurationParseErrorSubString get a message pointing at the supported
// maximum. Go reports both overflowing and malformed values with that prefix,
// so it is not specific to range errors. Other parse errors get a generic
// message, and values above maxHorizon or below zero are rejected with their
// own messages.
func parseHorizon(horizon string) (time.Duration, error) {
	if horizon == horizonAliasMax {
		return maxHorizon, nil
	}

	d, err := time.ParseDuration(horizon)
	switch {
	case err != nil && strings.HasPrefix(err.Error(), rangeDurationParseErrorSubString):
		return 0, fmt.Errorf("the supplied horizon was invalid. the maximum supported duration is --horizon=%v, which has the alias --horizon=max. also consider using --latest", maxHorizon)
	case err != nil:
		return 0, fmt.Errorf("the horizon '%s' is out of range or otherwise invalid. Use --horizon=max to get the largest supported value %v. also consider using --latest", horizon, maxHorizon)
	case d > maxHorizon:
		return 0, fmt.Errorf("the maximum supported duration is --horizon=%v, which has the alias --horizon=max. also consider using --latest", maxHorizon)
	case d < 0:
		return 0, fmt.Errorf("negative horizon value:%s", horizon)
	}
	return d, nil
}
// NewWatchConfig derives a WatchConfig from the options set on the command
// line context.
//
// Its main steps are:
//   - Reject --latest when it is combined with --horizon, --since or --idsince.
//   - Copy interval, horizon, since and idsince from the flags.
//   - Unless --latest is set, apply watcher.ConfigDefaults and reject intervals
//     shorter than one second.
//   - Clamp the poll count to [1, maxPollCount] and copy cmd.readerURL.
//   - Store any tenant options in WatchTenants with the "tenant/" prefix
//     stripped. WatchTenants stays nil when no tenants are given.
func NewWatchConfig(cCtx cliContext, cmd *CmdCtx) (WatchConfig, error) {
	var err error

	// --latest is mutually exclusive with the horizon, since and idsince flags.
	if err = checkCompatibleFlags(cCtx); err != nil {
		return WatchConfig{}, err
	}

	cfg := WatchConfig{
		Latest: cCtx.Bool(flagLatest),
	}
	cfg.Interval = cCtx.Duration(flagInterval)

	if cCtx.IsSet(flagHorizon) {
		cfg.Horizon, err = parseHorizon(cCtx.String(flagHorizon))
		if err != nil {
			return WatchConfig{}, err
		}
	}
	if cCtx.IsSet(flagSince) {
		// Timestamp returns a pointer; guard against nil (e.g. from a test
		// double) rather than panicking on the dereference.
		if since := cCtx.Timestamp(flagSince); since != nil {
			cfg.Since = *since
		}
	}
	if cCtx.IsSet(flagIDSince) {
		cfg.IDSince = cCtx.String(flagIDSince)
	}

	if !cCtx.IsSet(flagLatest) {
		if err = watcher.ConfigDefaults(&cfg.WatchConfig); err != nil {
			return WatchConfig{}, err
		}
		// NOTE(review): the --interval usage text says zero disables polling,
		// but unless ConfigDefaults replaces a zero interval, --interval=0 is
		// rejected here. Confirm the intended behaviour.
		if cfg.Interval < time.Second {
			return WatchConfig{}, fmt.Errorf("polling more than once per second is not currently supported")
		}
	}

	cfg.WatchCount = min(max(1, cCtx.Int(flagCount)), maxPollCount)
	cfg.ReaderURL = cmd.readerURL

	tenants := CtxGetTenantOptions(cCtx)
	if len(tenants) == 0 {
		return cfg, nil
	}
	cfg.WatchTenants = make(map[string]bool, len(tenants))
	for _, t := range tenants {
		cfg.WatchTenants[strings.TrimPrefix(t, tenantPrefix)] = true
	}
	return cfg, nil
}
// Watcher extends watcher.Watcher with the --latest filter behaviour and the
// dependencies WatchForChanges needs for one watch run.
type Watcher struct {
watcher.Watcher
// cfg is the full command configuration, including Latest and WatchTenants.
cfg WatchConfig
// reader performs the tag filtered blob listings.
reader azblob.Reader
// reporter receives log and result output.
reporter watchReporter
// collator accumulates massif and seal tails across listing pages.
collator watcher.LogTailCollator
}
// FirstFilter returns the blob tag filter for the first poll.
//
// With --latest the filter selects blobs whose "lastid" tag is at or after
// massifs.IDTimestampToHex(0, 0), intended as the first idtimestamp of the
// first epoch, so every log matches regardless of age. Otherwise the embedded
// watcher.Watcher builds the filter.
func (w *Watcher) FirstFilter() string {
	if w.cfg.Latest {
		lowest := massifs.IDTimestampToHex(0, 0)
		return fmt.Sprintf(`"lastid">='%s'`, lowest)
	}
	return w.Watcher.FirstFilter()
}
// NextFilter returns the blob tag filter for each poll after the first.
// With --latest there is no moving window, so the first filter is repeated;
// otherwise the embedded watcher.Watcher builds the filter.
func (w *Watcher) NextFilter() string {
	if w.cfg.Latest {
		return w.FirstFilter()
	}
	return w.Watcher.NextFilter()
}
// normalizeTenantIdentity makes sure the identity starts with tenantPrefix
// ("tenant/"). Values that already have the prefix are returned unchanged;
// bare identities have it prepended.
func normalizeTenantIdentity(tenant string) string {
	return tenantPrefix + strings.TrimPrefix(tenant, tenantPrefix)
}
// WatchForChanges polls for tenant log changes according to cfg and reports
// the first batch of activity found.
//
// On each poll it works as follows:
//   - Collate every page of the current tag filter listing.
//   - Build one TenantActivity per collated massif tenant, in the order given
//     by SortedMassifTenants. When cfg.WatchTenants is non-nil, only those
//     tenants are included.
//   - If any activity is found, log summary counts via reporter.Logf, write
//     the activity as indented JSON via reporter.Outf and return nil.
//
// When nothing is found, the next filter is used after waiting cfg.Interval.
// At most cfg.WatchCount polls are made. ErrNoChanges is returned once they
// are used up, or straight away when the interval is zero. If ctx is
// cancelled while waiting between polls, ctx.Err() is returned.
func WatchForChanges(
	ctx context.Context,
	cfg WatchConfig, reader azblob.Reader, reporter watchReporter,
) error {
	w := &Watcher{
		Watcher:  watcher.Watcher{Cfg: cfg.WatchConfig},
		cfg:      cfg,
		reader:   reader,
		reporter: reporter,
		collator: watcher.NewLogTailCollator(),
	}

	tagsFilter := w.FirstFilter()
	count := w.cfg.WatchCount
	for {
		// For each poll, collate all the pages of the filtered listing.
		if err := collectPages(ctx, w, tagsFilter); err != nil {
			return err
		}

		var activity []TenantActivity
		for _, tenant := range w.collator.SortedMassifTenants() {
			if w.cfg.WatchTenants != nil && !w.cfg.WatchTenants[tenant] {
				continue
			}
			lt := w.collator.Massifs[tenant]
			sealLastID := lastSealID(w.collator, tenant)

			// This is console mode output
			a := TenantActivity{
				Tenant:       normalizeTenantIdentity(tenant),
				Massif:       int(lt.Number),
				IDCommitted:  lt.LastID,
				IDConfirmed:  sealLastID,
				LastModified: lastActivityRFC3339(lt.LastID, sealLastID),
				MassifURL:    fmt.Sprintf("%s%s", w.cfg.ReaderURL, lt.Path),
			}
			if sealLastID != sealIDNotFound {
				a.SealURL = fmt.Sprintf("%s%s", w.cfg.ReaderURL, w.collator.Seals[tenant].Path)
			}
			activity = append(activity, a)
		}

		if activity != nil {
			reporter.Logf(
				"%d active logs since %v (%s).",
				len(w.collator.Massifs),
				w.LastSince.Format(time.RFC3339),
				w.LastIDSince,
			)
			reporter.Logf(
				"%d tenants sealed since %v (%s).",
				len(w.collator.Seals),
				w.LastSince.Format(time.RFC3339),
				w.LastIDSince,
			)
			marshaledJSON, err := json.MarshalIndent(activity, "", " ")
			if err != nil {
				return err
			}
			// Pass the JSON as an argument, never as the format string:
			// URLs and identities may legitimately contain '%'.
			reporter.Outf("%s", marshaledJSON)
			// Terminate immediately once we have results
			return nil
		}

		// A zero interval means there is no repeat polling.
		if count <= 1 || w.Cfg.Interval == 0 {
			// exit non zero if nothing is found
			return ErrNoChanges
		}
		count--
		tagsFilter = w.NextFilter()

		// Wait for the next poll, but stop promptly if ctx is cancelled.
		timer := time.NewTimer(w.Cfg.Interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}
// collectPages feeds every page of one filtered listing into w.collator.
//
// It keeps requesting pages, passing the continuation marker from the previous
// response, until a response comes back with a nil or empty marker. The first
// error from listing or collation is returned immediately.
func collectPages(
	ctx context.Context,
	w *Watcher,
	tagsFilter string,
	filterOpts ...azblob.Option,
) error {
	var marker azblob.ListMarker
	for {
		page, err := filteredList(ctx, w.reader, tagsFilter, marker, filterOpts...)
		if err != nil {
			return err
		}
		if err := w.collator.CollatePage(page.Items); err != nil {
			return err
		}
		marker = page.Marker
		if marker == nil || *marker == "" {
			return nil
		}
	}
}
// filteredList requests one page of blobs matching tagsFilter.
//
// filterOpts are always passed through to reader.FilteredList. When marker is
// non-nil and non-empty, azblob.WithListMarker(marker) is added so the listing
// resumes from that continuation point; otherwise the first page is requested.
func filteredList(
	ctx context.Context,
	reader azblob.Reader,
	tagsFilter string,
	marker azblob.ListMarker,
	filterOpts ...azblob.Option,
) (*azblob.FilterResponse, error) {
	if marker != nil && *marker != "" {
		// Clip the capacity so append allocates a fresh backing array instead
		// of writing into one owned by the caller.
		filterOpts = append(filterOpts[:len(filterOpts):len(filterOpts)], azblob.WithListMarker(marker))
	}
	return reader.FilteredList(ctx, tagsFilter, filterOpts...)
}
// lastSealID returns the LastID of the seal collated for tenant, or
// sealIDNotFound when the collator holds no seal for that tenant.
func lastSealID(c watcher.LogTailCollator, tenant string) string {
	// Single comma-ok lookup rather than testing presence and indexing again.
	if seal, ok := c.Seals[tenant]; ok {
		return seal.LastID
	}
	return sealIDNotFound
}
// lastActivityRFC3339 returns the later of the times encoded in the massif and
// seal idtimestamps, formatted as RFC 3339 in UTC.
//
// If the massif idtimestamp cannot be decoded, the result is "". If idseal is
// sealIDNotFound or cannot be decoded, only the massif time is used. When the
// two times are equal, the seal time is the one reported.
func lastActivityRFC3339(idmassif, idseal string) string {
	latest, err := lastActivity(idmassif)
	if err != nil {
		return ""
	}
	if idseal != sealIDNotFound {
		if sealed, sealErr := lastActivity(idseal); sealErr == nil && !latest.After(sealed) {
			latest = sealed
		}
	}
	return latest.UTC().Format(time.RFC3339)
}
// lastActivity decodes a hex idtimestamp into the time it encodes.
//
// The hex string is split into its id and epoch with
// massifs.SplitIDTimestampHex. The unix milliseconds are then recovered with
// snowflakeid.IDUnixMilli and converted with time.UnixMilli. An error from
// either step is returned together with the zero time.
func lastActivity(idTimestamp string) (time.Time, error) {
	var zero time.Time

	id, epoch, err := massifs.SplitIDTimestampHex(idTimestamp)
	if err != nil {
		return zero, err
	}

	unixMilli, err := snowflakeid.IDUnixMilli(id, epoch)
	if err != nil {
		return zero, err
	}
	return time.UnixMilli(unixMilli), nil
}