Skip to content

Commit

Permalink
Make application services keep running across queries (linera-io#2216)
Browse files Browse the repository at this point in the history
* Store application service type in global variable

Keep it running while the Wasm module is running.

* Refactor to merge `impl` blocks

There's no need to have a separate `impl` item.

* Don't match on the single variant `enum`

Remove an unnecessary level of indentation.

* Restart the runtime before handling a query

Ensure that the query context is properly configured to handle the
query.

* Don't pass ownership of the actor endpoints

Allow them to be reused in other queries later.

* Keep service runtime actor running

But still reset the context before every query.

* Don't restart service runtime actor unless needed

Keep it running for as long as possible, so that the services also stay
running for as long as possible.

* Support any `ExecutionRuntimeContext` in test

Allow registering `MockApplication`s with contexts different from
`TestExecutionContext`.

* Allow asserting for no leaked expected calls

Prepare to test if all expected calls have been received.

* Test if a service instance can handle many queries

Ensure that a new service instance is not created for handling the
subsequent query.

* Test if new block restarts the service

Ensure that the service does not live longer than its context.

* Add a TODO for handling queries concurrently

Place a reminder in the `ChainWorkerActor` to implement a potential
performance improvement.
  • Loading branch information
jvff authored Jul 16, 2024
1 parent 1ad2dda commit d7c14c7
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 86 deletions.
6 changes: 4 additions & 2 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,10 @@ where
&mut self,
local_time: Timestamp,
query: Query,
incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
ExecutionRequest,
>,
runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) -> Result<Response, ChainError> {
let context = QueryContext {
chain_id: self.chain_id(),
Expand Down
77 changes: 52 additions & 25 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use linera_base::{
crypto::CryptoHash,
data_types::{BlockHeight, HashedBlob},
data_types::{BlockHeight, HashedBlob, Timestamp},
identifiers::{BlobId, ChainId},
};
use linera_chain::{
Expand All @@ -21,13 +21,14 @@ use linera_chain::{
ChainStateView,
};
use linera_execution::{
Query, Response, ServiceSyncRuntime, UserApplicationDescription, UserApplicationId,
ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest, ServiceSyncRuntime,
UserApplicationDescription, UserApplicationId,
};
use linera_storage::Storage;
use linera_views::views::ViewError;
use tokio::{
sync::{mpsc, oneshot, OwnedRwLockReadGuard},
task::JoinSet,
task::{JoinHandle, JoinSet},
};
use tracing::{instrument, trace, warn};
#[cfg(with_testing)]
Expand Down Expand Up @@ -152,6 +153,7 @@ where
{
worker: ChainWorkerState<StorageClient>,
incoming_requests: mpsc::UnboundedReceiver<ChainWorkerRequest<StorageClient::Context>>,
service_runtime_thread: JoinHandle<()>,
}

impl<StorageClient> ChainWorkerActor<StorageClient>
Expand All @@ -170,26 +172,63 @@ where
join_set: &mut JoinSet<()>,
) -> Result<mpsc::UnboundedSender<ChainWorkerRequest<StorageClient::Context>>, WorkerError>
{
let (service_runtime_thread, execution_state_receiver, runtime_request_sender) =
Self::spawn_service_runtime_actor(chain_id);

let worker = ChainWorkerState::load(
config,
storage,
certificate_value_cache,
blob_cache,
chain_id,
execution_state_receiver,
runtime_request_sender,
)
.await?;
let (sender, receiver) = mpsc::unbounded_channel();

let (sender, receiver) = mpsc::unbounded_channel();
let actor = ChainWorkerActor {
worker,
incoming_requests: receiver,
service_runtime_thread,
};

join_set.spawn_task(actor.run(tracing::Span::current()));

Ok(sender)
}

/// Spawns a blocking task to execute the service runtime actor.
///
/// Returns the task handle and the endpoints to interact with the actor.
fn spawn_service_runtime_actor(
chain_id: ChainId,
) -> (
JoinHandle<()>,
futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) {
let context = QueryContext {
chain_id,
next_block_height: BlockHeight(0),
local_time: Timestamp::from(0),
};

let (execution_state_sender, execution_state_receiver) =
futures::channel::mpsc::unbounded();
let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();

let service_runtime_thread = tokio::task::spawn_blocking(move || {
ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
});

(
service_runtime_thread,
execution_state_receiver,
runtime_request_sender,
)
}

/// Runs the worker until there are no more incoming requests.
#[instrument(
name = "ChainWorkerActor",
Expand All @@ -201,6 +240,7 @@ where
trace!("Starting `ChainWorkerActor`");

while let Some(request) = self.incoming_requests.recv().await {
// TODO(#2237): Spawn concurrent tasks for read-only operations
trace!("Handling `ChainWorkerRequest`: {request:?}");

let responded = match request {
Expand All @@ -225,27 +265,9 @@ where
ChainWorkerRequest::GetChainStateView { callback } => {
callback.send(self.worker.chain_state_view().await).is_ok()
}
ChainWorkerRequest::QueryApplication { query, callback } => {
let (execution_state_sender, execution_state_receiver) =
futures::channel::mpsc::unbounded();
let (request_sender, request_receiver) = std::sync::mpsc::channel();
let context = self.worker.current_query_context();

let runtime_thread = tokio::task::spawn_blocking(move || {
ServiceSyncRuntime::new(execution_state_sender, context)
.run(request_receiver)
});

let response = self
.worker
.query_application(query, execution_state_receiver, request_sender)
.await;

runtime_thread
.await
.expect("Service runtime thread should not panic");
callback.send(response).is_ok()
}
ChainWorkerRequest::QueryApplication { query, callback } => callback
.send(self.worker.query_application(query).await)
.is_ok(),
#[cfg(with_testing)]
ChainWorkerRequest::ReadBytecodeLocation {
bytecode_id,
Expand Down Expand Up @@ -320,6 +342,11 @@ where
}
}

drop(self.worker);
self.service_runtime_thread
.await
.expect("Service runtime thread should not panic");

trace!("`ChainWorkerActor` finished");
}
}
Expand Down
16 changes: 9 additions & 7 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ where
storage: StorageClient,
chain: ChainStateView<StorageClient::Context>,
shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
execution_state_receiver: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
recent_hashed_certificate_values: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
recent_hashed_blobs: Arc<ValueCache<BlobId, HashedBlob>>,
knows_chain_is_active: bool,
Expand All @@ -76,6 +78,8 @@ where
certificate_value_cache: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
blob_cache: Arc<ValueCache<BlobId, HashedBlob>>,
chain_id: ChainId,
execution_state_receiver: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) -> Result<Self, WorkerError> {
let chain = storage.load_chain(chain_id).await?;

Expand All @@ -84,6 +88,8 @@ where
storage,
chain,
shared_chain_view: None,
execution_state_receiver,
runtime_request_sender,
recent_hashed_certificate_values: certificate_value_cache,
recent_hashed_blobs: blob_cache,
knows_chain_is_active: false,
Expand Down Expand Up @@ -153,11 +159,9 @@ where
pub(super) async fn query_application(
&mut self,
query: Query,
incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) -> Result<Response, WorkerError> {
ChainWorkerStateWithTemporaryChanges(self)
.query_application(query, incoming_execution_requests, runtime_request_sender)
.query_application(query)
.await
}

Expand Down Expand Up @@ -560,8 +564,6 @@ where
pub(super) async fn query_application(
&mut self,
query: Query,
incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) -> Result<Response, WorkerError> {
self.0.ensure_is_active()?;
let local_time = self.0.storage.clock().current_time();
Expand All @@ -571,8 +573,8 @@ where
.query_application(
local_time,
query,
incoming_execution_requests,
runtime_request_sender,
&mut self.0.execution_state_receiver,
&mut self.0.runtime_request_sender,
)
.await?;
Ok(response)
Expand Down
Loading

0 comments on commit d7c14c7

Please sign in to comment.