Skip to content

Commit 5460f62

Browse files
committed
Slightly improved serialization size estimation
1 parent 34321b6 commit 5460f62

File tree

5 files changed

+190
-10
lines changed

5 files changed

+190
-10
lines changed

crates/storage-api/src/protobuf_types.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ impl<T: prost::Message + 'static> StorageEncode for ProtobufStorageWrapper<T> {
4747
.map_err(|err| restate_types::storage::StorageEncodeError::EncodeValue(err.into()))
4848
}
4949

50+
fn estimated_encoded_len(&self) -> usize {
51+
T::encoded_len(&self.0)
52+
}
53+
5054
fn default_codec(&self) -> restate_types::storage::StorageCodecKind {
5155
restate_types::storage::StorageCodecKind::Protobuf
5256
}

crates/types/src/net/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub fn decode_as_flexbuffers<T: DeserializeOwned>(
114114
}
115115

116116
pub fn encode_as_bilrost<T: bilrost::Message>(value: &T) -> Bytes {
117-
let buf = value.encode_fast();
117+
let buf = value.encode_contiguous();
118118
Bytes::from(buf.into_vec())
119119
}
120120

crates/types/src/storage.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ impl StorageCodec {
101101
value: &T,
102102
buf: &mut BytesMut,
103103
) -> Result<(), StorageEncodeError> {
104+
buf.reserve(value.estimated_encoded_len() + mem::size_of::<u8>());
104105
// write codec
105106
buf.put_u8(value.default_codec().into());
106107
// encode value
@@ -143,6 +144,9 @@ pub trait StorageEncode: DowncastSync {
143144

144145
/// Codec which is used when encoding new values.
145146
fn default_codec(&self) -> StorageCodecKind;
147+
148+
/// Returns an estimate for the size of the encoded value.
149+
fn estimated_encoded_len(&self) -> usize;
146150
}
147151
impl_downcast!(sync StorageEncode);
148152

@@ -176,6 +180,10 @@ macro_rules! flexbuffers_storage_encode_decode {
176180
) -> Result<(), $crate::storage::StorageEncodeError> {
177181
$crate::storage::encode::encode_serde(self, buf, self.default_codec())
178182
}
183+
184+
fn estimated_encoded_len(&self) -> usize {
185+
$crate::storage::encode::estimate_encoded_serde_len(self, self.default_codec())
186+
}
179187
}
180188

181189
impl $crate::storage::StorageDecode for $name {
@@ -227,12 +235,7 @@ impl PolyBytes {
227235
pub fn estimated_encode_size(&self) -> usize {
228236
match self {
229237
PolyBytes::Bytes(bytes) => bytes.len(),
230-
PolyBytes::Typed(_) => {
231-
// constant, assumption based on base envelope size of ~600 bytes.
232-
// todo: use StorageEncode trait to get an actual estimate based
233-
// on the underlying type
234-
2_048 // 2KiB
235-
}
238+
PolyBytes::Typed(value) => value.estimated_encoded_len(),
236239
}
237240
}
238241
}
@@ -251,6 +254,13 @@ impl StorageEncode for PolyBytes {
251254
fn default_codec(&self) -> StorageCodecKind {
252255
StorageCodecKind::FlexbuffersSerde
253256
}
257+
258+
fn estimated_encoded_len(&self) -> usize {
259+
match self {
260+
PolyBytes::Bytes(bytes) => bytes.len(),
261+
PolyBytes::Typed(typed) => typed.estimated_encoded_len(),
262+
}
263+
}
254264
}
255265

256266
/// SerializeAs/DeserializeAs to implement ser/de trait for [`PolyBytes`]
@@ -265,7 +275,6 @@ impl serde_with::SerializeAs<PolyBytes> for EncodedPolyBytes {
265275
match source {
266276
PolyBytes::Bytes(bytes) => serializer.serialize_bytes(bytes.as_ref()),
267277
PolyBytes::Typed(typed) => {
268-
// todo: estimate size to avoid re allocations
269278
let mut buf = BytesMut::new();
270279
StorageCodec::encode(&**typed, &mut buf).expect("record serde is infallible");
271280
serializer.serialize_bytes(buf.as_ref())
@@ -292,6 +301,10 @@ impl StorageEncode for String {
292301
StorageCodecKind::LengthPrefixedRawBytes
293302
}
294303

304+
fn estimated_encoded_len(&self) -> usize {
305+
self.len() + mem::size_of::<u32>()
306+
}
307+
295308
fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> {
296309
let my_bytes = self.as_bytes();
297310
buf.put_u32_le(u32::try_from(my_bytes.len()).map_err(|_| {
@@ -359,6 +372,10 @@ impl StorageEncode for bytes::Bytes {
359372
StorageCodecKind::LengthPrefixedRawBytes
360373
}
361374

375+
fn estimated_encoded_len(&self) -> usize {
376+
self.len() + mem::size_of::<u32>()
377+
}
378+
362379
fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> {
363380
buf.put_u32_le(u32::try_from(self.len()).map_err(|_| {
364381
StorageEncodeError::EncodeValue(

crates/types/src/storage/encode.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// by the Apache License, Version 2.0.
1010
use std::mem;
1111

12+
use bincode::enc::write::SizeWriter;
1213
use bytes::{BufMut, Bytes, BytesMut};
1314
use serde::Serialize;
1415

@@ -33,6 +34,17 @@ pub fn encode_serde<T: Serialize>(
3334
}
3435
}
3536

37+
pub fn estimate_encoded_serde_len<T: Serialize>(value: &T, codec: StorageCodecKind) -> usize {
38+
match codec {
39+
// 2 KiB, completely arbitrary size since we don't have a way to estimate the size
40+
// beforehand, which is a shame.
41+
StorageCodecKind::FlexbuffersSerde => 2_048,
42+
StorageCodecKind::BincodeSerde => estimate_bincode_len(value).unwrap_or(0),
43+
StorageCodecKind::Json => estimate_json_serde_len(value).unwrap_or(0),
44+
codec => unreachable!("Cannot encode serde type with codec {}", codec),
45+
}
46+
}
47+
3648
/// Utility method to encode a [`Serialize`] type as flexbuffers using serde.
3749
fn encode_serde_as_flexbuffers<T: Serialize>(
3850
value: T,
@@ -50,6 +62,12 @@ fn encode_serde_as_flexbuffers<T: Serialize>(
5062
Ok(())
5163
}
5264

65+
fn estimate_bincode_len<T: Serialize>(value: &T) -> Result<usize, bincode::error::EncodeError> {
66+
let mut writer = SizeWriter::default();
67+
bincode::serde::encode_into_writer(value, &mut writer, bincode::config::standard())?;
68+
Ok(writer.bytes_written)
69+
}
70+
5371
/// Utility method to encode a [`Serialize`] type as bincode using serde.
5472
fn encode_serde_as_bincode<T: Serialize>(
5573
value: &T,
@@ -69,6 +87,29 @@ fn encode_serde_as_bincode<T: Serialize>(
6987
Ok(())
7088
}
7189

90+
fn estimate_json_serde_len<T: Serialize>(value: &T) -> Result<usize, serde_json::error::Error> {
91+
#[derive(Default)]
92+
struct SizeWriter {
93+
bytes_written: usize,
94+
}
95+
96+
impl std::io::Write for SizeWriter {
97+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
98+
self.bytes_written += buf.len();
99+
Ok(buf.len())
100+
}
101+
fn flush(&mut self) -> std::io::Result<()> {
102+
Ok(())
103+
}
104+
}
105+
106+
let mut writer = SizeWriter::default();
107+
108+
serde_json::to_writer(&mut writer, value)?;
109+
110+
Ok(writer.bytes_written)
111+
}
112+
72113
/// Utility method to encode a [`Serialize`] type as json using serde.
73114
fn encode_serde_as_json<T: Serialize>(
74115
value: &T,

crates/wal-protocol/src/lib.rs

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,9 @@ mod envelope {
250250

251251
use restate_storage_api::protobuf_types::v1 as protobuf;
252252
use restate_types::storage::decode::{decode_bilrost, decode_serde};
253-
use restate_types::storage::encode::{encode_bilrost, encode_serde};
253+
use restate_types::storage::encode::{
254+
encode_bilrost, encode_serde, estimate_encoded_serde_len,
255+
};
254256
use restate_types::storage::{
255257
StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError,
256258
};
@@ -261,7 +263,9 @@ mod envelope {
261263
fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> {
262264
use bytes::BufMut;
263265
match self.default_codec() {
264-
StorageCodecKind::FlexbuffersSerde => encode_serde(self, buf, self.default_codec()),
266+
StorageCodecKind::FlexbuffersSerde => {
267+
encode_serde(self, buf, StorageCodecKind::FlexbuffersSerde)
268+
}
265269
StorageCodecKind::Custom => {
266270
buf.put_slice(&encode(self)?);
267271
Ok(())
@@ -270,6 +274,19 @@ mod envelope {
270274
}
271275
}
272276

277+
fn estimated_encoded_len(&self) -> usize {
278+
match self.default_codec() {
279+
StorageCodecKind::FlexbuffersSerde => {
280+
restate_types::storage::encode::estimate_encoded_serde_len(
281+
self,
282+
StorageCodecKind::FlexbuffersSerde,
283+
)
284+
}
285+
StorageCodecKind::Custom => estimate_custom_encoding_len(self),
286+
_ => unreachable!("developer error"),
287+
}
288+
}
289+
273290
fn default_codec(&self) -> StorageCodecKind {
274291
// TODO(azmy): Change to `Custom` in v1.5
275292
StorageCodecKind::FlexbuffersSerde
@@ -359,13 +376,49 @@ mod envelope {
359376
})
360377
}
361378

379+
fn serde_encoded_len<T: serde::Serialize>(value: &T, codec: StorageCodecKind) -> usize {
380+
static EMPTY: Field = Field {
381+
// note: the value of codec is irrelevant for this method, we assume that
382+
// it takes a similar amount of space to encode all values of the StorageCodecKind
383+
// enum.
384+
codec: Some(StorageCodecKind::FlexbuffersSerde),
385+
bytes: Bytes::new(),
386+
};
387+
388+
let value_len = estimate_encoded_serde_len(value, codec);
389+
EMPTY.encoded_len() + value_len
390+
}
391+
392+
fn bilrost_encoded_len<T: bilrost::Message>(value: &T) -> usize {
393+
static EMPTY: Field = Field {
394+
codec: Some(StorageCodecKind::Bilrost),
395+
bytes: Bytes::new(),
396+
};
397+
398+
let value_len = bilrost::Message::encoded_len(value);
399+
EMPTY.encoded_len() + value_len
400+
}
401+
362402
fn encode_bilrost<T: bilrost::Message>(value: &T) -> Result<Self, StorageEncodeError> {
363403
Ok(Self {
364404
codec: Some(StorageCodecKind::Bilrost),
365405
bytes: encode_bilrost(value),
366406
})
367407
}
368408

409+
fn protobuf_encoded_len<T: prost::Message>(value: &T) -> usize {
410+
static EMPTY: Field = Field {
411+
// note: the value of codec is irrelevant for this method, we assume that
412+
// it takes a similar amount of space to encode all values of the StorageCodecKind
413+
// enum.
414+
codec: Some(StorageCodecKind::Protobuf),
415+
bytes: Bytes::new(),
416+
};
417+
418+
let value_len = prost::Message::encoded_len(value);
419+
EMPTY.encoded_len() + value_len
420+
}
421+
369422
fn encode_protobuf<T: prost::Message>(value: &T) -> Result<Self, StorageEncodeError> {
370423
let mut buf = BytesMut::new();
371424
value
@@ -437,6 +490,71 @@ mod envelope {
437490
}};
438491
}
439492

493+
pub fn estimate_custom_encoding_len(envelope: &super::Envelope) -> usize {
494+
let command_len = match &envelope.command {
495+
Command::UpdatePartitionDurability(value) => Field::bilrost_encoded_len(value),
496+
Command::VersionBarrier(value) => Field::bilrost_encoded_len(value),
497+
Command::AnnounceLeader(value) => {
498+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
499+
}
500+
Command::PatchState(value) => {
501+
// we are copying because we _assume_ that PatchState is not widely used.
502+
// The clone will allocate a new hashmap but kvpairs are Bytes (cheap clones)
503+
let value = protobuf::StateMutation::from(value.clone());
504+
Field::protobuf_encoded_len(&value)
505+
}
506+
Command::TerminateInvocation(value) => {
507+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
508+
}
509+
Command::PurgeInvocation(value) => {
510+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
511+
}
512+
Command::PurgeJournal(value) => {
513+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
514+
}
515+
Command::Invoke(value) => {
516+
let value = protobuf::ServiceInvocation::from(value.as_ref());
517+
// ideally, the envelope would carry the protobuf wrapper instead of doing the
518+
// conversion twice (once for length estimate and another for serialization)
519+
Field::protobuf_encoded_len(&value)
520+
}
521+
Command::TruncateOutbox(value) => {
522+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
523+
}
524+
Command::ProxyThrough(value) => {
525+
let value = protobuf::ServiceInvocation::from(value.as_ref());
526+
Field::protobuf_encoded_len(&value)
527+
}
528+
Command::AttachInvocation(value) => {
529+
let value = protobuf::outbox_message::AttachInvocationRequest::from(value.clone());
530+
Field::protobuf_encoded_len(&value)
531+
}
532+
Command::InvokerEffect(value) => {
533+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
534+
}
535+
Command::Timer(value) => {
536+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
537+
}
538+
Command::ScheduleTimer(value) => {
539+
Field::serde_encoded_len(value, StorageCodecKind::FlexbuffersSerde)
540+
}
541+
Command::InvocationResponse(value) => {
542+
let value =
543+
protobuf::outbox_message::OutboxServiceInvocationResponse::from(value.clone());
544+
Field::protobuf_encoded_len(&value)
545+
}
546+
Command::NotifyGetInvocationOutputResponse(value) => Field::bilrost_encoded_len(value),
547+
Command::NotifySignal(value) => {
548+
let value = protobuf::outbox_message::NotifySignal::from(value.clone());
549+
Field::protobuf_encoded_len(&value)
550+
}
551+
};
552+
553+
// Assuming 350 bytes for the header and the envelope type-tag + 8 bytes for the command kind
554+
// overhead = 358
555+
358 + command_len
556+
}
557+
440558
pub fn encode(envelope: &super::Envelope) -> Result<Bytes, StorageEncodeError> {
441559
// todo(azmy): avoid clone? this will require change to `From` implementation
442560
let (command_kind, command) = match &envelope.command {

0 commit comments

Comments
 (0)