Skip to content

Commit a1877bc

Browse files
authored
Support various payload of baidu-std: json, proto-json and proto-text (#2946)
* Support various payload of baidu-std: json, proto-json and proto-text * Refactor implementation of compress
1 parent d9dd7b0 commit a1877bc

30 files changed

+729
-209
lines changed

src/brpc/compress.cpp

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818

1919
#include "butil/logging.h"
20+
#include "json2pb/json_to_pb.h"
2021
#include "brpc/compress.h"
2122
#include "brpc/protocol.h"
22-
23+
#include "brpc/proto_base.pb.h"
2324

2425
namespace brpc {
2526

@@ -47,7 +48,7 @@ int RegisterCompressHandler(CompressType type,
4748

4849
// Find CompressHandler by type.
4950
// Returns NULL if not found
50-
inline const CompressHandler* FindCompressHandler(CompressType type) {
51+
const CompressHandler* FindCompressHandler(CompressType type) {
5152
int index = type;
5253
if (index < 0 || index >= MAX_HANDLER_SIZE) {
5354
LOG(ERROR) << "CompressType=" << type << " is out of range";
@@ -83,10 +84,14 @@ bool ParseFromCompressedData(const butil::IOBuf& data,
8384
return ParsePbFromIOBuf(msg, data);
8485
}
8586
const CompressHandler* handler = FindCompressHandler(compress_type);
86-
if (NULL != handler) {
87-
return handler->Decompress(data, msg);
87+
if (NULL == handler) {
88+
return false;
8889
}
89-
return false;
90+
91+
Deserializer deserializer([msg](google::protobuf::io::ZeroCopyInputStream* input) {
92+
return msg->ParseFromZeroCopyStream(input);
93+
});
94+
return handler->Decompress(data, &deserializer);
9095
}
9196

9297
bool SerializeAsCompressedData(const google::protobuf::Message& msg,
@@ -96,10 +101,28 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg,
96101
return msg.SerializeToZeroCopyStream(&wrapper);
97102
}
98103
const CompressHandler* handler = FindCompressHandler(compress_type);
99-
if (NULL != handler) {
100-
return handler->Compress(msg, buf);
104+
if (NULL == handler) {
105+
return false;
101106
}
102-
return false;
107+
108+
Serializer serializer([&msg](google::protobuf::io::ZeroCopyOutputStream* output) {
109+
return msg.SerializeToZeroCopyStream(output);
110+
});
111+
return handler->Compress(serializer, buf);
112+
}
113+
114+
::google::protobuf::Metadata Serializer::GetMetadata() const {
115+
::google::protobuf::Metadata metadata{};
116+
metadata.descriptor = SerializerBase::descriptor();
117+
metadata.reflection = nullptr;
118+
return metadata;
119+
}
120+
121+
::google::protobuf::Metadata Deserializer::GetMetadata() const {
122+
::google::protobuf::Metadata metadata{};
123+
metadata.descriptor = DeserializerBase::descriptor();
124+
metadata.reflection = nullptr;
125+
return metadata;
103126
}
104127

105128
} // namespace brpc

src/brpc/compress.h

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,143 @@
2121

2222
#include <google/protobuf/message.h> // Message
2323
#include "butil/iobuf.h" // butil::IOBuf
24+
#include "butil/logging.h"
2425
#include "brpc/options.pb.h" // CompressType
26+
#include "brpc/nonreflectable_message.h"
2527

2628
namespace brpc {
2729

30+
// Serializer can be used to implement custom serialization
31+
// before compression with user callback.
32+
class Serializer : public NonreflectableMessage<Serializer> {
33+
public:
34+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyOutputStream*)>;
35+
36+
Serializer() :Serializer(NULL) {}
37+
38+
explicit Serializer(Callback callback)
39+
:_callback(std::move(callback)) {
40+
SharedCtor();
41+
}
42+
43+
~Serializer() override {
44+
SharedDtor();
45+
}
46+
47+
Serializer(const Serializer& from)
48+
: NonreflectableMessage(from) {
49+
SharedCtor();
50+
MergeFrom(from);
51+
}
52+
53+
Serializer& operator=(const Serializer& from) {
54+
CopyFrom(from);
55+
return *this;
56+
}
57+
58+
void Swap(Serializer* other) {
59+
if (other != this) {
60+
}
61+
}
62+
63+
void MergeFrom(const Serializer& from) override {
64+
CHECK_NE(&from, this);
65+
}
66+
67+
// implements Message ----------------------------------------------
68+
void Clear() override {
69+
_callback = nullptr;
70+
}
71+
size_t ByteSizeLong() const override { return 0; }
72+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
73+
74+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
75+
76+
// Converts the data into `output' for later compression.
77+
bool SerializeTo(google::protobuf::io::ZeroCopyOutputStream* output) const {
78+
if (!_callback) {
79+
LOG(WARNING) << "Serializer::SerializeTo() called without callback";
80+
return false;
81+
}
82+
return _callback(output);
83+
}
84+
85+
void SetCallback(Callback callback) {
86+
_callback = std::move(callback);
87+
}
88+
89+
private:
90+
void SharedCtor() {}
91+
void SharedDtor() {}
92+
93+
Callback _callback;
94+
};
95+
96+
// Deserializer can be used to implement custom deserialization
97+
// after decompression with user callback.
98+
class Deserializer : public NonreflectableMessage<Deserializer> {
99+
public:
100+
public:
101+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyInputStream*)>;
102+
103+
Deserializer() :Deserializer(NULL) {}
104+
105+
explicit Deserializer(Callback callback) : _callback(std::move(callback)) {
106+
SharedCtor();
107+
}
108+
109+
~Deserializer() override {
110+
SharedDtor();
111+
}
112+
113+
Deserializer(const Deserializer& from)
114+
: NonreflectableMessage(from) {
115+
SharedCtor();
116+
MergeFrom(from);
117+
}
118+
119+
Deserializer& operator=(const Deserializer& from) {
120+
CopyFrom(from);
121+
return *this;
122+
}
123+
124+
void Swap(Deserializer* other) {
125+
if (other != this) {
126+
_callback.swap(other->_callback);
127+
}
128+
}
129+
130+
void MergeFrom(const Deserializer& from) override {
131+
CHECK_NE(&from, this);
132+
_callback = from._callback;
133+
}
134+
135+
// implements Message ----------------------------------------------
136+
void Clear() override { _callback = nullptr; }
137+
size_t ByteSizeLong() const override { return 0; }
138+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
139+
140+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
141+
142+
// Converts the decompressed `input'.
143+
bool DeserializeFrom(google::protobuf::io::ZeroCopyInputStream* intput) const {
144+
if (!_callback) {
145+
LOG(WARNING) << "Deserializer::DeserializeFrom() called without callback";
146+
return false;
147+
}
148+
return _callback(intput);
149+
}
150+
void SetCallback(Callback callback) {
151+
_callback = std::move(callback);
152+
}
153+
154+
private:
155+
void SharedCtor() {}
156+
void SharedDtor() {}
157+
158+
Callback _callback;
159+
};
160+
28161
struct CompressHandler {
29162
// Compress serialized `msg' into `buf'.
30163
// Returns true on success, false otherwise
@@ -42,6 +175,9 @@ struct CompressHandler {
42175
// Returns 0 on success, -1 otherwise
43176
int RegisterCompressHandler(CompressType type, CompressHandler handler);
44177

178+
// Returns CompressHandler pointer of `type' if registered, NULL otherwise.
179+
const CompressHandler* FindCompressHandler(CompressType type);
180+
45181
// Returns the `name' of the CompressType if registered
46182
const char* CompressTypeToCStr(CompressType type);
47183

src/brpc/controller.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ void Controller::ResetPods() {
290290
_http_response = NULL;
291291
_request_user_fields = NULL;
292292
_response_user_fields = NULL;
293+
_request_content_type = CONTENT_TYPE_PB;
294+
_response_content_type = CONTENT_TYPE_PB;
293295
_request_streams.clear();
294296
_response_streams.clear();
295297
_remote_stream_settings = NULL;

src/brpc/controller.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,20 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
616616

617617
void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);
618618

619+
void set_request_content_type(ContentType type) {
620+
_request_content_type = type;
621+
}
622+
ContentType request_content_type() const {
623+
return _request_content_type;
624+
}
625+
626+
void set_response_content_type(ContentType type) {
627+
_response_content_type = type;
628+
}
629+
ContentType response_content_type() const {
630+
return _response_content_type;
631+
}
632+
619633
private:
620634
struct CompletionInfo {
621635
CallId id; // call_id of the corresponding request
@@ -859,6 +873,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
859873
butil::IOBuf _request_attachment;
860874
butil::IOBuf _response_attachment;
861875

876+
// Only SerializedRequest supports `_request_content_type'.
877+
ContentType _request_content_type;
878+
// Only SerializedResponse supports `_response_content_type'.
879+
ContentType _response_content_type;
880+
862881
// Writable progressive attachment
863882
butil::intrusive_ptr<ProgressiveAttachment> _wpa;
864883
// Readable progressive attachment

src/brpc/global.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -388,25 +388,22 @@ static void GlobalInitializeOrDieImpl() {
388388
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389389

390390
// Compress Handlers
391-
const CompressHandler gzip_compress =
392-
{ GzipCompress, GzipDecompress, "gzip" };
391+
CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
393392
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
394393
exit(1);
395394
}
396-
const CompressHandler zlib_compress =
397-
{ ZlibCompress, ZlibDecompress, "zlib" };
395+
CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
398396
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
399397
exit(1);
400398
}
401-
const CompressHandler snappy_compress =
402-
{ SnappyCompress, SnappyDecompress, "snappy" };
399+
CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
403400
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
404401
exit(1);
405402
}
406403

407404
// Protocols
408405
Protocol baidu_protocol = { ParseRpcMessage,
409-
SerializeRequestDefault, PackRpcRequest,
406+
SerializeRpcRequest, PackRpcRequest,
410407
ProcessRpcRequest, ProcessRpcResponse,
411408
VerifyRpcRequest, NULL, NULL,
412409
CONNECTION_TYPE_ALL, "baidu_std" };

src/brpc/memcache.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ MemcacheRequest::MemcacheRequest()
3232
}
3333

3434
MemcacheRequest::MemcacheRequest(const MemcacheRequest& from)
35-
: NonreflectableMessage<MemcacheRequest>() {
35+
: NonreflectableMessage<MemcacheRequest>(from) {
3636
SharedCtor();
3737
MergeFrom(from);
3838
}
@@ -143,7 +143,7 @@ MemcacheResponse::MemcacheResponse()
143143
}
144144

145145
MemcacheResponse::MemcacheResponse(const MemcacheResponse& from)
146-
: NonreflectableMessage<MemcacheResponse>() {
146+
: NonreflectableMessage<MemcacheResponse>(from) {
147147
SharedCtor();
148148
MergeFrom(from);
149149
}

src/brpc/nonreflectable_message.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <google/protobuf/generated_message_reflection.h>
2222
#include <google/protobuf/message.h>
2323

24-
#include "pb_compat.h"
24+
#include "brpc/pb_compat.h"
2525

2626
namespace brpc {
2727

src/brpc/nshead_message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ NsheadMessage::NsheadMessage()
2828
}
2929

3030
NsheadMessage::NsheadMessage(const NsheadMessage& from)
31-
: NonreflectableMessage<NsheadMessage>() {
31+
: NonreflectableMessage<NsheadMessage>(from) {
3232
SharedCtor();
3333
MergeFrom(from);
3434
}

src/brpc/options.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ enum CompressType {
7474
COMPRESS_TYPE_LZ4 = 4;
7575
}
7676

77+
enum ContentType {
78+
CONTENT_TYPE_PB = 0;
79+
CONTENT_TYPE_JSON = 1;
80+
CONTENT_TYPE_PROTO_JSON = 2;
81+
CONTENT_TYPE_PROTO_TEXT = 3;
82+
}
83+
7784
message ChunkInfo {
7885
required int64 stream_id = 1;
7986
required int64 chunk_id = 2;

src/brpc/policy/baidu_rpc_meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ message RpcMeta {
3333
optional bytes authentication_data = 7;
3434
optional StreamSettings stream_settings = 8;
3535
map<string, string> user_fields = 9;
36+
optional ContentType content_type = 10;
3637
}
3738

3839
message RpcRequestMeta {

0 commit comments

Comments
 (0)