Skip to content

Commit

Permalink
re-enable streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Trottier committed Dec 30, 2019
1 parent 511e8eb commit 969f386
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 87 deletions.
1 change: 1 addition & 0 deletions doc/PROTO.md
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ NodeAPI provide an API to control the underlying custom ipfs node
| P2P | [P2PRequest](#pb.P2PRequest) | [P2PResponse](#pb.P2PResponse) | P2P allows control of generalized p2p streams for tcp/udp based protocol. By using this RPC, we can tunnel traffic similar to ssh tunneling except using libp2p as the transport layer, and and tcp/udp port. |
| Blockstore | [BlockstoreRequest](#pb.BlockstoreRequest) | [BlockstoreResponse](#pb.BlockstoreResponse) | Blockstore allows low-level management of the underlying blockstore |
| Dag | [DagRequest](#pb.DagRequest) | [DagResponse](#pb.DagResponse) | Dag is a unidirectional rpc allowing manipulation of low-level ipld objects |
| DagStream | [DagRequest](#pb.DagRequest) stream | [DagResponse](#pb.DagResponse) stream | DagStream is like Dag but with bidirectional streams |



Expand Down
242 changes: 157 additions & 85 deletions go/node.pb.go

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions java/pb/NodeAPIGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,38 @@ pb.Node.DagResponse> getDagMethod() {
return getDagMethod;
}

private static volatile io.grpc.MethodDescriptor<pb.Node.DagRequest,
pb.Node.DagResponse> getDagStreamMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "DagStream",
requestType = pb.Node.DagRequest.class,
responseType = pb.Node.DagResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
public static io.grpc.MethodDescriptor<pb.Node.DagRequest,
pb.Node.DagResponse> getDagStreamMethod() {
io.grpc.MethodDescriptor<pb.Node.DagRequest, pb.Node.DagResponse> getDagStreamMethod;
if ((getDagStreamMethod = NodeAPIGrpc.getDagStreamMethod) == null) {
synchronized (NodeAPIGrpc.class) {
if ((getDagStreamMethod = NodeAPIGrpc.getDagStreamMethod) == null) {
NodeAPIGrpc.getDagStreamMethod = getDagStreamMethod =
io.grpc.MethodDescriptor.<pb.Node.DagRequest, pb.Node.DagResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName(
"pb.NodeAPI", "DagStream"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.DagRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.DagResponse.getDefaultInstance()))
.setSchemaDescriptor(new NodeAPIMethodDescriptorSupplier("DagStream"))
.build();
}
}
}
return getDagStreamMethod;
}

/**
* Creates a new async stub that supports all call types for the service
*/
Expand Down Expand Up @@ -272,6 +304,16 @@ public void dag(pb.Node.DagRequest request,
asyncUnimplementedUnaryCall(getDagMethod(), responseObserver);
}

/**
* <pre>
* DagStream is like Dag but with bidirectional streams
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.DagRequest> dagStream(
io.grpc.stub.StreamObserver<pb.Node.DagResponse> responseObserver) {
return asyncUnimplementedStreamingCall(getDagStreamMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
Expand Down Expand Up @@ -309,6 +351,13 @@ public void dag(pb.Node.DagRequest request,
pb.Node.DagRequest,
pb.Node.DagResponse>(
this, METHODID_DAG)))
.addMethod(
getDagStreamMethod(),
asyncBidiStreamingCall(
new MethodHandlers<
pb.Node.DagRequest,
pb.Node.DagResponse>(
this, METHODID_DAG_STREAM)))
.build();
}
}
Expand Down Expand Up @@ -390,6 +439,17 @@ public void dag(pb.Node.DagRequest request,
asyncUnaryCall(
getChannel().newCall(getDagMethod(), getCallOptions()), request, responseObserver);
}

/**
* <pre>
* DagStream is like Dag but with bidirectional streams
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.DagRequest> dagStream(
io.grpc.stub.StreamObserver<pb.Node.DagResponse> responseObserver) {
return asyncBidiStreamingCall(
getChannel().newCall(getDagStreamMethod(), getCallOptions()), responseObserver);
}
}

/**
Expand Down Expand Up @@ -550,6 +610,7 @@ public com.google.common.util.concurrent.ListenableFuture<pb.Node.DagResponse> d
private static final int METHODID_P2P = 2;
private static final int METHODID_BLOCKSTORE = 3;
private static final int METHODID_DAG = 4;
private static final int METHODID_DAG_STREAM = 5;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand Down Expand Up @@ -598,6 +659,9 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_DAG_STREAM:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.dagStream(
(io.grpc.stub.StreamObserver<pb.Node.DagResponse>) responseObserver);
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -654,6 +718,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getP2PMethod())
.addMethod(getBlockstoreMethod())
.addMethod(getDagMethod())
.addMethod(getDagStreamMethod())
.build();
}
}
Expand Down
12 changes: 12 additions & 0 deletions js/node_grpc_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ var NodeAPIService = exports.NodeAPIService = {
responseSerialize: serialize_pb_DagResponse,
responseDeserialize: deserialize_pb_DagResponse,
},
// DagStream is like Dag but with bidirectional streams
dagStream: {
path: '/pb.NodeAPI/DagStream',
requestStream: true,
responseStream: true,
requestType: node_pb.DagRequest,
responseType: node_pb.DagResponse,
requestSerialize: serialize_pb_DagRequest,
requestDeserialize: deserialize_pb_DagRequest,
responseSerialize: serialize_pb_DagResponse,
responseDeserialize: deserialize_pb_DagResponse,
},
};

exports.NodeAPIClient = grpc.makeGenericClientConstructor(NodeAPIService);
2 changes: 2 additions & 0 deletions pb/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ service NodeAPI {
rpc Blockstore(BlockstoreRequest) returns (BlockstoreResponse) { };
// Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
rpc Dag(DagRequest) returns (DagResponse) { };
// DagStream is like Dag but with bidirectional streams
rpc DagStream(stream DagRequest) returns (stream DagResponse);
}

// P2PREQTYPE denotes the particular type of request being used in the p2p rpc
Expand Down
13 changes: 11 additions & 2 deletions py/node_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions py/node_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def __init__(self, channel):
request_serializer=node__pb2.DagRequest.SerializeToString,
response_deserializer=node__pb2.DagResponse.FromString,
)
self.DagStream = channel.stream_stream(
'/pb.NodeAPI/DagStream',
request_serializer=node__pb2.DagRequest.SerializeToString,
response_deserializer=node__pb2.DagResponse.FromString,
)


class NodeAPIServicer(object):
Expand Down Expand Up @@ -83,6 +88,13 @@ def Dag(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def DagStream(self, request_iterator, context):
"""DagStream is like Dag but with bidirectional streams
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_NodeAPIServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand Down Expand Up @@ -111,6 +123,11 @@ def add_NodeAPIServicer_to_server(servicer, server):
request_deserializer=node__pb2.DagRequest.FromString,
response_serializer=node__pb2.DagResponse.SerializeToString,
),
'DagStream': grpc.stream_stream_rpc_method_handler(
servicer.DagStream,
request_deserializer=node__pb2.DagRequest.FromString,
response_serializer=node__pb2.DagResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'pb.NodeAPI', rpc_method_handlers)
Expand Down
11 changes: 11 additions & 0 deletions ts/node_pb_service.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,23 @@ type NodeAPIDag = {
readonly responseType: typeof node_pb.DagResponse;
};

type NodeAPIDagStream = {
readonly methodName: string;
readonly service: typeof NodeAPI;
readonly requestStream: true;
readonly responseStream: true;
readonly requestType: typeof node_pb.DagRequest;
readonly responseType: typeof node_pb.DagResponse;
};

export class NodeAPI {
static readonly serviceName: string;
static readonly ConnMgmt: NodeAPIConnMgmt;
static readonly Extras: NodeAPIExtras;
static readonly P2P: NodeAPIP2P;
static readonly Blockstore: NodeAPIBlockstore;
static readonly Dag: NodeAPIDag;
static readonly DagStream: NodeAPIDagStream;
}

export type ServiceError = { message: string, code: number; metadata: grpc.Metadata }
Expand Down Expand Up @@ -136,5 +146,6 @@ export class NodeAPIClient {
requestMessage: node_pb.DagRequest,
callback: (error: ServiceError|null, responseMessage: node_pb.DagResponse|null) => void
): UnaryResponse;
dagStream(metadata?: grpc.Metadata): BidirectionalStream<node_pb.DagRequest, node_pb.DagResponse>;
}

Loading

0 comments on commit 969f386

Please sign in to comment.