Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: grpc-web support promise stream #631

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ node_modules/
build/
/.idea
/.vscode
/.log
/integration/*/pbjs.js
/integration/*/pbjs.d.ts
coverage/
Expand Down
Binary file modified integration/async-iterable-services/simple.bin
Binary file not shown.
18 changes: 13 additions & 5 deletions integration/async-iterable-services/simple.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
syntax = "proto3";

package simple;

service Test {
rpc BidiStreaming(stream TestMessage) returns (stream TestMessage) {}
// Echoer service returns the given message.
service Echoer {
// Echo returns the given message.
rpc Echo(EchoMsg) returns (EchoMsg);
// EchoServerStream is an example of a server -> client one-way stream.
rpc EchoServerStream(EchoMsg) returns (stream EchoMsg);
// EchoClientStream is an example of client->server one-way stream.
rpc EchoClientStream(stream EchoMsg) returns (EchoMsg);
// EchoBidiStream is an example of a two-way stream.
rpc EchoBidiStream(stream EchoMsg) returns (stream EchoMsg);
}

message TestMessage {
string value = 1;
// EchoMsg is the message body for Echo.
message EchoMsg {
string body = 1;
}
98 changes: 64 additions & 34 deletions integration/async-iterable-services/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,32 @@ import * as _m0 from 'protobufjs/minimal';

export const protobufPackage = 'simple';

export interface TestMessage {
value: string;
/** EchoMsg is the message body for Echo. */
export interface EchoMsg {
body: string;
}

function createBaseTestMessage(): TestMessage {
return { value: '' };
function createBaseEchoMsg(): EchoMsg {
return { body: '' };
}

export const TestMessage = {
encode(message: TestMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.value !== '') {
writer.uint32(10).string(message.value);
export const EchoMsg = {
encode(message: EchoMsg, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.body !== '') {
writer.uint32(10).string(message.body);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): TestMessage {
decode(input: _m0.Reader | Uint8Array, length?: number): EchoMsg {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseTestMessage();
const message = createBaseEchoMsg();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.value = reader.string();
message.body = reader.string();
break;
default:
reader.skipType(tag & 7);
Expand All @@ -38,70 +39,99 @@ export const TestMessage = {
},

// encodeTransform encodes a source of message objects.
// Transform<TestMessage, Uint8Array>
// Transform<EchoMsg, Uint8Array>
async *encodeTransform(
source: AsyncIterable<TestMessage | TestMessage[]> | Iterable<TestMessage | TestMessage[]>
source: AsyncIterable<EchoMsg | EchoMsg[]> | Iterable<EchoMsg | EchoMsg[]>
): AsyncIterable<Uint8Array> {
for await (const pkt of source) {
if (Array.isArray(pkt)) {
for (const p of pkt) {
yield* [TestMessage.encode(p).finish()];
yield* [EchoMsg.encode(p).finish()];
}
} else {
yield* [TestMessage.encode(pkt).finish()];
yield* [EchoMsg.encode(pkt).finish()];
}
}
},

// decodeTransform decodes a source of encoded messages.
// Transform<Uint8Array, TestMessage>
// Transform<Uint8Array, EchoMsg>
async *decodeTransform(
source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>
): AsyncIterable<TestMessage> {
): AsyncIterable<EchoMsg> {
for await (const pkt of source) {
if (Array.isArray(pkt)) {
for (const p of pkt) {
yield* [TestMessage.decode(p)];
yield* [EchoMsg.decode(p)];
}
} else {
yield* [TestMessage.decode(pkt)];
yield* [EchoMsg.decode(pkt)];
}
}
},

fromJSON(object: any): TestMessage {
fromJSON(object: any): EchoMsg {
return {
value: isSet(object.value) ? String(object.value) : '',
body: isSet(object.body) ? String(object.body) : '',
};
},

toJSON(message: TestMessage): unknown {
toJSON(message: EchoMsg): unknown {
const obj: any = {};
message.value !== undefined && (obj.value = message.value);
message.body !== undefined && (obj.body = message.body);
return obj;
},

fromPartial<I extends Exact<DeepPartial<TestMessage>, I>>(object: I): TestMessage {
const message = createBaseTestMessage();
message.value = object.value ?? '';
fromPartial<I extends Exact<DeepPartial<EchoMsg>, I>>(object: I): EchoMsg {
const message = createBaseEchoMsg();
message.body = object.body ?? '';
return message;
},
};

export interface Test {
BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage>;
/** Echoer service returns the given message. */
export interface Echoer {
/** Echo returns the given message. */
Echo(request: EchoMsg): Promise<EchoMsg>;
/** EchoServerStream is an example of a server -> client one-way stream. */
EchoServerStream(request: EchoMsg): AsyncIterable<EchoMsg>;
/** EchoClientStream is an example of client->server one-way stream. */
EchoClientStream(request: AsyncIterable<EchoMsg>): Promise<EchoMsg>;
/** EchoBidiStream is an example of a two-way stream. */
EchoBidiStream(request: AsyncIterable<EchoMsg>): AsyncIterable<EchoMsg>;
}

export class TestClientImpl implements Test {
export class EchoerClientImpl implements Echoer {
private readonly rpc: Rpc;
constructor(rpc: Rpc) {
this.rpc = rpc;
this.BidiStreaming = this.BidiStreaming.bind(this);
this.Echo = this.Echo.bind(this);
this.EchoServerStream = this.EchoServerStream.bind(this);
this.EchoClientStream = this.EchoClientStream.bind(this);
this.EchoBidiStream = this.EchoBidiStream.bind(this);
}
BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
const data = TestMessage.encodeTransform(request);
const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
return TestMessage.decodeTransform(result);
Echo(request: EchoMsg): Promise<EchoMsg> {
const data = EchoMsg.encode(request).finish();
const promise = this.rpc.request('simple.Echoer', 'Echo', data);
return promise.then((data) => EchoMsg.decode(new _m0.Reader(data)));
}

EchoServerStream(request: EchoMsg): AsyncIterable<EchoMsg> {
const data = EchoMsg.encode(request).finish();
const result = this.rpc.serverStreamingRequest('simple.Echoer', 'EchoServerStream', data);
return EchoMsg.decodeTransform(result);
}

EchoClientStream(request: AsyncIterable<EchoMsg>): Promise<EchoMsg> {
const data = EchoMsg.encodeTransform(request);
const promise = this.rpc.clientStreamingRequest('simple.Echoer', 'EchoClientStream', data);
return promise.then((data) => EchoMsg.decode(new _m0.Reader(data)));
}

EchoBidiStream(request: AsyncIterable<EchoMsg>): AsyncIterable<EchoMsg> {
const data = EchoMsg.encodeTransform(request);
const result = this.rpc.bidirectionalStreamingRequest('simple.Echoer', 'EchoBidiStream', data);
return EchoMsg.decodeTransform(result);
}
}

Expand Down
1 change: 1 addition & 0 deletions integration/grpc-web-promise/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build
21 changes: 21 additions & 0 deletions integration/grpc-web-promise/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
`grpc-web` client stream need `WebSocket` transport.

Because need bundle single `.js` file to browser, so you need to install `webpack toolchain`.

```
cd grpc-web
npm install
npx webpack
```

Then, use you browser open `index.html` file.

## server

Running the rpc server.

```
cd grpc-web-go-server
./build.sh
GRPC_GO_LOG_SEVERITY_LEVEL=info GRPC_GO_LOG_VERBOSITY_LEVEL=0 ./grpc-web-go-server
```
94 changes: 94 additions & 0 deletions integration/grpc-web-promise/client-ts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env node

import { DashAPICredsClientImpl, DashStateClientImpl, GrpcWebImpl, DashUserSettingsState } from './example';
import { grpc } from '@improbable-eng/grpc-web';

const defTransport = grpc.CrossBrowserHttpTransport({ withCredentials: false });
const ws = grpc.WebsocketTransport();

const rpc = new GrpcWebImpl('http://localhost:9090', {
transport: defTransport,
debug: true,
metadata: new grpc.Metadata({ SomeHeader: 'bar' }),
});

const client = new DashStateClientImpl(rpc);
const creds = new DashAPICredsClientImpl(rpc);

async function main() {
console.log('calling client.UserSettings');
console.log(await client.UserSettings({}));

console.log('calling creds.Create');
const cred = await creds.Create({ description: 'test desc fooo' });
console.log(cred);

console.log('calling creds.Delete');
const del = await creds.Delete({ id: cred.id });
console.log(del);

console.log('calling creds.Update');
try {
await creds.Update({ description: 'test desc2' });
} catch (e) {
console.log('got expected error', e);
}

console.log('(server-stream) calling client.ActiveUserSettingsStream');
const serverStream = await client.ActiveUserSettingsStream({});
serverStream.on('data', (message) => console.log('(server-stream) message: ', message));
serverStream.on('end', (status) => console.log('(server-stream) end: ', status));
serverStream.on('status', (status) => console.log('(server-stream) status: ', status));

console.log('(client-stream) calling client.ManyUserSettingsStream');
const clientStream = await client.ManyUserSettingsStream({
rpcOptions: { transport: ws },
});
// will only receive one message when client stream finish.
clientStream.on('data', (message) => console.log('(client-stream) message: ', message));
clientStream.on('end', (status) => console.log('(client-stream) end: ', status));
clientStream.on('status', (status) => console.log('(client-stream) status: ', status));
await (function () {
return new Promise((resolve, _) => {
let clientStreamCount = 0;
const clientStreanIntervalId = setInterval(() => {
if (clientStreamCount >= 10) {
clientStream.end();
clearInterval(clientStreanIntervalId);
resolve(true);
return;
}
clientStream.write({ email: '[email protected]' });
clientStreamCount++;
}, 1000);
});
})();

console.log('(client-server bidirectional stream) calling client.ChangeUserSettingsStream');
const bidirectional = await client.ChangeUserSettingsStream({
rpcOptions: { transport: ws },
});
bidirectional.on('data', (message) => console.log('(bidirectional-stream) message: ', message));
bidirectional.on('end', (status) => console.log('(bidirectional-stream) end: ', status));
bidirectional.on('status', (status) => console.log('(bidirectional-stream) status: ', status));
await (function () {
return new Promise((resolve, _) => {
let bidirectionalCount = 0;
const biDiIntervalId = setInterval(() => {
if (bidirectionalCount >= 10) {
bidirectional.end();
clearInterval(biDiIntervalId);
resolve(true);
return;
}
bidirectional.write({ email: '[email protected]' });
bidirectionalCount++;
}, 1000);
});
})();
}

main().then(
() => console.log('done'),
(err) => console.log('failed', err)
);
35 changes: 35 additions & 0 deletions integration/grpc-web-promise/example-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { DashStateClientImpl } from './example';
import { EMPTY } from 'rxjs';

describe('grpc-web', () => {
it('at least compiles', () => {
// TODO move the hand-run `client-ts` integration into here, but for now
// at least check that things compile
const rpc = {
unary: jest.fn(),
invoke: jest.fn(),
stream: jest.fn(),
};
const client = new DashStateClientImpl(rpc);
client.UserSettings({});
});
it('binds rpc function', () => {
const rpc = {
unary: jest.fn(),
invoke: jest.fn(),
stream: jest.fn(),
};
const client = new DashStateClientImpl(rpc);
const userSettings = client.UserSettings;
userSettings({});
});
it('throws on client streaming call', () => {
const rpc = {
unary: jest.fn(),
invoke: jest.fn(),
stream: jest.fn(),
};
const client = new DashStateClientImpl(rpc);
client.ChangeUserSettingsStream({});
});
});
Binary file added integration/grpc-web-promise/example.bin
Binary file not shown.
Loading