Skip to content

Commit 3bebc22

Browse files
authored
Merge pull request #859 from cjihrig/client-stream
grpc-js: add client streaming RPC support
2 parents 74b4b9f + 4857c63 commit 3bebc22

File tree

4 files changed

+165
-1
lines changed

4 files changed

+165
-1
lines changed

packages/grpc-js/src/server-call.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {StatusObject} from './call-stream';
2424
import {Status} from './constants';
2525
import {Deserialize, Serialize} from './make-client';
2626
import {Metadata} from './metadata';
27+
import {StreamDecoder} from './stream-decoder';
2728

2829
function noop(): void {}
2930

@@ -107,6 +108,15 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> extends
107108
private _deserialize: Deserialize<RequestType>) {
108109
super({objectMode: true});
109110
this.cancelled = false;
111+
this.call.setupReadable(this);
112+
}
113+
114+
_read(size: number) {
115+
this.call.resume();
116+
}
117+
118+
deserialize(input: Buffer): RequestType {
119+
return this._deserialize(input);
110120
}
111121

112122
getPeer(): string {
@@ -468,6 +478,38 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
468478
this.sendMetadata();
469479
return this.stream.write(chunk);
470480
}
481+
482+
resume() {
483+
this.stream.resume();
484+
}
485+
486+
setupReadable(readable: ServerReadableStream<RequestType, ResponseType>|
487+
ServerDuplexStream<RequestType, ResponseType>) {
488+
const decoder = new StreamDecoder();
489+
490+
this.stream.on('data', async (data: Buffer) => {
491+
const message = decoder.write(data);
492+
493+
if (message === null) {
494+
return;
495+
}
496+
497+
try {
498+
const deserialized = await this.deserializeMessage(message);
499+
500+
if (!readable.push(deserialized)) {
501+
this.stream.pause();
502+
}
503+
} catch (err) {
504+
err.code = Status.INTERNAL;
505+
readable.emit('error', err);
506+
}
507+
});
508+
509+
this.stream.once('end', () => {
510+
readable.push(null);
511+
});
512+
}
471513
}
472514

473515
// tslint:disable:no-any

packages/grpc-js/src/server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,22 @@ function handleClientStreaming<RequestType, ResponseType>(
314314
call: Http2ServerCallStream<RequestType, ResponseType>,
315315
handler: ClientStreamingHandler<RequestType, ResponseType>,
316316
metadata: Metadata): void {
317-
throw new Error('not implemented yet');
317+
const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
318+
call, metadata, handler.deserialize);
319+
320+
function respond(
321+
err: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
322+
flags?: number) {
323+
stream.destroy();
324+
call.sendUnaryMessage(err, value, trailer, flags);
325+
}
326+
327+
if (call.cancelled) {
328+
return;
329+
}
330+
331+
stream.on('error', respond);
332+
handler.func(stream, respond);
318333
}
319334

320335

packages/grpc-js/test/test-server-errors.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ describe('Client malformed response handling', () => {
126126
});
127127
});
128128

129+
it('should get an INTERNAL status with a client stream call', (done) => {
130+
const call = client.clientStream((err: ServiceError, data: any) => {
131+
assert(err);
132+
assert.strictEqual(err.code, grpc.status.INTERNAL);
133+
done();
134+
});
135+
136+
call.write({});
137+
call.end();
138+
});
139+
129140
it('should get an INTERNAL status with a server stream call', (done) => {
130141
const call = client.serverStream({});
131142

@@ -229,6 +240,17 @@ describe('Server serialization failure handling', () => {
229240
});
230241
});
231242

243+
it('should get an INTERNAL status with a client stream call', (done) => {
244+
const call = client.clientStream((err: ServiceError, data: any) => {
245+
assert(err);
246+
assert.strictEqual(err.code, grpc.status.INTERNAL);
247+
done();
248+
});
249+
250+
call.write({});
251+
call.end();
252+
});
253+
232254
it('should get an INTERNAL status with a server stream call', (done) => {
233255
const call = client.serverStream({});
234256

@@ -397,6 +419,18 @@ describe('Other conditions', () => {
397419
});
398420
});
399421

422+
it('should respond correctly to a client stream', (done) => {
423+
const call =
424+
misbehavingClient.clientStream((err: ServiceError, data: any) => {
425+
assert(err);
426+
assert.strictEqual(err.code, grpc.status.INTERNAL);
427+
done();
428+
});
429+
430+
call.write(badArg);
431+
call.end();
432+
});
433+
400434
it('should respond correctly to a server stream', (done) => {
401435
const call = misbehavingClient.serverStream(badArg);
402436

@@ -457,6 +491,56 @@ describe('Other conditions', () => {
457491
});
458492
});
459493

494+
it('should be present when a client stream call succeeds', (done) => {
495+
let count = 0;
496+
const call = client.clientStream((err: ServiceError, data: any) => {
497+
assert.ifError(err);
498+
499+
count++;
500+
if (count === 2) {
501+
done();
502+
}
503+
});
504+
505+
call.write({error: false});
506+
call.write({error: false});
507+
call.end();
508+
509+
call.on('status', (status: grpc.StatusObject) => {
510+
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
511+
512+
count++;
513+
if (count === 2) {
514+
done();
515+
}
516+
});
517+
});
518+
519+
it('should be present when a client stream call fails', (done) => {
520+
let count = 0;
521+
const call = client.clientStream((err: ServiceError, data: any) => {
522+
assert(err);
523+
524+
count++;
525+
if (count === 2) {
526+
done();
527+
}
528+
});
529+
530+
call.write({error: false});
531+
call.write({error: true});
532+
call.end();
533+
534+
call.on('status', (status: grpc.StatusObject) => {
535+
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
536+
537+
count++;
538+
if (count === 2) {
539+
done();
540+
}
541+
});
542+
});
543+
460544
it('should be present when a server stream call succeeds', (done) => {
461545
const call = client.serverStream({error: false});
462546

@@ -489,6 +573,19 @@ describe('Other conditions', () => {
489573
});
490574
});
491575

576+
it('for a client stream call', (done) => {
577+
const call = client.clientStream((err: ServiceError, data: any) => {
578+
assert(err);
579+
assert.strictEqual(err.code, grpc.status.UNKNOWN);
580+
assert.strictEqual(err.details, 'Requested error');
581+
done();
582+
});
583+
584+
call.write({error: false});
585+
call.write({error: true});
586+
call.end();
587+
});
588+
492589
it('for a server stream call', (done) => {
493590
const call = client.serverStream({error: true});
494591

packages/grpc-js/test/test-server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,16 @@ describe('Server', () => {
263263
});
264264
});
265265

266+
it('should respond to a client stream with UNIMPLEMENTED', (done) => {
267+
const call = client.sum((error: ServiceError, response: any) => {
268+
assert(error);
269+
assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED);
270+
done();
271+
});
272+
273+
call.end();
274+
});
275+
266276
it('should respond to a server stream with UNIMPLEMENTED', (done) => {
267277
const call = client.fib({limit: 5});
268278

0 commit comments

Comments
 (0)