Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ jobs:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"

# CI job: runs the SDK-Java integration test workflow a second time, with the
# experimental "journal table v2 by default" feature switched on.
sdk-java-journal-table-v2:
name: Run SDK-Java integration tests with Journal Table v2
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
secrets: inherit
# Waits for the `docker` job to finish before running.
needs: docker
# Reusable workflow from the sdk-java repository, tracked at `main`.
uses: restatedev/sdk-java/.github/workflows/integration.yaml@main
with:
# Uses the PR head commit when present, otherwise the pushed commit.
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"
# Distinct artifact name for this job's test output.
testArtifactOutput: sdk-java-integration-test-journal-table-v2
# The partition processor checks only whether this variable is set (see crates/worker/src/partition/mod.rs).
# NOTE(review): presumably forwarded to the Restate server under test — confirm in the reusable workflow.
envVars: |
RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true

sdk-python:
name: Run SDK-Python integration tests
permissions:
Expand Down Expand Up @@ -226,6 +244,59 @@ jobs:
envVars: |
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true

e2e-enable-journal-table-v2:
name: Run E2E tests with Journal Table V2 feature
runs-on: warp-ubuntu-latest-x64-4x
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
needs: docker
steps:
- name: Set up Docker containerd snapshotter
uses: crazy-max/ghaction-setup-docker@v3
with:
set-host: true
daemon-config: |
{
"features": {
"containerd-snapshotter": true
}
}

### Download the Restate container image, if needed
- name: Download restate snapshot from in-progress workflow
uses: actions/download-artifact@v4
with:
name: restate.tar
- name: Install restate snapshot
run: |
output=$(docker load --input restate.tar | head -n 1)
docker tag "${output#*: }" "localhost/restatedev/restate-commit-download:latest"
docker image ls -a

### Run e2e tests
- name: Run E2E tests
uses: restatedev/e2e@main
with:
testArtifactOutput: e2e-journal-table-v2-test-report
restateContainerImage: localhost/restatedev/restate-commit-download:latest
# Needed for backward compatibility tests 1.6 -> 1.5
envVars: |
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true
RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true
Comment on lines +286 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By enabling the journal v2 usage by default, aren't we giving up forward compatibility of v1.5? Why are we testing this case?

# Why are these tests disabled?
# In Restate 1.5 the invoker storage reader does not correctly handle the case where there is no pinned deployment yet
# and the journal table in use is v2. Nothing shows up in the logs; the invocation task loop simply hangs.
# These tests trigger this condition!
exclusions: |
exclusions:
"versionCompat":
- "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyCallShouldBeDone"
- "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyOneWayCallShouldBeDone"

jepsen:
if: github.event.repository.fork == false && github.event.pull_request.head.repo.full_name == 'restatedev/restate' && github.ref == 'refs/heads/main'
runs-on: warp-ubuntu-latest-arm64-4x
Expand Down
4 changes: 4 additions & 0 deletions crates/invoker-api/src/invocation_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct JournalMetadata {
/// and the max time difference between two replicas applying the journal append command.
pub last_modification_date: MillisSinceEpoch,
pub random_seed: u64,
/// If true, the entries are stored in journal table v2
pub using_journal_table_v2: bool,
}

impl JournalMetadata {
Expand All @@ -42,6 +44,7 @@ impl JournalMetadata {
invocation_epoch: InvocationEpoch,
last_modification_date: MillisSinceEpoch,
random_seed: u64,
using_journal_table_v2: bool,
) -> Self {
Self {
pinned_deployment,
Expand All @@ -50,6 +53,7 @@ impl JournalMetadata {
last_modification_date,
invocation_epoch,
random_seed,
using_journal_table_v2,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub mod test_util {
0,
MillisSinceEpoch::UNIX_EPOCH,
0,
true,
),
futures::stream::empty(),
)))
Expand Down
11 changes: 11 additions & 0 deletions crates/invoker-impl/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ pub(crate) enum InvokerError {
actual: InvocationEpoch,
expected: InvocationEpoch,
},
#[error(
"error when reading the journal: expected to read {expected} entries, but read only {actual}. This indicates a bug or a storage corruption."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption."
"error when reading the journal: expected to read {expected} entries, but read only {actual}. This indicates a bug or a storage corruption."

)]
#[code(unknown)]
UnexpectedEntryCount { actual: u32, expected: u32 },

#[error(transparent)]
#[code(restate_errors::RT0010)]
Expand Down Expand Up @@ -172,6 +177,12 @@ pub(crate) enum InvokerError {
#[error("service is temporary unavailable '{0}'")]
#[code(restate_errors::RT0010)]
ServiceUnavailable(http::StatusCode),

#[error(
"service {0} is exposed by the deprecated deployment {1}, please upgrade the SDK used by the service."
)]
#[code(restate_errors::RT0020)]
DeploymentDeprecated(String, DeploymentId),
}

impl InvokerError {
Expand Down
10 changes: 10 additions & 0 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,16 @@ where
EagerState::<Empty<_>>::default().map(itertools::Either::Right)
};

if chosen_service_protocol_version < ServiceProtocolVersion::V4
&& journal_metadata.using_journal_table_v2
{
// We don't support migrating from journal v2 to journal v1!
shortcircuit!(Err(InvokerError::DeploymentDeprecated(
self.invocation_target.service_name().to_string(),
deployment.id
)));
}

// No need to read from Rocksdb anymore
drop(txn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ where

// Execute the replay
crate::shortcircuit!(
self.replay_loop(&mut http_stream_tx, &mut decoder_stream, journal_stream)
.await
self.replay_loop(
&mut http_stream_tx,
&mut decoder_stream,
journal_stream,
journal_metadata.length
)
.await
);

// If we have the invoker_rx and the protocol type is bidi stream,
Expand Down Expand Up @@ -305,13 +310,15 @@ where
http_stream_tx: &mut InvokerRequestStreamSender,
http_stream_rx: &mut S,
journal_stream: JournalStream,
expected_entries_count: u32,
) -> TerminalLoopState<()>
where
JournalStream: Stream<Item = JournalEntry> + Unpin,
S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin,
{
let mut journal_stream = journal_stream.fuse();
let mut got_headers = false;
let mut sent_entries = 0;

loop {
tokio::select! {
Expand All @@ -334,10 +341,11 @@ where
opt_je = journal_stream.next() => {
match opt_je {
Some(JournalEntry::JournalV2(entry)) => {
sent_entries += 1;
crate::shortcircuit!(self.write_entry(http_stream_tx, entry.inner).await);

}
Some(JournalEntry::JournalV1(old_entry)) => {
sent_entries += 1;
if let journal::Entry::Input(input_entry) = crate::shortcircuit!(old_entry.deserialize_entry::<ProtobufRawEntryCodec>()) {
crate::shortcircuit!(self.write_entry(
http_stream_tx,
Expand All @@ -352,6 +360,14 @@ where
}
},
None => {
// Verify that we sent all the entries we promised; otherwise the stream will hang in a bad way!
if sent_entries < expected_entries_count {
return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount {
actual: sent_entries,
expected: expected_entries_count,
})
}
Comment on lines +363 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a situation that couldn't arise before and is now possible due to the changes of the PR or is this covering a case that could have happened before as well?


// No need to wait for the headers to continue
trace!("Finished to replay the journal");
return TerminalLoopState::Continue(())
Expand Down
66 changes: 33 additions & 33 deletions crates/worker/src/partition/invoker_storage_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use restate_storage_api::state_table::ReadStateTable;
use restate_storage_api::{IsolationLevel, journal_table as journal_table_v1, journal_table_v2};
use restate_types::identifiers::InvocationId;
use restate_types::identifiers::ServiceId;
use restate_types::service_protocol::ServiceProtocolVersion;
use std::vec::IntoIter;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -81,40 +80,40 @@ where
.unwrap_or_else(|| invocation_id.to_random_seed());

if let InvocationStatus::Invoked(invoked_status) = invocation_status {
let (journal_metadata, journal_stream) = if invoked_status
.pinned_deployment
.as_ref()
.is_some_and(|p| p.service_protocol_version >= ServiceProtocolVersion::V4)
{
// If pinned service protocol version exists and >= V4, we need to read from Journal Table V2!
let entries = journal_table_v2::ReadJournalTable::get_journal(
&mut self.txn,
*invocation_id,
invoked_status.journal_metadata.length,
)?
.map(|entry| {
entry
.map_err(InvokerStorageReaderError::Storage)
.map(|(_, entry)| {
restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry)
})
})
// TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275
// collecting the stream because we cannot keep the transaction open
.try_collect::<Vec<_>>()
.await?;

let journal_metadata = JournalMetadata::new(
invoked_status.journal_metadata.length,
invoked_status.journal_metadata.span_context,
invoked_status.pinned_deployment,
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
);
// Try to read first from journal table v2
let entries = journal_table_v2::ReadJournalTable::get_journal(
&mut self.txn,
*invocation_id,
invoked_status.journal_metadata.length,
)?
.map(|entry| {
entry
.map_err(InvokerStorageReaderError::Storage)
.map(|(_, entry)| {
restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry)
})
})
// TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275
// collecting the stream because we cannot keep the transaction open
.try_collect::<Vec<_>>()
.await?;

(journal_metadata, entries)
let (journal_metadata, journal_stream) = if !entries.is_empty() {
// We got the journal, good to go
(
JournalMetadata::new(
invoked_status.journal_metadata.length,
invoked_status.journal_metadata.span_context,
invoked_status.pinned_deployment,
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
true,
),
entries,
)
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch will no longer be necessary once we have the "eager" migration from v1 to v2 on the version barrier, right?

// We didn't read a thing from journal table v2 -> we need to read journal v1
(
JournalMetadata::new(
// Use entries len here, because we might be filtering out events
Expand All @@ -124,6 +123,7 @@ where
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
false,
),
journal_table_v1::ReadJournalTable::get_journal(
&mut self.txn,
Expand Down
11 changes: 9 additions & 2 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod shuffle;
mod state_machine;
pub mod types;

use std::env;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -66,7 +67,7 @@ use crate::metric_definitions::{
};
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::leadership::LeadershipState;
use crate::partition::state_machine::{ActionCollector, StateMachine};
use crate::partition::state_machine::{ActionCollector, Feature, StateMachine};

/// Target leader state of the partition processor.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
Expand Down Expand Up @@ -193,13 +194,19 @@ where
});
}

let mut features = EnumSet::new();
// TODO(till) enable this using partition processor version barrier
if env::var("RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT").is_ok() {
features.insert(Feature::UseJournalTableV2AsDefault);
}

let state_machine = StateMachine::new(
inbox_seq_number,
outbox_seq_number,
outbox_head_seq_number,
partition_store.partition_key_range().clone(),
min_restate_version,
EnumSet::empty(),
features,
schema,
);

Expand Down
Loading
Loading