diff --git a/any_table.go b/any_table.go index fb60806..ebea155 100644 --- a/any_table.go +++ b/any_table.go @@ -128,6 +128,10 @@ func (t AnyTable) queryIndex(txn ReadTxn, index string, key string) (indexReadTx return itxn, rawKey, err } +func (t AnyTable) Changes(txn WriteTxn) (anyChangeIterator, error) { + return t.Meta.anyChanges(txn) +} + func (t AnyTable) TableHeader() []string { zero := t.Meta.proto() if tw, ok := zero.(TableWritable); ok { diff --git a/go.mod b/go.mod index 7878538..4bb4d02 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,12 @@ module github.com/cilium/statedb go 1.23 +replace github.com/cilium/hive => ../hive + require ( github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23 github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d + github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 3aba0d2..cd10fdc 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23 h1:RQSJdQVdxE9puF18G5RGZZi2jhBb2dtA6zI+HHMyY+Y= -github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23/go.mod h1:pI2GJ1n3SLKIQVFrKF7W6A6gb6BQkZ+3Hp4PAEo5SuI= github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d h1:p6MgATaKEB9o7iAsk9rlzXNDMNCeKPAkx4Y8f+Zq8X8= github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d/go.mod h1:3VLiLgs8wfjirkuYqos4t0IBPQ+sXtf3tFkChLm6ARM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -24,6 +22,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= +github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= diff --git a/script.go b/script.go index 609efc6..108836a 100644 --- a/script.go +++ b/script.go @@ -16,11 +16,12 @@ import ( "regexp" "slices" "strings" - "text/tabwriter" "time" "github.com/cilium/hive" "github.com/cilium/hive/script" + "github.com/liggitt/tabwriter" + "golang.org/x/time/rate" "gopkg.in/yaml.v3" ) @@ -35,6 +36,7 @@ func ScriptCommands(db *DB) hive.ScriptCmdOut { "prefix": PrefixCmd(db), "list": ListCmd(db), "lowerbound": LowerBoundCmd(db), + "watch": WatchCmd(db), "initialized": InitializedCmd(db), } subCmdsList := strings.Join(slices.Collect(maps.Keys(subCmds)), ", ") @@ -77,7 +79,7 @@ func TablesCmd(db *DB) script.Cmd { func(s *script.State, args ...string) (script.WaitFunc, error) { txn := db.ReadTxn() tbls := db.GetTables(txn) - w := tabwriter.NewWriter(s.LogWriter(), 5, 4, 3, ' ', 0) + w := newTabWriter(s.LogWriter()) fmt.Fprintf(w, "Name\tObject count\tDeleted objects\tIndexes\tInitializers\tGo type\tLast WriteTxn\n") for _, tbl := range tbls { idxs := strings.Join(tbl.Indexes(), ", ") @@ -207,7 +209,7 @@ func CompareCmd(db *DB) script.Cmd { return script.Command( script.CmdUsage{ Summary: "Compare table", - Args: "table file (-timeout=) (-grep=)", + Args: "(-timeout=) (-grep=) table file", }, func(s *script.State, args ...string) (script.WaitFunc, error) { flags := newCmdFlagSet() @@ -264,7 +266,7 @@ func CompareCmd(db *DB) script.Cmd { // Create the diff between 'lines' and the rows in the table. equal := true var diff bytes.Buffer - w := tabwriter.NewWriter(&diff, 5, 4, 3, ' ', 0) + w := newTabWriter(&diff) fmt.Fprintf(w, " %s\n", joinByPositions(columnNames, columnPositions)) objs, watch := tbl.AllWatch(db.ReadTxn()) @@ -509,6 +511,65 @@ func runQueryCmd(query int, db *DB, s *script.State, args []string) (script.Wait }, nil } +func WatchCmd(db *DB) script.Cmd { + return script.Command( + script.CmdUsage{ + Summary: "Watch a table for changes", + Args: "table", + }, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if len(args) < 1 { + return nil, fmt.Errorf("expected table name") + } + + tbl, _, err := getTable(db, args[0]) + if err != nil { + return nil, err + } + wtxn := db.WriteTxn(tbl.Meta) + iter, err := tbl.Changes(wtxn) + wtxn.Commit() + if err != nil { + return nil, err + } + + header := tbl.TableHeader() + if header == nil { + return nil, fmt.Errorf("objects in table %q not TableWritable", tbl.Meta.Name()) + } + tw := newTabWriter(&strikethroughWriter{w: s.LogWriter()}) + fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t")) + + limiter := rate.NewLimiter(10.0, 1) + for { + if err := limiter.Wait(s.Context()); err != nil { + break + } + changes, watch := iter.nextAny(db.ReadTxn()) + for change := range changes { + row := change.Object.(TableWritable).TableRow() + if change.Deleted { + fmt.Fprintf(tw, "%s (deleted)%s", strings.Join(row, "\t"), magicStrikethroughNewline) + } else { + fmt.Fprintf(tw, "%s\n", strings.Join(row, "\t")) + } + } + tw.Flush() + if err := s.FlushLog(); err != nil { + return nil, err + } + select { + case <-watch: + case <-s.Context().Done(): + return nil, nil + } + } + return nil, nil + + }, + ) +} + func firstOfSeq2[A, B any](it iter.Seq2[A, B]) iter.Seq2[A, B] { return func(yield func(a A, b B) bool) { for a, b := range it { @@ -576,7 +637,7 @@ func writeObjects(tbl *AnyTable, it iter.Seq2[any, Revision], w io.Writer, colum if err != nil { return err } - tw := tabwriter.NewWriter(w, 5, 4, 3, ' ', 0) + tw := newTabWriter(w) fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t")) for obj := range it { @@ -682,3 +743,95 @@ func joinByPositions(row []string, positions []int) string { } return w.String() } + +// strikethroughWriter writes a line of text that is striken through +// if the line contains the magic character at the end before \n. +// This is used to strike through a tab-formatted line without messing +// up with the widths of the cells. +type strikethroughWriter struct { + buf []byte + strikethrough bool + w io.Writer +} + +var ( + // Magic character to use at the end of the line to denote that this should be + // striken through. + // This is to avoid messing up the width calculations in the tab writer, which + // would happen if ANSI codes were used directly. + magicStrikethrough = byte('\xfe') + magicStrikethroughNewline = "\xfe\n" +) + +func stripTrailingWhitespace(buf []byte) []byte { + idx := bytes.LastIndexFunc( + buf, + func(r rune) bool { + return r != ' ' && r != '\t' + }, + ) + if idx > 0 { + return buf[:idx+1] + } + return buf +} + +func (s *strikethroughWriter) Write(p []byte) (n int, err error) { + write := func(bs []byte) { + if err == nil { + _, e := s.w.Write(bs) + if e != nil { + err = e + } + } + } + for _, c := range p { + switch c { + case '\n': + s.buf = stripTrailingWhitespace(s.buf) + + if s.strikethrough { + write(beginStrikethrough) + write(s.buf) + write(endStrikethrough) + } else { + write(s.buf) + } + write(newline) + + s.buf = s.buf[:0] // reset len for reuse. + s.strikethrough = false + + if err != nil { + return 0, err + } + + case magicStrikethrough: + s.strikethrough = true + + default: + s.buf = append(s.buf, c) + } + } + return len(p), nil +} + +var ( + // Use color red and the strikethrough escape + beginStrikethrough = []byte("\033[9m\033[31m") + endStrikethrough = []byte("\033[0m") + newline = []byte("\n") +) + +var _ io.Writer = &strikethroughWriter{} + +func newTabWriter(out io.Writer) *tabwriter.Writer { + const ( + minWidth = 6 + width = 4 + padding = 3 + padChar = ' ' + flags = tabwriter.RememberWidths + ) + return tabwriter.NewWriter(out, minWidth, width, padding, padChar, flags) +}