-
Notifications
You must be signed in to change notification settings - Fork 111
Use journal table v2 as default #3921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8d319fc
1d5ea2e
4b81531
c0c0f95
99563a4
cd0423b
2ff6495
c2541b4
60b2632
b45f03d
4f4528e
b30bef6
a127b80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ pub mod test_util { | |
| 0, | ||
| MillisSinceEpoch::UNIX_EPOCH, | ||
| 0, | ||
| true, | ||
| ), | ||
| futures::stream::empty(), | ||
| ))) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -119,6 +119,11 @@ pub(crate) enum InvokerError { | |||||
| actual: InvocationEpoch, | ||||||
| expected: InvocationEpoch, | ||||||
| }, | ||||||
| #[error( | ||||||
| "error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption." | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| )] | ||||||
| #[code(unknown)] | ||||||
| UnexpectedEntryCount { actual: u32, expected: u32 }, | ||||||
|
|
||||||
| #[error(transparent)] | ||||||
| #[code(restate_errors::RT0010)] | ||||||
|
|
@@ -172,6 +177,12 @@ pub(crate) enum InvokerError { | |||||
| #[error("service is temporary unavailable '{0}'")] | ||||||
| #[code(restate_errors::RT0010)] | ||||||
| ServiceUnavailable(http::StatusCode), | ||||||
|
|
||||||
| #[error( | ||||||
| "service {0} is exposed by the deprecated deployment {1}, please upgrade the SDK used by the service." | ||||||
| )] | ||||||
| #[code(restate_errors::RT0020)] | ||||||
| DeploymentDeprecated(String, DeploymentId), | ||||||
| } | ||||||
|
|
||||||
| impl InvokerError { | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,8 +184,13 @@ where | |
|
|
||
| // Execute the replay | ||
| crate::shortcircuit!( | ||
| self.replay_loop(&mut http_stream_tx, &mut decoder_stream, journal_stream) | ||
| .await | ||
| self.replay_loop( | ||
| &mut http_stream_tx, | ||
| &mut decoder_stream, | ||
| journal_stream, | ||
| journal_metadata.length | ||
| ) | ||
| .await | ||
| ); | ||
|
|
||
| // If we have the invoker_rx and the protocol type is bidi stream, | ||
|
|
@@ -305,13 +310,15 @@ where | |
| http_stream_tx: &mut InvokerRequestStreamSender, | ||
| http_stream_rx: &mut S, | ||
| journal_stream: JournalStream, | ||
| expected_entries_count: u32, | ||
| ) -> TerminalLoopState<()> | ||
| where | ||
| JournalStream: Stream<Item = JournalEntry> + Unpin, | ||
| S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin, | ||
| { | ||
| let mut journal_stream = journal_stream.fuse(); | ||
| let mut got_headers = false; | ||
| let mut sent_entries = 0; | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
|
|
@@ -334,10 +341,11 @@ where | |
| opt_je = journal_stream.next() => { | ||
| match opt_je { | ||
| Some(JournalEntry::JournalV2(entry)) => { | ||
| sent_entries += 1; | ||
| crate::shortcircuit!(self.write_entry(http_stream_tx, entry.inner).await); | ||
|
|
||
| } | ||
| Some(JournalEntry::JournalV1(old_entry)) => { | ||
| sent_entries += 1; | ||
| if let journal::Entry::Input(input_entry) = crate::shortcircuit!(old_entry.deserialize_entry::<ProtobufRawEntryCodec>()) { | ||
| crate::shortcircuit!(self.write_entry( | ||
| http_stream_tx, | ||
|
|
@@ -352,6 +360,14 @@ where | |
| } | ||
| }, | ||
| None => { | ||
| // Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way! | ||
| if sent_entries < expected_entries_count { | ||
| return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount { | ||
| actual: sent_entries, | ||
| expected: expected_entries_count, | ||
| }) | ||
| } | ||
|
Comment on lines
+363
to
+369
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a situation that couldn't arise before and is now possible due to the changes of the PR or is this covering a case that could have happened before as well? |
||
|
|
||
| // No need to wait for the headers to continue | ||
| trace!("Finished to replay the journal"); | ||
| return TerminalLoopState::Continue(()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,6 @@ use restate_storage_api::state_table::ReadStateTable; | |
| use restate_storage_api::{IsolationLevel, journal_table as journal_table_v1, journal_table_v2}; | ||
| use restate_types::identifiers::InvocationId; | ||
| use restate_types::identifiers::ServiceId; | ||
| use restate_types::service_protocol::ServiceProtocolVersion; | ||
| use std::vec::IntoIter; | ||
|
|
||
| #[derive(Debug, thiserror::Error)] | ||
|
|
@@ -81,40 +80,40 @@ where | |
| .unwrap_or_else(|| invocation_id.to_random_seed()); | ||
|
|
||
| if let InvocationStatus::Invoked(invoked_status) = invocation_status { | ||
| let (journal_metadata, journal_stream) = if invoked_status | ||
| .pinned_deployment | ||
| .as_ref() | ||
| .is_some_and(|p| p.service_protocol_version >= ServiceProtocolVersion::V4) | ||
| { | ||
| // If pinned service protocol version exists and >= V4, we need to read from Journal Table V2! | ||
| let entries = journal_table_v2::ReadJournalTable::get_journal( | ||
| &mut self.txn, | ||
| *invocation_id, | ||
| invoked_status.journal_metadata.length, | ||
| )? | ||
| .map(|entry| { | ||
| entry | ||
| .map_err(InvokerStorageReaderError::Storage) | ||
| .map(|(_, entry)| { | ||
| restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry) | ||
| }) | ||
| }) | ||
| // TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275 | ||
| // collecting the stream because we cannot keep the transaction open | ||
| .try_collect::<Vec<_>>() | ||
| .await?; | ||
|
|
||
| let journal_metadata = JournalMetadata::new( | ||
| invoked_status.journal_metadata.length, | ||
| invoked_status.journal_metadata.span_context, | ||
| invoked_status.pinned_deployment, | ||
| invoked_status.current_invocation_epoch, | ||
| invoked_status.timestamps.modification_time(), | ||
| random_seed, | ||
| ); | ||
| // Try to read first from journal table v2 | ||
| let entries = journal_table_v2::ReadJournalTable::get_journal( | ||
| &mut self.txn, | ||
| *invocation_id, | ||
| invoked_status.journal_metadata.length, | ||
| )? | ||
| .map(|entry| { | ||
| entry | ||
| .map_err(InvokerStorageReaderError::Storage) | ||
| .map(|(_, entry)| { | ||
| restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry) | ||
| }) | ||
| }) | ||
| // TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275 | ||
| // collecting the stream because we cannot keep the transaction open | ||
| .try_collect::<Vec<_>>() | ||
| .await?; | ||
|
|
||
| (journal_metadata, entries) | ||
| let (journal_metadata, journal_stream) = if !entries.is_empty() { | ||
| // We got the journal, good to go | ||
| ( | ||
| JournalMetadata::new( | ||
| invoked_status.journal_metadata.length, | ||
| invoked_status.journal_metadata.span_context, | ||
| invoked_status.pinned_deployment, | ||
| invoked_status.current_invocation_epoch, | ||
| invoked_status.timestamps.modification_time(), | ||
| random_seed, | ||
| true, | ||
| ), | ||
| entries, | ||
| ) | ||
| } else { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This branch will no longer be necessary once we have the "eager" migration from v1 to v2 on the version barrier, right? |
||
| // We didn't read a thing from journal table v2 -> we need to read journal v1 | ||
| ( | ||
| JournalMetadata::new( | ||
| // Use entries len here, because we might be filtering out events | ||
|
|
@@ -124,6 +123,7 @@ where | |
| invoked_status.current_invocation_epoch, | ||
| invoked_status.timestamps.modification_time(), | ||
| random_seed, | ||
| false, | ||
| ), | ||
| journal_table_v1::ReadJournalTable::get_journal( | ||
| &mut self.txn, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By enabling the journal v2 usage by default, aren't we giving up forward compatibility of v1.5? Why are we testing this case?