-
Notifications
You must be signed in to change notification settings - Fork 2
/
http_client.go
172 lines (152 loc) · 4.45 KB
/
http_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"net/http"
"net/url"
)
// NewRemoteTable returns a handle for querying the StateDB table named table
// on a remote StateDB instance over HTTP. base is the URL onto which the
// endpoint paths ("/query", "/changes") are joined.
//
// Example usage:
//
//	base, err := url.Parse("http://localhost:8080/db")
//	if err != nil { ... }
//	devices := statedb.NewRemoteTable[*tables.Device](base, "devices")
//
//	// Get all devices ordered by name.
//	seq, errs := devices.LowerBound(ctx, tables.DeviceByName(""))
//	if seq != nil {
//		for device, revision := range seq { ... }
//	}
//	if err := <-errs; err != nil { ... }
//
//	// Get device by name.
//	seq, errs = devices.Get(ctx, tables.DeviceByName("eth0"))
//
//	// Get devices in revision order, e.g. oldest changed devices first.
//	seq, errs = devices.LowerBound(ctx, statedb.ByRevision(0))
func NewRemoteTable[Obj any](base *url.URL, table TableName) *RemoteTable[Obj] {
	rt := new(RemoteTable[Obj])
	rt.base = base
	rt.tableName = table
	return rt
}
// RemoteTable is a handle for querying a single StateDB table over HTTP.
// Create one with NewRemoteTable.
type RemoteTable[Obj any] struct {
// client performs the HTTP requests. NewRemoteTable leaves it at its zero
// value; SetTransport replaces its transport.
client http.Client
// base is the URL onto which the endpoint paths ("/query", "/changes") are joined.
base *url.URL
// tableName is the name of the remote table that requests refer to.
tableName TableName
}
// SetTransport sets the HTTP transport used by requests made through this
// table handle. Passing nil clears the transport so that the client falls
// back to http.DefaultTransport.
func (t *RemoteTable[Obj]) SetTransport(tr *http.Transport) {
	if tr == nil {
		// Storing a nil *http.Transport directly would produce a non-nil
		// http.RoundTripper interface wrapping a nil pointer. http.Client
		// only falls back to the default transport when the interface
		// itself is nil, so assign an untyped nil instead.
		t.client.Transport = nil
		return
	}
	t.client.Transport = tr
}
// query sends q to the remote "/query" endpoint as a JSON-encoded
// QueryRequest and returns the response as a lazily decoded sequence of
// objects and revisions.
//
// lowerBound is copied into QueryRequest.LowerBound. The request is bound to
// ctx.
//
// Errors are reported through the returned channel rather than a return
// value. If the request cannot be built or sent, the error is placed on the
// channel, the channel is closed, and an empty (never nil) sequence is
// returned so that callers can range over it unconditionally. Once a response
// has been received the channel is handed to remoteGetSeq, which reports
// decode errors and errors carried in the response stream.
//
// The HTTP status code is not inspected here. The response body is closed
// when iteration of the returned sequence ends; a sequence that is never
// iterated leaves the body open until ctx is cancelled.
func (t *RemoteTable[Obj]) query(ctx context.Context, lowerBound bool, q Query[Obj]) (seq iter.Seq2[Obj, Revision], errChan <-chan error) {
	// Use a channel to return errors so we can use the same Iterator[Obj] interface as StateDB does.
	// The buffer of one lets an error be queued while the consumer is still iterating.
	errChanSend := make(chan error, 1)

	// fail reports an error that happened before any response was received.
	fail := func(err error) (iter.Seq2[Obj, Revision], <-chan error) {
		errChanSend <- err
		close(errChanSend)
		return func(func(Obj, Revision) bool) {}, errChanSend
	}

	queryReq := QueryRequest{
		Key:        base64.StdEncoding.EncodeToString(q.key),
		Table:      t.tableName,
		Index:      q.index,
		LowerBound: lowerBound,
	}
	bs, err := json.Marshal(&queryReq)
	if err != nil {
		return fail(err)
	}

	// Named queryURL to avoid shadowing the net/url package.
	queryURL := t.base.JoinPath("/query")
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, queryURL.String(), bytes.NewBuffer(bs))
	if err != nil {
		return fail(err)
	}
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("Accept", "application/json")

	resp, err := t.client.Do(req)
	if err != nil {
		return fail(err)
	}

	results := remoteGetSeq[Obj](json.NewDecoder(resp.Body), errChanSend)
	return func(yield func(Obj, Revision) bool) {
		// Release the connection once the consumer is done, whether the
		// stream was drained, failed, or iteration stopped early.
		defer resp.Body.Close()
		results(yield)
	}, errChanSend
}
// Get runs q against the remote table as a plain (non-lower-bound) query,
// i.e. with QueryRequest.LowerBound set to false. It returns the sequence of
// resulting objects with their revisions and a channel on which errors are
// delivered.
func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan error) {
	const lowerBound = false
	objs, errs := t.query(ctx, lowerBound, q)
	return objs, errs
}
// LowerBound runs q against the remote table as a lower-bound query, i.e.
// with QueryRequest.LowerBound set to true. It returns the sequence of
// resulting objects with their revisions and a channel on which errors are
// delivered.
func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan error) {
	const lowerBound = true
	objs, errs := t.query(ctx, lowerBound, q)
	return objs, errs
}
// responseObject is a typed counterpart of [queryResponseObject]. It is the
// shape of each JSON value that remoteGetSeq decodes from a query response.
type responseObject[Obj any] struct {
// Rev is yielded by remoteGetSeq as the revision accompanying Obj.
Rev uint64 `json:"rev"`
// Obj is the decoded object yielded by remoteGetSeq.
Obj Obj `json:"obj"`
// Err, when non-empty, is reported by remoteGetSeq on its error channel
// and ends the sequence.
Err string `json:"err,omitempty"`
}
// remoteGetSeq returns a single-use sequence that decodes a stream of
// responseObject values from dec and yields each object with its revision.
//
// The sequence ends when the stream reaches io.EOF, when decoding fails, when
// a decoded value carries a non-empty Err, or when the consumer stops
// iterating. Decode failures are sent on errChan prefixed with
// "Decode error: " and wrap the underlying error; a non-empty Err is sent as a
// new error with that text.
//
// errChan is closed when the first iteration ends, on every path, so a
// consumer that receives from it after iterating never blocks. errChan should
// be buffered (query uses a capacity of one) so that the send does not block
// while the consumer is still inside its loop. Invoking the sequence again
// after it has been consumed yields nothing and does not touch errChan.
func remoteGetSeq[Obj any](dec *json.Decoder, errChan chan error) iter.Seq2[Obj, Revision] {
	consumed := false
	return func(yield func(Obj, Revision) bool) {
		if consumed {
			// errChan is already closed; closing it again would panic.
			return
		}
		consumed = true
		defer close(errChan)

		for {
			var resp responseObject[Obj]
			if err := dec.Decode(&resp); err != nil {
				if !errors.Is(err, io.EOF) {
					errChan <- fmt.Errorf("Decode error: %w", err)
				}
				return
			}
			if resp.Err != "" {
				errChan <- errors.New(resp.Err)
				return
			}
			if !yield(resp.Obj, resp.Rev) {
				return
			}
		}
	}
}
// Changes requests the "/changes/<table>" endpoint for this table and returns
// the response as a lazily decoded sequence of changes and their revisions.
// The request is bound to ctx.
//
// Errors are reported through the returned channel rather than a return
// value. If the request cannot be built or sent, the error is placed on the
// channel, the channel is closed, and an empty (never nil) sequence is
// returned so that callers can range over it unconditionally. Once a response
// has been received the channel is handed to remoteChangeSeq.
//
// The HTTP status code is not inspected here. The response body is closed
// when iteration of the returned sequence ends; a sequence that is never
// iterated leaves the body open until ctx is cancelled.
func (t *RemoteTable[Obj]) Changes(ctx context.Context) (seq iter.Seq2[Change[Obj], Revision], errChan <-chan error) {
	// Use a channel to return errors so we can use the same Iterator[Obj] interface as StateDB does.
	// The buffer of one lets an error be queued while the consumer is still iterating.
	errChanSend := make(chan error, 1)

	// fail reports an error that happened before any response was received.
	fail := func(err error) (iter.Seq2[Change[Obj], Revision], <-chan error) {
		errChanSend <- err
		close(errChanSend)
		return func(func(Change[Obj], Revision) bool) {}, errChanSend
	}

	// Named changesURL to avoid shadowing the net/url package.
	changesURL := t.base.JoinPath("/changes", t.tableName)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, changesURL.String(), nil)
	if err != nil {
		return fail(err)
	}
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("Accept", "application/json")

	resp, err := t.client.Do(req)
	if err != nil {
		return fail(err)
	}

	changes := remoteChangeSeq[Obj](json.NewDecoder(resp.Body), errChanSend)
	return func(yield func(Change[Obj], Revision) bool) {
		// Release the connection once the consumer is done, whether the
		// stream ended, failed, or iteration stopped early.
		defer resp.Body.Close()
		changes(yield)
	}, errChanSend
}
// remoteChangeSeq returns a single-use sequence that decodes a stream of
// Change values from dec and yields each change with its revision.
//
// Decoded values whose Revision is zero are treated as keep-alive messages
// and skipped. The sequence ends when the stream reaches io.EOF, when
// decoding fails, or when the consumer stops iterating. Decode failures other
// than io.EOF are sent on errChan wrapped as "decode error: ...".
//
// errChan is closed when the first iteration ends. It should be buffered
// (Changes uses a capacity of one) so that the send does not block while the
// consumer is still inside its loop. Invoking the sequence again after it has
// been consumed yields nothing and does not touch errChan; without this guard
// the second close would panic.
func remoteChangeSeq[Obj any](dec *json.Decoder, errChan chan error) iter.Seq2[Change[Obj], Revision] {
	consumed := false
	return func(yield func(Change[Obj], Revision) bool) {
		if consumed {
			// errChan is already closed; closing it again would panic.
			return
		}
		consumed = true
		defer close(errChan)

		for {
			var change Change[Obj]
			if err := dec.Decode(&change); err != nil {
				if !errors.Is(err, io.EOF) {
					errChan <- fmt.Errorf("decode error: %w", err)
				}
				return
			}
			if change.Revision == 0 {
				// Keep-alive message, skip it.
				continue
			}
			if !yield(change, change.Revision) {
				return
			}
		}
	}
}