Skip to content

Commit

Permalink
Merge pull request #54 from RTradeLtd/streamable
Browse files Browse the repository at this point in the history
Blockstore Modifications
  • Loading branch information
bonedaddy authored Mar 28, 2020
2 parents 9502419 + 35f102d commit bd8f0c2
Show file tree
Hide file tree
Showing 14 changed files with 1,170 additions and 701 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

Tracks changes made in between releases

# v3.2.1

* Replication doc cli, and typo fix [#53](https://github.com/RTradeLtd/TxPB/pull/53)
* Add additional blockstore calls, enable a streaming version [#54](https://github.com/RTradeLtd/TxPB/pull/54)

# v3.2.0

* Add examples, and rust bindings [#52](https://github.com/RTradeLtd/TxPB/pull/52)
Expand Down
12 changes: 10 additions & 2 deletions doc/PROTO.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ NameSysAPI provides a generic name resolution API
| ----- | ---- | ----- | ----------- |
| cid | [string](#string) | | cid is the identifier of the block |
| data | [bytes](#bytes) | | data is the actual contents of the block |
| size | [int64](#int64) | | size of the block, only filled out by BS_GET_STATS since if we just want stats, we dont want to retrieve all the data. |



Expand All @@ -495,7 +496,7 @@ BlockstoreRequest is a message used to control blockstores
| ----- | ---- | ----- | ----------- |
| requestType | [BSREQTYPE](#pb.BSREQTYPE) | | indicates the particular request type being made |
| reqOpts | [BSREQOPTS](#pb.BSREQOPTS) | repeated | optional request settings |
| cids | [string](#string) | repeated | cids of blocks sent by: BS_DELETE, BS_GET, BS_GET_MANY |
| cids | [string](#string) | repeated | cids of blocks sent by: BS_DELETE, BS_GET, BS_GET_MANY, BS_GET_STATS |
| data | [bytes](#bytes) | repeated | the data we are putting sent by: BS_PUT, BS_PUT_MANY |
| cidVersion | [string](#string) | | the cid version to use when constructing blocks, default is v1 sent by: BS_PUT, BS_PUT_MANY |
| hashFunc | [string](#string) | | the hash function to use when constructing blocks, default is sha2-256 sent by: BS_PUT, BS_PUT_MANY |
Expand All @@ -514,7 +515,11 @@ BlockstoreResponse is a response to a BlockstoreqRequest
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| requestType | [BSREQTYPE](#pb.BSREQTYPE) | | indicates the particular request type being made |
| blocks | [Block](#pb.Block) | repeated | a copy of blocks from the blockstore sent by: BS_PUT, BS_PUT_MANY, BS_GET, BS_GET_MANY in the case of BS_PUT, and BS_PUT_MANY requests the data field will be empty as this is only populated by get requests |
| blocks | [Block](#pb.Block) | repeated | a copy of blocks from the blockstore sent by: BS_PUT, BS_PUT_MANY, BS_GET, BS_GET_MANY, BS_GET_STATS, BS_GET_ALL

in the case of BS_PUT, and BS_PUT_MANY requests the data field will be empty as this is only populated by get requests

in the case of BS_GET_STATS only the cid, and size params will be filled out, since we are just interested in the size |



Expand Down Expand Up @@ -907,6 +912,8 @@ BSREQTYPE is a particular blockstore request type
| BS_PUT_MANY | 2 | BS_PUT_MANY is used to put many blocks in the store |
| BS_GET | 3 | BS_GET is used to get a block from the store |
| BS_GET_MANY | 4 | BS_GET_MANY is used to get many blocks from the store |
| BS_GET_ALL | 5 | BS_GET_ALL is used to retrieve all blocks from the store It is the gRPC equivalent of Blockstore::AllKeysChan |
| BS_GET_STATS | 6 | BS_GET_STATS is used to retrieve statistics about individual blocks |



Expand Down Expand Up @@ -1009,6 +1016,7 @@ NodeAPI provide an API to control the underlying custom ipfs node
| Extras | [ExtrasRequest](#pb.ExtrasRequest) | [Empty](#pb.Empty) | Extras provide control over node extras capabilities |
| P2P | [P2PRequest](#pb.P2PRequest) | [P2PResponse](#pb.P2PResponse) | P2P allows control of generalized p2p streams for tcp/udp based protocol. By using this RPC, we can tunnel traffic similar to ssh tunneling except using libp2p as the transport layer, and and tcp/udp port. |
| Blockstore | [BlockstoreRequest](#pb.BlockstoreRequest) | [BlockstoreResponse](#pb.BlockstoreResponse) | Blockstore allows low-level management of the underlying blockstore |
| BlockstoreStream | [BlockstoreRequest](#pb.BlockstoreRequest) stream | [BlockstoreResponse](#pb.BlockstoreResponse) stream | BlockstoreStream is akin to Blockstore, except streamable Once v4 is out, condense this + blockstore into a single call |
| Dag | [DagRequest](#pb.DagRequest) | [DagResponse](#pb.DagResponse) | Dag is a unidirectional rpc allowing manipulation of low-level ipld objects |
| Keystore | [KeystoreRequest](#pb.KeystoreRequest) | [KeystoreResponse](#pb.KeystoreResponse) | Keystore is a unidirectional RPC allowing management of ipfs keystores |
| Persist | [PersistRequest](#pb.PersistRequest) | [PersistResponse](#pb.PersistResponse) | Persist is used to retrieve data from the network and make it available locally |
Expand Down
341 changes: 234 additions & 107 deletions go/node.pb.go

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions java/pb/NodeAPIGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ pb.Node.BlockstoreResponse> getBlockstoreMethod() {
return getBlockstoreMethod;
}

private static volatile io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse> getBlockstoreStreamMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "BlockstoreStream",
requestType = pb.Node.BlockstoreRequest.class,
responseType = pb.Node.BlockstoreResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
public static io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse> getBlockstoreStreamMethod() {
io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest, pb.Node.BlockstoreResponse> getBlockstoreStreamMethod;
if ((getBlockstoreStreamMethod = NodeAPIGrpc.getBlockstoreStreamMethod) == null) {
synchronized (NodeAPIGrpc.class) {
if ((getBlockstoreStreamMethod = NodeAPIGrpc.getBlockstoreStreamMethod) == null) {
NodeAPIGrpc.getBlockstoreStreamMethod = getBlockstoreStreamMethod =
io.grpc.MethodDescriptor.<pb.Node.BlockstoreRequest, pb.Node.BlockstoreResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "BlockstoreStream"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.BlockstoreRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.BlockstoreResponse.getDefaultInstance()))
.setSchemaDescriptor(new NodeAPIMethodDescriptorSupplier("BlockstoreStream"))
.build();
}
}
}
return getBlockstoreStreamMethod;
}

private static volatile io.grpc.MethodDescriptor<pb.Node.DagRequest,
pb.Node.DagResponse> getDagMethod;

Expand Down Expand Up @@ -340,6 +371,17 @@ public void blockstore(pb.Node.BlockstoreRequest request,
asyncUnimplementedUnaryCall(getBlockstoreMethod(), responseObserver);
}

/**
* <pre>
* BlockstoreStream is akin to Blockstore, except streamable
* Once v4 is out, condense this + blockstore into a single call
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.BlockstoreRequest> blockstoreStream(
io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse> responseObserver) {
return asyncUnimplementedStreamingCall(getBlockstoreStreamMethod(), responseObserver);
}

/**
* <pre>
* Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
Expand Down Expand Up @@ -400,6 +442,13 @@ public void persist(pb.Node.PersistRequest request,
pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse>(
this, METHODID_BLOCKSTORE)))
.addMethod(
getBlockstoreStreamMethod(),
asyncBidiStreamingCall(
new MethodHandlers<
pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse>(
this, METHODID_BLOCKSTORE_STREAM)))
.addMethod(
getDagMethod(),
asyncUnaryCall(
Expand Down Expand Up @@ -488,6 +537,18 @@ public void blockstore(pb.Node.BlockstoreRequest request,
getChannel().newCall(getBlockstoreMethod(), getCallOptions()), request, responseObserver);
}

/**
* <pre>
* BlockstoreStream is akin to Blockstore, except streamable
* Once v4 is out, condense this + blockstore into a single call
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.BlockstoreRequest> blockstoreStream(
io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse> responseObserver) {
return asyncBidiStreamingCall(
getChannel().newCall(getBlockstoreStreamMethod(), getCallOptions()), responseObserver);
}

/**
* <pre>
* Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
Expand Down Expand Up @@ -716,6 +777,7 @@ public com.google.common.util.concurrent.ListenableFuture<pb.Node.PersistRespons
private static final int METHODID_DAG = 4;
private static final int METHODID_KEYSTORE = 5;
private static final int METHODID_PERSIST = 6;
private static final int METHODID_BLOCKSTORE_STREAM = 7;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand Down Expand Up @@ -772,6 +834,9 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_BLOCKSTORE_STREAM:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.blockstoreStream(
(io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse>) responseObserver);
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -827,6 +892,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getExtrasMethod())
.addMethod(getP2PMethod())
.addMethod(getBlockstoreMethod())
.addMethod(getBlockstoreStreamMethod())
.addMethod(getDagMethod())
.addMethod(getKeystoreMethod())
.addMethod(getPersistMethod())
Expand Down
13 changes: 13 additions & 0 deletions js/node_grpc_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,19 @@ blockstore: {
responseSerialize: serialize_pb_BlockstoreResponse,
responseDeserialize: deserialize_pb_BlockstoreResponse,
},
// BlockstoreStream is akin to Blockstore, except streamable
// Once v4 is out, condense this + blockstore into a single call
blockstoreStream: {
path: '/pb.NodeAPI/BlockstoreStream',
requestStream: true,
responseStream: true,
requestType: node_pb.BlockstoreRequest,
responseType: node_pb.BlockstoreResponse,
requestSerialize: serialize_pb_BlockstoreRequest,
requestDeserialize: deserialize_pb_BlockstoreRequest,
responseSerialize: serialize_pb_BlockstoreResponse,
responseDeserialize: deserialize_pb_BlockstoreResponse,
},
// Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
dag: {
path: '/pb.NodeAPI/Dag',
Expand Down
33 changes: 31 additions & 2 deletions js/node_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2468,7 +2468,8 @@ proto.pb.Block.prototype.toObject = function(opt_includeInstance) {
proto.pb.Block.toObject = function(includeInstance, msg) {
var f, obj = {
cid: jspb.Message.getFieldWithDefault(msg, 1, ""),
data: msg.getData_asB64()
data: msg.getData_asB64(),
size: jspb.Message.getFieldWithDefault(msg, 3, 0)
};

if (includeInstance) {
Expand Down Expand Up @@ -2513,6 +2514,10 @@ proto.pb.Block.deserializeBinaryFromReader = function(msg, reader) {
var value = /** @type {!Uint8Array} */ (reader.readBytes());
msg.setData(value);
break;
case 3:
var value = /** @type {number} */ (reader.readInt64());
msg.setSize(value);
break;
default:
reader.skipField();
break;
Expand Down Expand Up @@ -2556,6 +2561,13 @@ proto.pb.Block.serializeBinaryToWriter = function(message, writer) {
f
);
}
f = message.getSize();
if (f !== 0) {
writer.writeInt64(
3,
f
);
}
};


Expand Down Expand Up @@ -2613,6 +2625,21 @@ proto.pb.Block.prototype.setData = function(value) {
};


/**
* optional int64 size = 3;
* @return {number}
*/
proto.pb.Block.prototype.getSize = function() {
return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 3, 0));
};


/** @param {number} value */
proto.pb.Block.prototype.setSize = function(value) {
jspb.Message.setProto3IntField(this, 3, value);
};



/**
* Generated by JsPbCodeGenerator.
Expand Down Expand Up @@ -4568,7 +4595,9 @@ proto.pb.BSREQTYPE = {
BS_PUT: 1,
BS_PUT_MANY: 2,
BS_GET: 3,
BS_GET_MANY: 4
BS_GET_MANY: 4,
BS_GET_ALL: 5,
BS_GET_STATS: 6
};

/**
Expand Down
20 changes: 18 additions & 2 deletions pb/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ service NodeAPI {
rpc P2P(P2PRequest) returns (P2PResponse) { };
// Blockstore allows low-level management of the underlying blockstore
rpc Blockstore(BlockstoreRequest) returns (BlockstoreResponse) { };
// BlockstoreStream is akin to Blockstore, except streamable
// Once v4 is out, condense this + blockstore into a single call
rpc BlockstoreStream(stream BlockstoreRequest) returns (stream BlockstoreResponse) { };
// Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
rpc Dag(DagRequest) returns (DagResponse) { };
// Keystore is a unidirectional RPC allowing management of ipfs keystores
Expand Down Expand Up @@ -166,6 +169,11 @@ enum BSREQTYPE {
BS_GET = 3;
// BS_GET_MANY is used to get many blocks from the store
BS_GET_MANY = 4;
// BS_GET_ALL is used to retrieve all blocks from the store
// It is the gRPC equivalent of Blockstore::AllKeysChan
BS_GET_ALL = 5;
// BS_GET_STATS is used to retrieve statistics about individual blocks
BS_GET_STATS = 6;
}

// BSREQOPTS are options for blockstore requests
Expand All @@ -183,7 +191,7 @@ message BlockstoreRequest {
// optional request settings
repeated BSREQOPTS reqOpts = 2;
// cids of blocks
// sent by: BS_DELETE, BS_GET, BS_GET_MANY
// sent by: BS_DELETE, BS_GET, BS_GET_MANY, BS_GET_STATS
repeated string cids = 3;
// the data we are putting
// sent by: BS_PUT, BS_PUT_MANY
Expand All @@ -201,10 +209,14 @@ message BlockstoreResponse {
// indicates the particular request type being made
BSREQTYPE requestType = 1;
// a copy of blocks from the blockstore
// sent by: BS_PUT, BS_PUT_MANY, BS_GET, BS_GET_MANY
// sent by: BS_PUT, BS_PUT_MANY, BS_GET, BS_GET_MANY, BS_GET_STATS, BS_GET_ALL
//
// in the case of BS_PUT, and BS_PUT_MANY requests
// the data field will be empty as this is only populated
// by get requests
//
// in the case of BS_GET_STATS only the cid, and size params
// will be filled out, since we are just interested in the size
repeated Block blocks = 2;
}

Expand All @@ -213,6 +225,10 @@ message Block {
string cid = 1;
// data is the actual contents of the block
bytes data = 2;
// size of the block, only filled out by BS_GET_STATS
// since if we just want stats, we dont want to
// retrieve all the data.
int64 size = 3;
}

// DAGREQTYPE indicates the particular DagAPI request being performed
Expand Down
Loading

0 comments on commit bd8f0c2

Please sign in to comment.