Skip to content

Commit

Permalink
http: /changes API for streaming changes
Browse files Browse the repository at this point in the history
This allows streaming changes (upserts and deletes) over the
HTTP API.

Example usage:

  example := statedb.NewRemoteTable[*Example](url, "examples")

  // Stream changes until [ctx] is cancelled.
  changes, errChan := example.Changes(ctx)
  for change, rev, ok := changes.Next(); ok; change, rev, ok = changes.Next() {
    ...
  }
  err := <-errChan

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Sep 3, 2024
1 parent 620befd commit e4d9ef7
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 13 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type DB struct {
defaultHandle Handle
}

type dbRoot = []tableEntry
type dbRoot []tableEntry

type Option func(*opts)

Expand Down
59 changes: 59 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/cilium/statedb/part"
)
Expand All @@ -19,6 +20,7 @@ func (db *DB) HTTPHandler() http.Handler {
mux.HandleFunc("GET /dump", h.dumpAll)
mux.HandleFunc("GET /dump/{table}", h.dumpTable)
mux.HandleFunc("GET /query", h.query)
mux.HandleFunc("GET /changes/{table}", h.changes)
return mux
}

Expand Down Expand Up @@ -79,6 +81,7 @@ func (h dbHandler) query(w http.ResponseWriter, r *http.Request) {
for _, e := range txn.root {
if e.meta.Name() == req.Table {
table = e.meta
break
}
}
if table == nil {
Expand Down Expand Up @@ -147,3 +150,59 @@ func runQuery(indexTxn indexReadTxn, lowerbound bool, queryKey []byte, onObject
}
}
}

func (h dbHandler) changes(w http.ResponseWriter, r *http.Request) {
const keepaliveInterval = 30 * time.Second

enc := json.NewEncoder(w)
tableName := r.PathValue("table")

// Look up the table
var tableMeta TableMeta
for _, e := range h.db.ReadTxn().getTxn().root {
if e.meta.Name() == tableName {
tableMeta = e.meta
break
}
}
if tableMeta == nil {
w.WriteHeader(http.StatusNotFound)
enc.Encode(QueryResponse{Err: fmt.Sprintf("Table %q not found", tableName)})
return
}

// Register for changes.
wtxn := h.db.WriteTxn(tableMeta)
changeIter, err := tableMeta.anyChanges(wtxn)
wtxn.Commit()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer changeIter.Close()

w.WriteHeader(http.StatusOK)

ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()

for {
for change, _, ok := changeIter.nextAny(); ok; change, _, ok = changeIter.nextAny() {
err := enc.Encode(change)
if err != nil {
panic(err)
}
}
w.(http.Flusher).Flush()
select {
case <-r.Context().Done():
return

case <-ticker.C:
// Send an empty keep-alive
enc.Encode(Change[any]{})

case <-changeIter.Watch(h.db.ReadTxn()):
}
}
}
77 changes: 70 additions & 7 deletions http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -77,13 +78,6 @@ func (t *RemoteTable[Obj]) query(ctx context.Context, lowerBound bool, q Query[O
}
return &remoteGetIterator[Obj]{json.NewDecoder(resp.Body), errChanSend}, errChan
}
func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, false, q)
}

func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, true, q)
}

type remoteGetIterator[Obj any] struct {
decoder *json.Decoder
Expand Down Expand Up @@ -125,3 +119,72 @@ func (it *remoteGetIterator[Obj]) Next() (obj Obj, revision Revision, ok bool) {
ok = true
return
}

func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, false, q)
}

func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, true, q)
}

func (t *RemoteTable[Obj]) Changes(ctx context.Context) (iter Iterator[Change[Obj]], errChan <-chan error) {
// Use a channel to return errors so we can use the same Iterator[Obj] interface as StateDB does.
errChanSend := make(chan error, 1)
errChan = errChanSend

url := t.base.JoinPath("/changes", t.tableName)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
if err != nil {
errChanSend <- err
close(errChanSend)
return
}

req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")

resp, err := t.client.Do(req)
if err != nil {
errChanSend <- err
close(errChanSend)
return
}
return &remoteChangeIterator[Obj]{json.NewDecoder(resp.Body), errChanSend}, errChan
}

type remoteChangeIterator[Obj any] struct {
decoder *json.Decoder
errChan chan error
}

func (it *remoteChangeIterator[Obj]) Next() (change Change[Obj], revision Revision, ok bool) {
if it.decoder == nil {
return
}

for {
err := it.decoder.Decode(&change)
if err == nil && change.Revision == 0 {
// Keep-alive message, skip it.
continue
}

if err != nil {
if errors.Is(err, io.EOF) {
it.errChan <- nil
close(it.errChan)
return
}
it.decoder = nil
it.errChan <- fmt.Errorf("decode error: %w", err)
close(it.errChan)
} else {
ok = true
revision = change.Revision
}
break
}

return
}
58 changes: 56 additions & 2 deletions http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/cilium/statedb/part"
)

func httpFixture(t *testing.T) (*DB, Table[testObject], *httptest.Server) {
func httpFixture(t *testing.T) (*DB, RWTable[testObject], *httptest.Server) {
db, table, _ := newTestDB(t, tagsIndex)

ts := httptest.NewServer(db.HTTPHandler())
Expand Down Expand Up @@ -110,7 +110,7 @@ func Test_http_runQuery(t *testing.T) {
}
}

func Test_http_RemoteTable(t *testing.T) {
func Test_http_RemoteTable_Get_LowerBound(t *testing.T) {
ctx := context.TODO()
_, table, ts := httpFixture(t)

Expand All @@ -136,3 +136,57 @@ func Test_http_RemoteTable(t *testing.T) {
assert.EqualValues(t, 4, items[3].ID)
}
}

func Test_http_RemoteTable_Changes(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
db, table, ts := httpFixture(t)

base, err := url.Parse(ts.URL)
require.NoError(t, err, "ParseURL")

remoteTable := NewRemoteTable[testObject](base, table.Name())

iter, errs := remoteTable.LowerBound(ctx, idIndex.Query(0))
items := Collect(iter)
require.NoError(t, <-errs, "LowerBound(0)")
require.Len(t, items, 4)

changeIter, errs := remoteTable.Changes(ctx)
for _, item := range items {
change, rev, ok := changeIter.Next()
require.True(t, ok)
assert.NotZero(t, rev)
assert.NotZero(t, change.Revision)
assert.False(t, change.Deleted)
assert.Equal(t, item.ID, change.Object.ID)
}

wtxn := db.WriteTxn(table)
_, _, err = table.Insert(wtxn, testObject{ID: 5})
require.NoError(t, err, "Insert")
_, _, err = table.Delete(wtxn, testObject{ID: 1})
require.NoError(t, err, "Delete")
wtxn.Commit()

change, rev, ok := changeIter.Next()
require.True(t, ok)
assert.NotZero(t, rev)
assert.NotZero(t, change.Revision)
assert.False(t, change.Deleted)
assert.EqualValues(t, 5, change.Object.ID)

change, rev, ok = changeIter.Next()
require.True(t, ok)
assert.NotZero(t, rev)
assert.NotZero(t, change.Revision)
assert.True(t, change.Deleted)
assert.EqualValues(t, 1, change.Object.ID)

cancel()

change, _, ok = changeIter.Next()
assert.False(t, ok)

err = <-errs
require.ErrorIs(t, err, context.Canceled)
}
19 changes: 19 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func (it *changeIterator[Obj]) Next() (ev Change[Obj], revision uint64, ok bool)
return
}

// nextAny is for implementing the /changes HTTP API where the concrete object
// type is not known.
func (it *changeIterator[Obj]) nextAny() (ev Change[any], revision uint64, ok bool) {
var evTyped Change[Obj]
evTyped, revision, ok = it.Next()
ev = Change[any]{
Object: evTyped.Object,
Revision: evTyped.Revision,
Deleted: evTyped.Deleted,
}
return
}

func (it *changeIterator[Obj]) Watch(txn ReadTxn) <-chan struct{} {
if it.iter == nil {
// Iterator has been exhausted, check if we need to requery
Expand Down Expand Up @@ -267,3 +280,9 @@ func (it *changeIterator[Obj]) Close() {
}
*it = changeIterator[Obj]{}
}

type anyChangeIterator interface {
nextAny() (ev Change[any], revision uint64, ok bool)
Watch(ReadTxn) <-chan struct{}
Close()
}
11 changes: 11 additions & 0 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,17 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) {
return iter, nil
}

// anyChanges returns the anyChangeIterator. Used for implementing the /changes HTTP
// API where we can't work with concrete object types as they're not known and thus
// uninstantiatable.
func (t *genTable[Obj]) anyChanges(txn WriteTxn) (anyChangeIterator, error) {
iter, err := t.Changes(txn)
if err != nil {
return nil, err
}
return iter.(*changeIterator[Obj]), err
}

func (t *genTable[Obj]) sortableMutex() internal.SortableMutex {
return t.smu
}
Expand Down
8 changes: 5 additions & 3 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ type Table[Obj any] interface {
// Change is either an update or a delete of an object. Used by Changes() and
// the Observable().
type Change[Obj any] struct {
Object Obj
Revision Revision
Deleted bool
Object Obj `json:"obj"`
Revision Revision `json:"rev"`
Deleted bool `json:"deleted,omitempty"`
}

type ChangeIterator[Obj any] interface {
Expand Down Expand Up @@ -210,6 +210,7 @@ type RWTable[Obj any] interface {
// the object type (the 'Obj' constraint).
type TableMeta interface {
Name() TableName // The name of the table

tableEntry() tableEntry
tablePos() int
setTablePos(int)
Expand All @@ -218,6 +219,7 @@ type TableMeta interface {
primary() anyIndexer // The untyped primary indexer for the table
secondary() map[string]anyIndexer // Secondary indexers (if any)
sortableMutex() internal.SortableMutex // The sortable mutex for locking the table for writing
anyChanges(txn WriteTxn) (anyChangeIterator, error)
}

// Iterator for iterating objects returned from queries.
Expand Down

0 comments on commit e4d9ef7

Please sign in to comment.