Skip to content

Commit

Permalink
Add HTTP API and RemoteTable[Obj]
Browse files Browse the repository at this point in the history
Simple HTTP JSON API for dumping and querying the database.

Example usage:

  var db *statedb.DB
  http.Handle("/db", statedb.NewHTTPHandler(db))
  go http.ListenAndServe(":8080")

  url, _ := url.Parse("http://localhost:8080/db")

  var table *statedb.Table[*Example]
  var index *statedb.Index[*Example, string]

  remoteTable := statedb.NewRemoteTable(url, table.Name())

  iter, errs := remoteTable.Get(ctx, index.Query("foo"))
  for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
    ...
  }
  if err := <-errs; err != nil {
    // handle error
  }

  iter, errs = remoteTable.LowerBound(ctx, statedb.ByRevision(0))
  ...

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Apr 17, 2024
1 parent 2c22faa commit d71458a
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 24 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type DB struct {
defaultHandle Handle
}

type dbRoot []tableEntry
type dbRoot = []tableEntry

func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) {
db := &DB{
Expand Down
148 changes: 148 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package statedb

import (
"encoding/base64"
"encoding/json"
"io"
"net/http"

"github.com/cilium/statedb/part"
)

func (db *DB) HTTPHandler() http.Handler {
h := dbHandler{db}
mux := http.NewServeMux()
mux.HandleFunc("GET /dump", h.dumpAll)
mux.HandleFunc("GET /dump/{table}", h.dumpTable)
mux.HandleFunc("/query", h.query)
return mux
}

type dbHandler struct {
db *DB
}

func (h dbHandler) dumpAll(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
h.db.ReadTxn().WriteJSON(w)
}

func (h dbHandler) dumpTable(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

var err error
if table := r.PathValue("table"); table != "" {
err = h.db.ReadTxn().WriteJSON(w, r.PathValue("table"))
} else {
err = h.db.ReadTxn().WriteJSON(w)
}
if err != nil {
panic(err)
}
}

func (h dbHandler) query(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)

var req QueryRequest
body, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(QueryResponse{Err: err.Error()})
return
}

if err := json.Unmarshal(body, &req); err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(QueryResponse{Err: err.Error()})
return
}

queryKey, err := base64.StdEncoding.DecodeString(req.Key)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(QueryResponse{Err: err.Error()})
return
}

txn := h.db.ReadTxn().getTxn()

// Look up the table
var table TableMeta
for _, e := range txn.root {
if e.meta.Name() == req.Table {
table = e.meta
}
}
if table == nil {
w.WriteHeader(http.StatusNotFound)
enc.Encode(QueryResponse{Err: err.Error()})
return
}

indexPos := table.indexPos(req.Index)

indexTxn, err := txn.indexReadTxn(table, indexPos)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(QueryResponse{Err: err.Error()})
return
}

w.WriteHeader(http.StatusOK)
onObject := func(obj object) error {
return enc.Encode(QueryResponse{
Rev: obj.revision,
Obj: obj.data,
})
}
runQuery(indexTxn, req.LowerBound, queryKey, onObject)
}

type QueryRequest struct {
Key string `json:"key"` // Base64 encoded query key
Table string `json:"table"`
Index string `json:"index"`
LowerBound bool `json:"lowerbound"`
}

type QueryResponse struct {
Rev uint64 `json:"rev"`
Obj any `json:"obj"`
Err string `json:"err,omitempty"`
}

func runQuery(indexTxn indexReadTxn, lowerbound bool, queryKey []byte, onObject func(object) error) {
var iter *part.Iterator[object]
if lowerbound {
iter = indexTxn.LowerBound(queryKey)
} else {
iter, _ = indexTxn.Prefix(queryKey)
}
var match func([]byte) bool
switch {
case lowerbound:
match = func([]byte) bool { return true }
case indexTxn.unique:
match = func(k []byte) bool { return len(k) == len(queryKey) }
default:
match = func(k []byte) bool {
_, secondary := decodeNonUniqueKey(k)
return len(secondary) == len(queryKey)
}
}
for key, obj, ok := iter.Next(); ok; key, obj, ok = iter.Next() {
if !match(key) {
continue
}
if err := onObject(obj); err != nil {
panic(err)
}
}
}
122 changes: 122 additions & 0 deletions http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package statedb

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"io"
"net/http"
"net/url"
)

// NewRemoteTable creates a new handle for querying a remote StateDB table over the HTTP.
// Example usage:
//
// devices := statedb.NewRemoteTable[*tables.Device](url.Parse("http://localhost:8080/db"), "devices")
//
// // Get all devices ordered by name.
// iter, errs := devices.LowerBound(ctx, tables.DeviceByName(""))
// for device, revision, ok := iter.Next(); ok; device, revision, ok = iter.Next() { ... }
//
// // Get device by name.
// iter, errs := devices.Get(ctx, tables.DeviceByName("eth0"))
// if dev, revision, ok := iter.Next(); ok { ... }
//
// // Get devices in revision order, e.g. oldest changed devices first.
// iter, errs = devices.LowerBound(ctx, statedb.ByRevision(0))
func NewRemoteTable[Obj any](base *url.URL, table TableName) *RemoteTable[Obj] {
return &RemoteTable[Obj]{base: base, tableName: table}
}

type RemoteTable[Obj any] struct {
base *url.URL
tableName TableName
}

func (t *RemoteTable[Obj]) query(ctx context.Context, lowerBound bool, q Query[Obj]) (iter Iterator[Obj], errChan <-chan error) {
// Use a channel to return errors so we can use the same Iterator[Obj] interface as StateDB does.
errChanSend := make(chan error, 1)
errChan = errChanSend

key := base64.StdEncoding.EncodeToString(q.key)
queryReq := QueryRequest{
Key: key,
Table: t.tableName,
Index: q.index,
LowerBound: lowerBound,
}
bs, err := json.Marshal(&queryReq)
if err != nil {
errChanSend <- err
return
}

url := t.base.JoinPath("/query")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(bs))
if err != nil {
errChanSend <- err
return
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
errChanSend <- err
return
}
return &remoteGetIterator[Obj]{json.NewDecoder(resp.Body), errChanSend}, errChan
}
func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, false, q)
}

func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error) {
return t.query(ctx, true, q)
}

type remoteGetIterator[Obj any] struct {
decoder *json.Decoder
errChan chan error
}

// responseObject is a typed counterpart of [queryResponseObject]
type responseObject[Obj any] struct {
Rev uint64 `json:"rev"`
Obj Obj `json:"obj"`
Err string `json:"err,omitempty"`
}

func (it *remoteGetIterator[Obj]) Next() (obj Obj, revision Revision, ok bool) {
if it.decoder == nil {
return
}

var resp responseObject[Obj]
err := it.decoder.Decode(&resp)
errString := ""
if err != nil {
if errors.Is(err, io.EOF) {
close(it.errChan)
return
}
errString = err.Error()
} else {
errString = resp.Err
}
if errString != "" {
it.decoder = nil
it.errChan <- errors.New(errString)
return
}

obj = resp.Obj
revision = resp.Rev
ok = true
return
}
Loading

0 comments on commit d71458a

Please sign in to comment.