Skip to content

Commit 4ecfbe9

Browse files
committed
feat: migrate to AsyncIterable and remove Observable
Remove Observable<T> completely in favor of AsyncIterable<T>. Uses the useAsyncIterable flag in ts-proto. stephenh/ts-proto#605 Signed-off-by: Christian Stewart <[email protected]>
1 parent 31e8138 commit 4ecfbe9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2856
-505
lines changed

.github/workflows/tests.yml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ jobs:
2626
uses: actions/setup-node@v3
2727
with:
2828
node-version: ${{ matrix.node }}
29+
cache: 'yarn'
2930
- name: Yarn install
3031
run: yarn install
3132
- name: Depcheck
@@ -34,7 +35,28 @@ jobs:
3435
run: yarn run build
3536
- name: Lint Javascript
3637
run: yarn run lint:js
38+
- name: Test Js
39+
run: yarn test:js
40+
# Cache go build cache, used to speedup go test
41+
# https://markphelps.me/posts/speed-up-your-go-builds-with-actions-cache/
42+
- id: go-cache-paths
43+
run: |
44+
echo "::set-output name=go-build::$(go env GOCACHE)"
45+
echo "::set-output name=go-mod::$(go env GOMODCACHE)"
46+
- name: Go Build Cache
47+
uses: actions/cache@v3
48+
with:
49+
path: ${{ steps.go-cache-paths.outputs.go-build }}
50+
key: ${{ runner.os }}-go-build-${{ hashFiles('**/go.sum') }}
51+
# Cache go mod cache, used to speedup builds
52+
- name: Go Mod Cache
53+
uses: actions/cache@v3
54+
with:
55+
path: ${{ steps.go-cache-paths.outputs.go-mod }}
56+
key: ${{ runner.os }}-go-mod-${{ hashFiles('**/go.sum') }}
57+
- name: Lint Go
58+
run: yarn run lint:go
3759
- name: Test Go
3860
run: make test
3961
- name: Test integration of Go and TypeScript
40-
run: make integration
62+
run: yarn integration

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ gents: $(PROTOWRAP) node_modules
109109
--ts_proto_opt=forceLong=long \
110110
--ts_proto_opt=oneof=unions \
111111
--ts_proto_opt=outputServices=default,outputServices=generic-definitions \
112+
--ts_proto_opt=useAsyncIterable=true \
112113
--proto_path $$(pwd)/vendor \
113114
--print_structure \
114115
--only_specified_files \

README.md

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,17 @@ Can use any Stream multiplexer: defaults to [libp2p-mplex] over a WebSocket.
1919

2020
# Usage
2121

22-
Starting with the [protobuf-project] repository on the "starpc" branch.
22+
Start with the [protobuf-project] template repository on the "starpc" branch.
23+
24+
[protobuf-project]: https://github.com/aperturerobotics/protobuf-project/tree/starpc
2325

2426
Use "git add" to add your new .proto files, then `yarn gen` to generate the
25-
TypeScript and Go code for them.
27+
TypeScript and Go code.
2628

2729
# Examples
2830

29-
See the [protobuf-project] template on the "starpc" branch.
30-
3131
The demo/boilerplate project implements the Echo example below.
3232

33-
[protobuf-project]: https://github.com/aperturerobotics/protobuf-project/tree/starpc
34-
3533
This repository uses protowrap, see the [Makefile](./Makefile).
3634

3735
## Protobuf
@@ -118,6 +116,7 @@ This example demonstrates both the server and client:
118116
import { pipe } from 'it-pipe'
119117
import { createHandler, createMux, Server, Client, Conn } from 'srpc'
120118
import { EchoerDefinition, EchoerServer, runClientTest } from 'srpc/echo'
119+
import { pushable } from 'it-pushable'
121120

122121
const mux = createMux()
123122
const echoer = new EchoerServer()
@@ -129,20 +128,31 @@ const serverConn = new Conn(server)
129128
pipe(clientConn, serverConn, clientConn)
130129
const client = new Client(clientConn.buildOpenStreamFunc())
131130

131+
// call the unary rpc
132132
console.log('Calling Echo: unary call...')
133133
let result = await demoServiceClient.Echo({
134134
body: 'Hello world!',
135135
})
136136
console.log('success: output', result.body)
137137

138-
const clientRequestStream = new Observable<EchoMsg>(subscriber => {
139-
subscriber.next({body: 'Hello world from streaming request.'})
140-
subscriber.complete()
141-
})
138+
// create a client -> server stream
139+
const clientRequestStream = pushable<EchoMsg>({objectMode: true})
140+
clientRequestStream.push({body: 'Hello world from streaming request.'})
141+
clientRequestStream.end()
142142

143+
// call the client -> server streaming rpc
143144
console.log('Calling EchoClientStream: client -> server...')
144145
result = await demoServiceClient.EchoClientStream(clientRequestStream)
145146
console.log('success: output', result.body)
147+
148+
// call the server -> client streaming rpc
149+
console.log('Calling EchoServerStream: server -> client...')
150+
const serverStream = demoServiceClient.EchoServerStream({
151+
body: 'Hello world from server to client streaming request.',
152+
})
153+
for await (const msg of serverStream) {
154+
console.log('server: output', msg.body)
155+
}
146156
```
147157

148158
## WebSocket

doc/overview.md

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,34 @@ For a service:
44

55
```protobuf
66
syntax = "proto3";
7-
package web.demo;
7+
package simple;
88
99
service DemoService {
10-
rpc DemoEcho(DemoEchoMsg) returns (DemoEchoMsg) {}
10+
rpc BidiStreaming(stream TestMessage) returns (stream TestMessage) {}
1111
}
1212
13-
message DemoEchoMsg {
14-
string msg = 1;
13+
message TestMessage {
14+
string value = 1;
1515
}
1616
```
1717

1818
ts-proto generates a RPC interface like:
1919

2020
```typescript
2121
export interface DemoService {
22-
DemoEcho(request: DemoEchoMsg): Promise<DemoEchoMsg>
22+
BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMsg>
2323
}
2424

2525
export class DemoServiceClientImpl implements DemoService {
2626
private readonly rpc: Rpc
2727
constructor(rpc: Rpc) {
2828
this.rpc = rpc
29-
this.DemoEcho = this.DemoEcho.bind(this)
29+
this.BidiStreaming = this.BidiStreaming.bind(this)
3030
}
31-
DemoEcho(request: DemoEchoMsg): Promise<DemoEchoMsg> {
32-
const data = DemoEchoMsg.encode(request).finish()
33-
const promise = this.rpc.request('web.demo.DemoService', 'DemoEcho', data)
34-
return promise.then((data) => DemoEchoMsg.decode(new _m0.Reader(data)))
31+
BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
32+
const data = TestMessage.encodeTransform(request);
33+
const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
34+
return TestMessage.decodeTransform(result);
3535
}
3636
}
3737
```
@@ -48,18 +48,18 @@ interface Rpc {
4848
clientStreamingRequest(
4949
service: string,
5050
method: string,
51-
data: Observable<Uint8Array>
51+
data: AsyncIterable<Uint8Array>
5252
): Promise<Uint8Array>
5353
serverStreamingRequest(
5454
service: string,
5555
method: string,
5656
data: Uint8Array
57-
): Observable<Uint8Array>
57+
): AsyncIterable<Uint8Array>
5858
bidirectionalStreamingRequest(
5959
service: string,
6060
method: string,
61-
data: Observable<Uint8Array>
62-
): Observable<Uint8Array>
61+
data: AsyncIterable<Uint8Array>
62+
): AsyncIterable<Uint8Array>
6363
}
6464
```
6565

e2e/e2e.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
import { pipe } from 'it-pipe'
22
import { createHandler, createMux, Server, Client, Conn } from '../srpc'
33
import { EchoerDefinition, EchoerServer, runClientTest } from '../echo'
4+
import { runRpcStreamTest } from '../echo/client-test'
45

56
async function runRPC() {
67
const mux = createMux()
7-
const echoer = new EchoerServer()
8-
mux.register(createHandler(EchoerDefinition, echoer))
98
const server = new Server(mux)
9+
const echoer = new EchoerServer(server)
10+
mux.register(createHandler(EchoerDefinition, echoer))
1011

1112
const clientConn = new Conn()
1213
const serverConn = new Conn(server)
1314
pipe(clientConn, serverConn, clientConn)
1415
const client = new Client(clientConn.buildOpenStreamFunc())
1516

17+
await runRpcStreamTest(client)
1618
await runClientTest(client)
1719
}
1820

@@ -21,7 +23,6 @@ runRPC()
2123
console.log('finished successfully')
2224
})
2325
.catch((err) => {
24-
console.log('failed')
2526
console.error(err)
2627
process.exit(1)
2728
})

e2e/e2e_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/aperturerobotics/starpc/echo"
11+
"github.com/aperturerobotics/starpc/rpcstream"
1112
"github.com/aperturerobotics/starpc/srpc"
1213
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
1314
mp "github.com/libp2p/go-mplex"
@@ -17,8 +18,8 @@ import (
1718
// RunE2E runs an end to end test with a callback.
1819
func RunE2E(t *testing.T, cb func(client echo.SRPCEchoerClient) error) {
1920
// construct the server
20-
echoServer := &echo.EchoServer{}
2121
mux := srpc.NewMux()
22+
echoServer := echo.NewEchoServer(mux)
2223
if err := echo.SRPCRegisterEchoer(mux, echoServer); err != nil {
2324
t.Fatal(err.Error())
2425
}
@@ -179,3 +180,25 @@ func TestE2E_BidiStream(t *testing.T) {
179180
return strm.Close()
180181
})
181182
}
183+
184+
func TestE2E_RpcStream(t *testing.T) {
185+
ctx := context.Background()
186+
RunE2E(t, func(client echo.SRPCEchoerClient) error {
187+
openStreamFn := rpcstream.NewRpcStreamOpenStream(func(ctx context.Context) (rpcstream.RpcStream, error) {
188+
return client.RpcStream(ctx)
189+
}, "test")
190+
proxiedClient := srpc.NewClient(openStreamFn)
191+
proxiedSvc := echo.NewSRPCEchoerClient(proxiedClient)
192+
193+
// run a RPC proxied over another RPC
194+
resp, err := proxiedSvc.Echo(ctx, &echo.EchoMsg{Body: "hello world"})
195+
if err != nil {
196+
return err
197+
}
198+
if resp.GetBody() != "hello world" {
199+
return errors.Errorf("response body incorrect: %q", resp.GetBody())
200+
}
201+
202+
return nil
203+
})
204+
}

echo/client-test.ts

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Client } from '../srpc/index.js'
22
import { EchoerClientImpl, EchoMsg } from './echo.pb.js'
3-
import { Observable } from 'rxjs'
3+
import { pushable } from 'it-pushable'
4+
import { buildRpcStreamOpenStream } from '../rpcstream/rpcstream.js'
45

56
export async function runClientTest(client: Client) {
67
const demoServiceClient = new EchoerClientImpl(client)
@@ -12,10 +13,9 @@ export async function runClientTest(client: Client) {
1213
console.log('success: output', result.body)
1314

1415
// observable for client requests
15-
const clientRequestStream = new Observable<EchoMsg>((subscriber) => {
16-
subscriber.next({ body: 'Hello world from streaming request.' })
17-
subscriber.complete()
18-
})
16+
const clientRequestStream = pushable<EchoMsg>({ objectMode: true })
17+
clientRequestStream.push({ body: 'Hello world from streaming request.' })
18+
clientRequestStream.end()
1919

2020
console.log('Calling EchoClientStream: client -> server...')
2121
result = await demoServiceClient.EchoClientStream(clientRequestStream)
@@ -25,17 +25,22 @@ export async function runClientTest(client: Client) {
2525
const serverStream = demoServiceClient.EchoServerStream({
2626
body: 'Hello world from server to client streaming request.',
2727
})
28-
await new Promise<void>((resolve, reject) => {
29-
serverStream.subscribe({
30-
next(result) {
31-
console.log('server: output', result.body)
32-
},
33-
complete() {
34-
resolve()
35-
},
36-
error(err: Error) {
37-
reject(err)
38-
},
39-
})
40-
})
28+
for await (const msg of serverStream) {
29+
console.log('server: output', msg.body)
30+
}
31+
}
32+
33+
// runRpcStreamTest tests a RPCStream.
34+
export async function runRpcStreamTest(client: Client) {
35+
console.log('Calling RpcStream to open a RPC stream client...')
36+
const service = new EchoerClientImpl(client)
37+
const openStreamFn = buildRpcStreamOpenStream(
38+
'test',
39+
service.RpcStream.bind(service)
40+
)
41+
const proxiedClient = new Client(openStreamFn)
42+
const proxiedService = new EchoerClientImpl(proxiedClient)
43+
console.log('Calling Echo via RPC stream...')
44+
const resp = await proxiedService.Echo({ body: 'hello world via proxy' })
45+
console.log('rpc stream test: succeeded: response: ' + resp.body)
4146
}

echo/echo.pb.go

Lines changed: 33 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)