Skip to content

Commit

Permalink
script: Add watch command
Browse files Browse the repository at this point in the history
The 'watch' command allows watching a table for changes, similar to the
"cilium-dbg statedb <table> --watch" flag. The "strike out" support is
ported from cilium-dbg/cmd/statedb.go.

The plan is to eventually replace the "cilium-dbg statedb" with the
script commands.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Oct 10, 2024
1 parent c6862eb commit c49a1e9
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 7 deletions.
4 changes: 4 additions & 0 deletions any_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (t AnyTable) queryIndex(txn ReadTxn, index string, key string) (indexReadTx
return itxn, rawKey, err
}

func (t AnyTable) Changes(txn WriteTxn) (anyChangeIterator, error) {
return t.Meta.anyChanges(txn)
}

func (t AnyTable) TableHeader() []string {
zero := t.Meta.proto()
if tw, ok := zero.(TableWritable); ok {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ module github.com/cilium/statedb

go 1.23

replace github.com/cilium/hive => ../hive

require (
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23 h1:RQSJdQVdxE9puF18G5RGZZi2jhBb2dtA6zI+HHMyY+Y=
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23/go.mod h1:pI2GJ1n3SLKIQVFrKF7W6A6gb6BQkZ+3Hp4PAEo5SuI=
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d h1:p6MgATaKEB9o7iAsk9rlzXNDMNCeKPAkx4Y8f+Zq8X8=
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d/go.mod h1:3VLiLgs8wfjirkuYqos4t0IBPQ+sXtf3tFkChLm6ARM=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand All @@ -24,6 +22,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down
163 changes: 158 additions & 5 deletions script.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
"regexp"
"slices"
"strings"
"text/tabwriter"
"time"

"github.com/cilium/hive"

Check failure on line 21 in script.go

View workflow job for this annotation

GitHub Actions / test

github.com/cilium/[email protected]: replacement directory ../hive does not exist

Check failure on line 21 in script.go

View workflow job for this annotation

GitHub Actions / test

github.com/cilium/[email protected]: replacement directory ../hive does not exist
"github.com/cilium/hive/script"

Check failure on line 22 in script.go

View workflow job for this annotation

GitHub Actions / test

github.com/cilium/[email protected]: replacement directory ../hive does not exist

Check failure on line 22 in script.go

View workflow job for this annotation

GitHub Actions / test

github.com/cilium/[email protected]: replacement directory ../hive does not exist
"github.com/liggitt/tabwriter"
"golang.org/x/time/rate"
"gopkg.in/yaml.v3"
)

Expand All @@ -35,6 +36,7 @@ func ScriptCommands(db *DB) hive.ScriptCmdOut {
"prefix": PrefixCmd(db),
"list": ListCmd(db),
"lowerbound": LowerBoundCmd(db),
"watch": WatchCmd(db),
"initialized": InitializedCmd(db),
}
subCmdsList := strings.Join(slices.Collect(maps.Keys(subCmds)), ", ")
Expand Down Expand Up @@ -77,7 +79,7 @@ func TablesCmd(db *DB) script.Cmd {
func(s *script.State, args ...string) (script.WaitFunc, error) {
txn := db.ReadTxn()
tbls := db.GetTables(txn)
w := tabwriter.NewWriter(s.LogWriter(), 5, 4, 3, ' ', 0)
w := newTabWriter(s.LogWriter())
fmt.Fprintf(w, "Name\tObject count\tDeleted objects\tIndexes\tInitializers\tGo type\tLast WriteTxn\n")
for _, tbl := range tbls {
idxs := strings.Join(tbl.Indexes(), ", ")
Expand Down Expand Up @@ -207,7 +209,7 @@ func CompareCmd(db *DB) script.Cmd {
return script.Command(
script.CmdUsage{
Summary: "Compare table",
Args: "table file (-timeout=<dur>) (-grep=<pattern>)",
Args: "(-timeout=<dur>) (-grep=<pattern>) table file",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
flags := newCmdFlagSet()
Expand Down Expand Up @@ -264,7 +266,7 @@ func CompareCmd(db *DB) script.Cmd {
// Create the diff between 'lines' and the rows in the table.
equal := true
var diff bytes.Buffer
w := tabwriter.NewWriter(&diff, 5, 4, 3, ' ', 0)
w := newTabWriter(&diff)
fmt.Fprintf(w, " %s\n", joinByPositions(columnNames, columnPositions))

objs, watch := tbl.AllWatch(db.ReadTxn())
Expand Down Expand Up @@ -509,6 +511,65 @@ func runQueryCmd(query int, db *DB, s *script.State, args []string) (script.Wait
}, nil
}

func WatchCmd(db *DB) script.Cmd {
return script.Command(
script.CmdUsage{
Summary: "Watch a table for changes",
Args: "table",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
if len(args) < 1 {
return nil, fmt.Errorf("expected table name")
}

tbl, _, err := getTable(db, args[0])
if err != nil {
return nil, err
}
wtxn := db.WriteTxn(tbl.Meta)
iter, err := tbl.Changes(wtxn)
wtxn.Commit()
if err != nil {
return nil, err
}

header := tbl.TableHeader()
if header == nil {
return nil, fmt.Errorf("objects in table %q not TableWritable", tbl.Meta.Name())
}
tw := newTabWriter(&strikethroughWriter{w: s.LogWriter()})
fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t"))

limiter := rate.NewLimiter(10.0, 1)
for {
if err := limiter.Wait(s.Context()); err != nil {
break
}
changes, watch := iter.nextAny(db.ReadTxn())
for change := range changes {
row := change.Object.(TableWritable).TableRow()
if change.Deleted {
fmt.Fprintf(tw, "%s (deleted)%s", strings.Join(row, "\t"), magicStrikethroughNewline)
} else {
fmt.Fprintf(tw, "%s\n", strings.Join(row, "\t"))
}
}
tw.Flush()
if err := s.FlushLog(); err != nil {
return nil, err
}
select {
case <-watch:
case <-s.Context().Done():
return nil, nil
}
}
return nil, nil

},
)
}

func firstOfSeq2[A, B any](it iter.Seq2[A, B]) iter.Seq2[A, B] {
return func(yield func(a A, b B) bool) {
for a, b := range it {
Expand Down Expand Up @@ -576,7 +637,7 @@ func writeObjects(tbl *AnyTable, it iter.Seq2[any, Revision], w io.Writer, colum
if err != nil {
return err
}
tw := tabwriter.NewWriter(w, 5, 4, 3, ' ', 0)
tw := newTabWriter(w)
fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t"))

for obj := range it {
Expand Down Expand Up @@ -682,3 +743,95 @@ func joinByPositions(row []string, positions []int) string {
}
return w.String()
}

// strikethroughWriter writes a line of text that is striken through
// if the line contains the magic character at the end before \n.
// This is used to strike through a tab-formatted line without messing
// up with the widths of the cells.
type strikethroughWriter struct {
buf []byte
strikethrough bool
w io.Writer
}

var (
// Magic character to use at the end of the line to denote that this should be
// striken through.
// This is to avoid messing up the width calculations in the tab writer, which
// would happen if ANSI codes were used directly.
magicStrikethrough = byte('\xfe')
magicStrikethroughNewline = "\xfe\n"
)

func stripTrailingWhitespace(buf []byte) []byte {
idx := bytes.LastIndexFunc(
buf,
func(r rune) bool {
return r != ' ' && r != '\t'
},
)
if idx > 0 {
return buf[:idx+1]
}
return buf
}

func (s *strikethroughWriter) Write(p []byte) (n int, err error) {
write := func(bs []byte) {
if err == nil {
_, e := s.w.Write(bs)
if e != nil {
err = e
}
}
}
for _, c := range p {
switch c {
case '\n':
s.buf = stripTrailingWhitespace(s.buf)

if s.strikethrough {
write(beginStrikethrough)
write(s.buf)
write(endStrikethrough)
} else {
write(s.buf)
}
write(newline)

s.buf = s.buf[:0] // reset len for reuse.
s.strikethrough = false

if err != nil {
return 0, err
}

case magicStrikethrough:
s.strikethrough = true

default:
s.buf = append(s.buf, c)
}
}
return len(p), nil
}

var (
// Use color red and the strikethrough escape
beginStrikethrough = []byte("\033[9m\033[31m")
endStrikethrough = []byte("\033[0m")
newline = []byte("\n")
)

var _ io.Writer = &strikethroughWriter{}

func newTabWriter(out io.Writer) *tabwriter.Writer {
const (
minWidth = 6
width = 4
padding = 3
padChar = ' '
flags = tabwriter.RememberWidths
)
return tabwriter.NewWriter(out, minWidth, width, padding, padChar, flags)
}

0 comments on commit c49a1e9

Please sign in to comment.