From e4d9ef7b4c5fdb2b072945f7acdfa157d1db1cca Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Fri, 30 Aug 2024 16:33:30 +0200 Subject: [PATCH] http: /changes API for streaming changes This allows streaming changes (upserts and deletes) over the HTTP API. Example usage: example := statedb.NewRemoteTable[*Example](url, "examples") // Stream changes until [ctx] is cancelled. changes, errChan := example.Changes(ctx) for change, rev, ok := changes.Next(); ok; change, rev, ok = changes.Next() { ... } err := <-errChan Signed-off-by: Jussi Maki --- db.go | 2 +- http.go | 59 ++++++++++++++++++++++++++++++++++++++ http_client.go | 77 +++++++++++++++++++++++++++++++++++++++++++++----- http_test.go | 58 +++++++++++++++++++++++++++++++++++-- iterator.go | 19 +++++++++++++ table.go | 11 ++++++++ types.go | 8 ++++-- 7 files changed, 221 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index c4eb33f..f5003c3 100644 --- a/db.go +++ b/db.go @@ -93,7 +93,7 @@ type DB struct { defaultHandle Handle } -type dbRoot = []tableEntry +type dbRoot []tableEntry type Option func(*opts) diff --git a/http.go b/http.go index ad9cd22..5cd9863 100644 --- a/http.go +++ b/http.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/cilium/statedb/part" ) @@ -19,6 +20,7 @@ func (db *DB) HTTPHandler() http.Handler { mux.HandleFunc("GET /dump", h.dumpAll) mux.HandleFunc("GET /dump/{table}", h.dumpTable) mux.HandleFunc("GET /query", h.query) + mux.HandleFunc("GET /changes/{table}", h.changes) return mux } @@ -79,6 +81,7 @@ func (h dbHandler) query(w http.ResponseWriter, r *http.Request) { for _, e := range txn.root { if e.meta.Name() == req.Table { table = e.meta + break } } if table == nil { @@ -147,3 +150,59 @@ func runQuery(indexTxn indexReadTxn, lowerbound bool, queryKey []byte, onObject } } } + +func (h dbHandler) changes(w http.ResponseWriter, r *http.Request) { + const keepaliveInterval = 30 * time.Second + + enc := json.NewEncoder(w) + tableName := r.PathValue("table") + + // Look up the table + var tableMeta TableMeta + for _, e := range h.db.ReadTxn().getTxn().root { + if e.meta.Name() == tableName { + tableMeta = e.meta + break + } + } + if tableMeta == nil { + w.WriteHeader(http.StatusNotFound) + enc.Encode(QueryResponse{Err: fmt.Sprintf("Table %q not found", tableName)}) + return + } + + // Register for changes. + wtxn := h.db.WriteTxn(tableMeta) + changeIter, err := tableMeta.anyChanges(wtxn) + wtxn.Commit() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + defer changeIter.Close() + + w.WriteHeader(http.StatusOK) + + ticker := time.NewTicker(keepaliveInterval) + defer ticker.Stop() + + for { + for change, _, ok := changeIter.nextAny(); ok; change, _, ok = changeIter.nextAny() { + err := enc.Encode(change) + if err != nil { + panic(err) + } + } + w.(http.Flusher).Flush() + select { + case <-r.Context().Done(): + return + + case <-ticker.C: + // Send an empty keep-alive + enc.Encode(Change[any]{}) + + case <-changeIter.Watch(h.db.ReadTxn()): + } + } +} diff --git a/http_client.go b/http_client.go index ae7a3b4..29584db 100644 --- a/http_client.go +++ b/http_client.go @@ -9,6 +9,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "fmt" "io" "net/http" "net/url" @@ -77,13 +78,6 @@ func (t *RemoteTable[Obj]) query(ctx context.Context, lowerBound bool, q Query[O } return &remoteGetIterator[Obj]{json.NewDecoder(resp.Body), errChanSend}, errChan } -func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) { - return t.query(ctx, false, q) -} - -func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) { - return t.query(ctx, true, q) -} type remoteGetIterator[Obj any] struct { decoder *json.Decoder @@ -125,3 +119,72 @@ func (it *remoteGetIterator[Obj]) Next() (obj Obj, revision Revision, ok bool) { ok = true return } + +func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) { + return t.query(ctx, false, q) +} + +func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) { + return t.query(ctx, true, q) +} + +func (t *RemoteTable[Obj]) Changes(ctx context.Context) (iter Iterator[Change[Obj]], errChan <-chan error) { + // Use a channel to return errors so we can use the same Iterator[Obj] interface as StateDB does. + errChanSend := make(chan error, 1) + errChan = errChanSend + + url := t.base.JoinPath("/changes", t.tableName) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) + if err != nil { + errChanSend <- err + close(errChanSend) + return + } + + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Accept", "application/json") + + resp, err := t.client.Do(req) + if err != nil { + errChanSend <- err + close(errChanSend) + return + } + return &remoteChangeIterator[Obj]{json.NewDecoder(resp.Body), errChanSend}, errChan +} + +type remoteChangeIterator[Obj any] struct { + decoder *json.Decoder + errChan chan error +} + +func (it *remoteChangeIterator[Obj]) Next() (change Change[Obj], revision Revision, ok bool) { + if it.decoder == nil { + return + } + + for { + err := it.decoder.Decode(&change) + if err == nil && change.Revision == 0 { + // Keep-alive message, skip it. + continue + } + + if err != nil { + if errors.Is(err, io.EOF) { + it.errChan <- nil + close(it.errChan) + return + } + it.decoder = nil + it.errChan <- fmt.Errorf("decode error: %w", err) + close(it.errChan) + } else { + ok = true + revision = change.Revision + } + break + } + + return +} diff --git a/http_test.go b/http_test.go index df03d60..939bc62 100644 --- a/http_test.go +++ b/http_test.go @@ -19,7 +19,7 @@ import ( "github.com/cilium/statedb/part" ) -func httpFixture(t *testing.T) (*DB, Table[testObject], *httptest.Server) { +func httpFixture(t *testing.T) (*DB, RWTable[testObject], *httptest.Server) { db, table, _ := newTestDB(t, tagsIndex) ts := httptest.NewServer(db.HTTPHandler()) @@ -110,7 +110,7 @@ func Test_http_runQuery(t *testing.T) { } } -func Test_http_RemoteTable(t *testing.T) { +func Test_http_RemoteTable_Get_LowerBound(t *testing.T) { ctx := context.TODO() _, table, ts := httpFixture(t) @@ -136,3 +136,57 @@ func Test_http_RemoteTable(t *testing.T) { assert.EqualValues(t, 4, items[3].ID) } } + +func Test_http_RemoteTable_Changes(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + db, table, ts := httpFixture(t) + + base, err := url.Parse(ts.URL) + require.NoError(t, err, "ParseURL") + + remoteTable := NewRemoteTable[testObject](base, table.Name()) + + iter, errs := remoteTable.LowerBound(ctx, idIndex.Query(0)) + items := Collect(iter) + require.NoError(t, <-errs, "LowerBound(0)") + require.Len(t, items, 4) + + changeIter, errs := remoteTable.Changes(ctx) + for _, item := range items { + change, rev, ok := changeIter.Next() + require.True(t, ok) + assert.NotZero(t, rev) + assert.NotZero(t, change.Revision) + assert.False(t, change.Deleted) + assert.Equal(t, item.ID, change.Object.ID) + } + + wtxn := db.WriteTxn(table) + _, _, err = table.Insert(wtxn, testObject{ID: 5}) + require.NoError(t, err, "Insert") + _, _, err = table.Delete(wtxn, testObject{ID: 1}) + require.NoError(t, err, "Delete") + wtxn.Commit() + + change, rev, ok := changeIter.Next() + require.True(t, ok) + assert.NotZero(t, rev) + assert.NotZero(t, change.Revision) + assert.False(t, change.Deleted) + assert.EqualValues(t, 5, change.Object.ID) + + change, rev, ok = changeIter.Next() + require.True(t, ok) + assert.NotZero(t, rev) + assert.NotZero(t, change.Revision) + assert.True(t, change.Deleted) + assert.EqualValues(t, 1, change.Object.ID) + + cancel() + + change, _, ok = changeIter.Next() + assert.False(t, ok) + + err = <-errs + require.ErrorIs(t, err, context.Canceled) +} diff --git a/iterator.go b/iterator.go index bf8eeed..843134a 100644 --- a/iterator.go +++ b/iterator.go @@ -232,6 +232,19 @@ func (it *changeIterator[Obj]) Next() (ev Change[Obj], revision uint64, ok bool) return } +// nextAny is for implementing the /changes HTTP API where the concrete object +// type is not known. +func (it *changeIterator[Obj]) nextAny() (ev Change[any], revision uint64, ok bool) { + var evTyped Change[Obj] + evTyped, revision, ok = it.Next() + ev = Change[any]{ + Object: evTyped.Object, + Revision: evTyped.Revision, + Deleted: evTyped.Deleted, + } + return +} + func (it *changeIterator[Obj]) Watch(txn ReadTxn) <-chan struct{} { if it.iter == nil { // Iterator has been exhausted, check if we need to requery @@ -267,3 +280,9 @@ func (it *changeIterator[Obj]) Close() { } *it = changeIterator[Obj]{} } + +type anyChangeIterator interface { + nextAny() (ev Change[any], revision uint64, ok bool) + Watch(ReadTxn) <-chan struct{} + Close() +} diff --git a/table.go b/table.go index 7f7cd71..b2a5816 100644 --- a/table.go +++ b/table.go @@ -422,6 +422,17 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) { return iter, nil } +// anyChanges returns the anyChangeIterator. Used for implementing the /changes HTTP +// API where we can't work with concrete object types as they're not known and thus +// uninstantiatable. +func (t *genTable[Obj]) anyChanges(txn WriteTxn) (anyChangeIterator, error) { + iter, err := t.Changes(txn) + if err != nil { + return nil, err + } + return iter.(*changeIterator[Obj]), err +} + func (t *genTable[Obj]) sortableMutex() internal.SortableMutex { return t.smu } diff --git a/types.go b/types.go index b84044b..7c63828 100644 --- a/types.go +++ b/types.go @@ -96,9 +96,9 @@ type Table[Obj any] interface { // Change is either an update or a delete of an object. Used by Changes() and // the Observable(). type Change[Obj any] struct { - Object Obj - Revision Revision - Deleted bool + Object Obj `json:"obj"` + Revision Revision `json:"rev"` + Deleted bool `json:"deleted,omitempty"` } type ChangeIterator[Obj any] interface { @@ -210,6 +210,7 @@ type RWTable[Obj any] interface { // the object type (the 'Obj' constraint). type TableMeta interface { Name() TableName // The name of the table + tableEntry() tableEntry tablePos() int setTablePos(int) @@ -218,6 +219,7 @@ type TableMeta interface { primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any) sortableMutex() internal.SortableMutex // The sortable mutex for locking the table for writing + anyChanges(txn WriteTxn) (anyChangeIterator, error) } // Iterator for iterating objects returned from queries.