From 84ff425d14ca866c636d0f556ebab2474f3c1e74 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 15 Jan 2025 12:21:44 +0100 Subject: [PATCH] Make the chain manager a `View`. (#3135) ## Motivation Currently the chain manager is serialized as a whole and stored in a `RegisterView` in the chain state view. Since it contains blobs it can be very large, and the blobs are not needed every time the chain manager is loaded. ## Proposal Make the chain manager a `View`, and put the blobs in a `MapView`. ## Test Plan This doesn't change any logic, so CI should catch regressions. (In fact, it already did: https://github.com/linera-io/linera-protocol/pull/3133) ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - In preparation for: https://github.com/linera-io/linera-protocol/issues/3048 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist) --- linera-chain/src/chain.rs | 12 +- linera-chain/src/manager.rs | 354 +++++++++++------- .../chain_worker/state/attempted_changes.rs | 30 +- linera-core/src/chain_worker/state/mod.rs | 34 +- .../chain_worker/state/temporary_changes.rs | 5 +- linera-core/src/client/mod.rs | 7 +- linera-core/src/data_types.rs | 2 +- linera-core/src/local_node.rs | 13 +- linera-core/src/unit_tests/worker_tests.rs | 55 +-- linera-core/src/updater.rs | 6 +- .../gql/service_requests.graphql | 11 +- .../gql/service_schema.graphql | 71 +++- linera-service-graphql-client/src/service.rs | 6 +- linera-storage/src/lib.rs | 4 +- 14 files changed, 369 insertions(+), 241 deletions(-) diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index fc457a993a7..0d5fa35cd0b 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -195,7 +195,7 @@ where pub tip_state: RegisterView, /// Consensus state. - pub manager: RegisterView, + pub manager: ChainManager, /// Hashes of all certified blocks for this sender. /// This ends with `block_hash` and has length `usize::from(next_block_height)`. @@ -381,7 +381,7 @@ where /// Returns true if there are no more outgoing messages in flight up to the given /// block height. - pub fn all_messages_delivered_up_to(&mut self, height: BlockHeight) -> bool { + pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool { tracing::debug!( "Messages left in {:.8}'s outbox: {:?}", self.chain_id(), @@ -571,8 +571,8 @@ where self.execution_state_hash.set(Some(hash)); let maybe_committee = self.execution_state.system.current_committee().into_iter(); // Last, reset the consensus state based on the current ownership. - self.manager.get_mut().reset( - self.execution_state.system.ownership.get(), + self.manager.reset( + self.execution_state.system.ownership.get().clone(), BlockHeight(0), local_time, maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()), @@ -873,8 +873,8 @@ where self.execution_state_hash.set(Some(state_hash)); // Last, reset the consensus state based on the current ownership. let maybe_committee = self.execution_state.system.current_committee().into_iter(); - self.manager.get_mut().reset( - self.execution_state.system.ownership.get(), + self.manager.reset( + self.execution_state.system.ownership.get().clone(), block.height.try_add_one()?, local_time, maybe_committee.flat_map(|(_, committee)| committee.keys_and_weights()), diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index f2ee8435312..0c60343bfdd 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -70,17 +70,24 @@ use std::collections::BTreeMap; +use async_graphql::{ComplexObject, SimpleObject}; use custom_debug_derive::Debug; use futures::future::Either; use linera_base::{ crypto::{KeyPair, PublicKey}, data_types::{ArithmeticError, Blob, BlockHeight, Round, Timestamp}, - doc_scalar, ensure, + ensure, hashed::Hashed, identifiers::{BlobId, ChainId, Owner}, ownership::ChainOwnership, }; -use linera_execution::committee::Epoch; +use linera_execution::{committee::Epoch, ExecutionRuntimeContext}; +use linera_views::{ + context::Context, + map_view::MapView, + register_view::RegisterView, + views::{ClonableView, View, ViewError}, +}; use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng}; use rand_distr::{Distribution, WeightedAliasIndex}; use serde::{Deserialize, Serialize}; @@ -99,85 +106,93 @@ pub enum Outcome { Skip, } +pub type ValidatedOrConfirmedVote<'a> = Either<&'a Vote, &'a Vote>; + /// The state of the certification process for a chain's next block. -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct ChainManager { +#[derive(Debug, View, ClonableView, SimpleObject)] +#[graphql(complex)] +pub struct ChainManager +where + C: Clone + Context + Send + Sync + 'static, +{ /// The public keys, weights and types of the chain's owners. - pub ownership: ChainOwnership, + pub ownership: RegisterView, /// The seed for the pseudo-random number generator that determines the round leaders. - pub seed: u64, + pub seed: RegisterView, /// The probability distribution for choosing a round leader. - #[debug(skip_if = Option::is_none)] - pub distribution: Option>, + #[graphql(skip)] // Derived from ownership. + pub distribution: RegisterView>>, /// The probability distribution for choosing a fallback round leader. - #[debug(skip_if = Option::is_none)] - pub fallback_distribution: Option>, + #[graphql(skip)] // Derived from validator weights. + pub fallback_distribution: RegisterView>>, /// Highest-round authenticated block that we have received and checked. If there are multiple /// proposals in the same round, this contains only the first one. - #[debug(skip_if = Option::is_none)] - pub proposed: Option, + #[graphql(skip)] + pub proposed: RegisterView>, /// Latest validated proposal that we have voted to confirm (or would have, if we are not a /// validator). - #[debug(skip_if = Option::is_none)] - pub locked: Option, + #[graphql(skip)] + pub locked: RegisterView>, /// These are blobs published or read by the locked block. - #[debug(skip_if = BTreeMap::is_empty)] - pub locked_blobs: BTreeMap, + pub locked_blobs: MapView, /// Latest leader timeout certificate we have received. - #[debug(skip_if = Option::is_none)] - pub timeout: Option, + #[graphql(skip)] + pub timeout: RegisterView>, /// Latest vote we cast to confirm a block. - #[debug(skip_if = Option::is_none)] - pub confirmed_vote: Option>, + #[graphql(skip)] + pub confirmed_vote: RegisterView>>, /// Latest vote we cast to validate a block. - #[debug(skip_if = Option::is_none)] - pub validated_vote: Option>, + #[graphql(skip)] + pub validated_vote: RegisterView>>, /// Latest timeout vote we cast. - #[debug(skip_if = Option::is_none)] - pub timeout_vote: Option>, + #[graphql(skip)] + pub timeout_vote: RegisterView>>, /// Fallback vote we cast. - #[debug(skip_if = Option::is_none)] - pub fallback_vote: Option>, + #[graphql(skip)] + pub fallback_vote: RegisterView>>, /// The time after which we are ready to sign a timeout certificate for the current round. - #[debug(skip_if = Option::is_none)] - pub round_timeout: Option, + pub round_timeout: RegisterView>, /// The lowest round where we can still vote to validate or confirm a block. This is /// the round to which the timeout applies. /// /// Having a leader timeout certificate in any given round causes the next one to become /// current. Seeing a validated block certificate or a valid proposal in any round causes that /// round to become current, unless a higher one already is. - pub current_round: Round, + #[graphql(skip)] + pub current_round: RegisterView, /// The owners that take over in fallback mode. - #[debug(skip_if = BTreeMap::is_empty)] - pub fallback_owners: BTreeMap, + pub fallback_owners: RegisterView>, } -doc_scalar!( - ChainManager, - "The state of the certification process for a chain's next block" -); +#[ComplexObject] +impl ChainManager +where + C: Context + Clone + Send + Sync + 'static, +{ + /// Returns the lowest round where we can still vote to validate or confirm a block. This is + /// the round to which the timeout applies. + /// + /// Having a leader timeout certificate in any given round causes the next one to become + /// current. Seeing a validated block certificate or a valid proposal in any round causes that + /// round to become current, unless a higher one already is. + #[graphql(derived(name = "current_round"))] + async fn _current_round(&self) -> Round { + self.current_round() + } +} -impl ChainManager { +impl ChainManager +where + C: Context + Clone + Send + Sync + 'static, +{ /// Replaces `self` with a new chain manager. pub fn reset<'a>( &mut self, - ownership: &ChainOwnership, + ownership: ChainOwnership, height: BlockHeight, local_time: Timestamp, fallback_owners: impl Iterator + 'a, ) -> Result<(), ChainError> { - *self = ChainManager::new(ownership.clone(), height.0, local_time, fallback_owners)?; - Ok(()) - } - - /// Creates a new `ChainManager`, and starts the first round. - fn new<'a>( - ownership: ChainOwnership, - seed: u64, - local_time: Timestamp, - fallback_owners: impl Iterator + 'a, - ) -> Result { let distribution = if !ownership.owners.is_empty() { let weights = ownership .owners @@ -205,43 +220,45 @@ impl ChainManager { let round_duration = ownership.round_timeout(current_round); let round_timeout = round_duration.map(|rd| local_time.saturating_add(rd)); - Ok(ChainManager { - ownership, - seed, - distribution, - fallback_distribution, - proposed: None, - locked: None, - timeout: None, - confirmed_vote: None, - validated_vote: None, - timeout_vote: None, - fallback_vote: None, - round_timeout, - current_round, - fallback_owners, - locked_blobs: BTreeMap::new(), - }) + self.clear(); + self.seed.set(height.0); + self.ownership.set(ownership); + self.distribution.set(distribution); + self.fallback_distribution.set(fallback_distribution); + self.fallback_owners.set(fallback_owners); + self.current_round.set(current_round); + self.round_timeout.set(round_timeout); + Ok(()) } /// Returns the most recent confirmed vote we cast. pub fn confirmed_vote(&self) -> Option<&Vote> { - self.confirmed_vote.as_ref() + self.confirmed_vote.get().as_ref() } /// Returns the most recent validated vote we cast. pub fn validated_vote(&self) -> Option<&Vote> { - self.validated_vote.as_ref() + self.validated_vote.get().as_ref() } /// Returns the most recent timeout vote we cast. pub fn timeout_vote(&self) -> Option<&Vote> { - self.timeout_vote.as_ref() + self.timeout_vote.get().as_ref() } /// Returns the most recent fallback vote we cast. pub fn fallback_vote(&self) -> Option<&Vote> { - self.fallback_vote.as_ref() + self.fallback_vote.get().as_ref() + } + + /// Returns the lowest round where we can still vote to validate or confirm a block. This is + /// the round to which the timeout applies. + /// + /// Having a leader timeout certificate in any given round causes the next one to become + /// current. Seeing a validated block certificate or a valid proposal in any round causes that + /// round to become current, unless a higher one already is. + pub fn current_round(&self) -> Round { + *self.current_round.get() } /// Verifies the safety of a proposed block with respect to voting rules. @@ -256,12 +273,13 @@ impl ChainManager { ChainError::InvalidBlockHeight ); let expected_round = match &proposal.validated_block_certificate { - None => self.current_round, + None => self.current_round(), Some(cert) => self .ownership + .get() .next_round(cert.round) .ok_or_else(|| ChainError::ArithmeticError(ArithmeticError::Overflow))? - .max(self.current_round), + .max(self.current_round()), }; // In leader rotation mode, the round must equal the expected one exactly. // Only the first single-leader round can be entered at any time. @@ -278,7 +296,7 @@ impl ChainManager { ChainError::WrongRound(expected_round) ); } - if let Some(old_proposal) = &self.proposed { + if let Some(old_proposal) = self.proposed.get() { if old_proposal.content == proposal.content { return Ok(Outcome::Skip); // We already voted for this proposal; nothing to do. } @@ -300,7 +318,7 @@ impl ChainManager { } // If we have a locked block, the proposal must contain a certificate that validates the // proposed block and is no older than the locked block. - if let Some(locked) = &self.locked { + if let Some(locked) = self.locked.get() { ensure!( proposal .validated_block_certificate @@ -324,20 +342,21 @@ impl ChainManager { let Some(key_pair) = key_pair else { return false; // We are not a validator. }; - let Some(round_timeout) = self.round_timeout else { + let Some(round_timeout) = *self.round_timeout.get() else { return false; // The current round does not time out. }; - if local_time < round_timeout || self.ownership.owners.is_empty() { + if local_time < round_timeout || self.ownership.get().owners.is_empty() { return false; // Round has not timed out yet, or there are no regular owners. } - let current_round = self.current_round; - if let Some(vote) = &self.timeout_vote { + let current_round = self.current_round(); + if let Some(vote) = self.timeout_vote.get() { if vote.round == current_round { return false; // We already signed this timeout. } } let value = Hashed::new(Timeout::new(chain_id, height, epoch)); - self.timeout_vote = Some(Vote::new(value, current_round, key_pair)); + self.timeout_vote + .set(Some(Vote::new(value, current_round, key_pair))); true } @@ -355,12 +374,13 @@ impl ChainManager { let Some(key_pair) = key_pair else { return false; // We are not a validator. }; - if self.fallback_vote.is_some() || self.current_round >= Round::Validator(0) { + if self.fallback_vote.get().is_some() || self.current_round() >= Round::Validator(0) { return false; // We already signed this or are already in fallback mode. } let value = Hashed::new(Timeout::new(chain_id, height, epoch)); let last_regular_round = Round::SingleLeader(u32::MAX); - self.fallback_vote = Some(Vote::new(value, last_regular_round, key_pair)); + self.fallback_vote + .set(Some(Vote::new(value, last_regular_round, key_pair))); true } @@ -371,20 +391,20 @@ impl ChainManager { ) -> Result { let new_block = &certificate.executed_block().block; let new_round = certificate.round; - if let Some(Vote { value, round, .. }) = &self.confirmed_vote { + if let Some(Vote { value, round, .. }) = self.confirmed_vote.get() { if value.inner().executed_block().block == *new_block && *round == new_round { return Ok(Outcome::Skip); // We already voted to confirm this block. } } - if let Some(Vote { round, .. }) = &self.validated_vote { + if let Some(Vote { round, .. }) = self.validated_vote.get() { ensure!(new_round >= *round, ChainError::InsufficientRound(*round)) } // We don't compare to `current_round` here: Non-validators must update their locked block // even if it is older than the current round. Validators will only sign in the current // round, though. (See `create_final_vote` below.) - if let Some(locked) = &self.locked { + if let Some(locked) = self.locked.get() { if locked.hash() == certificate.hash() && locked.round == certificate.round { return Ok(Outcome::Skip); } @@ -404,47 +424,51 @@ impl ChainManager { key_pair: Option<&KeyPair>, local_time: Timestamp, blobs: BTreeMap, - ) -> Option, &Vote>> { + ) -> Result, ViewError> { let round = proposal.content.round; // If the validated block certificate is more recent, update our locked block. if let Some(lite_cert) = &proposal.validated_block_certificate { if self .locked + .get() .as_ref() .map_or(true, |locked| locked.round < lite_cert.round) { let value = Hashed::new(ValidatedBlock::new(executed_block.clone())); if let Some(certificate) = lite_cert.clone().with_value(value) { - self.locked = Some(certificate); - self.locked_blobs = blobs; + self.set_locked(certificate, blobs)?; } } } // Record the proposed block, so it can be supplied to clients that request it. - self.proposed = Some(proposal); + self.proposed.set(Some(proposal)); self.update_current_round(local_time); if let Some(key_pair) = key_pair { // If this is a fast block, vote to confirm. Otherwise vote to validate. if round.is_fast() { - self.validated_vote = None; - Some(Either::Right(self.confirmed_vote.insert(Vote::new( - Hashed::new(ConfirmedBlock::new(executed_block)), - round, - key_pair, + self.validated_vote.set(None); + Ok(Some(Either::Right(self.confirmed_vote.get_mut().insert( + Vote::new( + Hashed::new(ConfirmedBlock::new(executed_block)), + round, + key_pair, + ), )))) } else { - self.confirmed_vote = None; - Some(Either::Left(&*self.validated_vote.insert(Vote::new( - Hashed::new(ValidatedBlock::new(executed_block)), - round, - key_pair, + self.confirmed_vote.set(None); + Ok(Some(Either::Left(&*self.validated_vote.get_mut().insert( + Vote::new( + Hashed::new(ValidatedBlock::new(executed_block)), + round, + key_pair, + ), )))) } } else { - None + Ok(None) } } @@ -455,24 +479,34 @@ impl ChainManager { key_pair: Option<&KeyPair>, local_time: Timestamp, blobs: BTreeMap, - ) { + ) -> Result<(), ViewError> { let round = validated.round; // Validators only change their locked block if the new one is included in a proposal in the // current round, or it is itself in the current round. - if key_pair.is_some() && round < self.current_round { - return; + if key_pair.is_some() && round < self.current_round() { + return Ok(()); } let confirmed_block = ConfirmedBlock::new(validated.inner().executed_block().clone()); - self.locked = Some(validated); - self.locked_blobs = blobs; + self.set_locked(validated, blobs)?; self.update_current_round(local_time); if let Some(key_pair) = key_pair { // Vote to confirm. let vote = Vote::new(Hashed::new(confirmed_block), round, key_pair); // Ok to overwrite validation votes with confirmation votes at equal or higher round. - self.confirmed_vote = Some(vote); - self.validated_vote = None; + self.confirmed_vote.set(Some(vote)); + self.validated_vote.set(None); } + Ok(()) + } + + /// Returns the requested blob if it belongs to the proposal or the locked block. + pub async fn pending_blob(&self, blob_id: &BlobId) -> Result, ViewError> { + if let Some(proposal) = self.proposed.get() { + if let Some(blob) = proposal.blobs.iter().find(|blob| blob.id() == *blob_id) { + return Ok(Some(blob.clone())); + } + } + self.locked_blobs.get(blob_id).await } /// Updates `current_round` and `round_timeout` if necessary. @@ -481,23 +515,36 @@ impl ChainManager { fn update_current_round(&mut self, local_time: Timestamp) { let current_round = self .timeout + .get() .iter() .map(|certificate| { self.ownership + .get() .next_round(certificate.round) .unwrap_or(Round::Validator(u32::MAX)) }) - .chain(self.locked.iter().map(|certificate| certificate.round)) - .chain(self.proposed.iter().map(|proposal| proposal.content.round)) + .chain( + self.locked + .get() + .iter() + .map(|certificate| certificate.round), + ) + .chain( + self.proposed + .get() + .iter() + .map(|proposal| proposal.content.round), + ) .max() .unwrap_or_default() - .max(self.ownership.first_round()); - if current_round <= self.current_round { + .max(self.ownership.get().first_round()); + if current_round <= self.current_round() { return; } - let round_duration = self.ownership.round_timeout(current_round); - self.round_timeout = round_duration.map(|rd| local_time.saturating_add(rd)); - self.current_round = current_round; + let round_duration = self.ownership.get().round_timeout(current_round); + self.round_timeout + .set(round_duration.map(|rd| local_time.saturating_add(rd))); + self.current_round.set(current_round); } /// Updates the round number and timer if the timeout certificate is from a higher round than @@ -508,19 +555,19 @@ impl ChainManager { local_time: Timestamp, ) { let round = certificate.round; - if let Some(known_certificate) = &self.timeout { + if let Some(known_certificate) = self.timeout.get() { if known_certificate.round >= round { return; } } - self.timeout = Some(certificate); + self.timeout.set(Some(certificate)); self.update_current_round(local_time); } /// Returns the public key of the block proposal's signer, if they are a valid owner and allowed /// to propose a block in the proposal's round. pub fn verify_owner(&self, proposal: &BlockProposal) -> Option { - if let Some(public_key) = self.ownership.super_owners.get(&proposal.owner) { + if let Some(public_key) = self.ownership.get().super_owners.get(&proposal.owner) { return Some(*public_key); } match proposal.content.round { @@ -530,18 +577,19 @@ impl ChainManager { Round::MultiLeader(_) => { // Not in leader rotation mode; any owner is allowed to propose. self.ownership + .get() .owners .get(&proposal.owner) .map(|(public_key, _)| *public_key) } Round::SingleLeader(r) => { let index = self.round_leader_index(r)?; - let (leader, (public_key, _)) = self.ownership.owners.iter().nth(index)?; + let (leader, (public_key, _)) = self.ownership.get().owners.iter().nth(index)?; (*leader == proposal.owner).then_some(*public_key) } Round::Validator(r) => { let index = self.fallback_round_leader_index(r)?; - let (leader, (public_key, _)) = self.fallback_owners.iter().nth(index)?; + let (leader, (public_key, _)) = self.fallback_owners.get().iter().nth(index)?; (*leader == proposal.owner).then_some(*public_key) } } @@ -553,11 +601,11 @@ impl ChainManager { match round { Round::SingleLeader(r) => { let index = self.round_leader_index(r)?; - self.ownership.owners.keys().nth(index) + self.ownership.get().owners.keys().nth(index) } Round::Validator(r) => { let index = self.fallback_round_leader_index(r)?; - self.fallback_owners.keys().nth(index) + self.fallback_owners.get().keys().nth(index) } Round::Fast | Round::MultiLeader(_) => None, } @@ -565,22 +613,40 @@ impl ChainManager { /// Returns the index of the leader who is allowed to propose a block in the given round. fn round_leader_index(&self, round: u32) -> Option { - let seed = u64::from(round).rotate_left(32).wrapping_add(self.seed); + let seed = u64::from(round) + .rotate_left(32) + .wrapping_add(*self.seed.get()); let mut rng = ChaCha8Rng::seed_from_u64(seed); - Some(self.distribution.as_ref()?.sample(&mut rng)) + Some(self.distribution.get().as_ref()?.sample(&mut rng)) } /// Returns the index of the fallback leader who is allowed to propose a block in the given /// round. fn fallback_round_leader_index(&self, round: u32) -> Option { - let seed = u64::from(round).rotate_left(32).wrapping_add(self.seed); + let seed = u64::from(round) + .rotate_left(32) + .wrapping_add(*self.seed.get()); let mut rng = ChaCha8Rng::seed_from_u64(seed); - Some(self.fallback_distribution.as_ref()?.sample(&mut rng)) + Some(self.fallback_distribution.get().as_ref()?.sample(&mut rng)) } /// Returns whether the owner is a super owner. fn is_super(&self, owner: &Owner) -> bool { - self.ownership.super_owners.contains_key(owner) + self.ownership.get().super_owners.contains_key(owner) + } + + /// Sets the locked block and the associated blobs. + fn set_locked( + &mut self, + certificate: ValidatedBlockCertificate, + blobs: BTreeMap, + ) -> Result<(), ViewError> { + self.locked.set(Some(certificate)); + self.locked_blobs.clear(); + for (blob_id, blob) in blobs { + self.locked_blobs.insert(&blob_id, blob)?; + } + Ok(()) } } @@ -626,42 +692,58 @@ pub struct ChainManagerInfo { pub round_timeout: Option, } -impl From<&ChainManager> for ChainManagerInfo { - fn from(manager: &ChainManager) -> Self { - let current_round = manager.current_round; +impl From<&ChainManager> for ChainManagerInfo +where + C: Context + Clone + Send + Sync + 'static, +{ + fn from(manager: &ChainManager) -> Self { + let current_round = manager.current_round(); let pending = manager .confirmed_vote + .get() .as_ref() .map(|vote| vote.lite()) - .or_else(move || manager.validated_vote.as_ref().map(|vote| vote.lite())); + .or_else(move || { + manager + .validated_vote + .get() + .as_ref() + .map(|vote| vote.lite()) + }); ChainManagerInfo { - ownership: manager.ownership.clone(), + ownership: manager.ownership.get().clone(), requested_proposed: None, requested_locked: None, - timeout: manager.timeout.clone().map(Box::new), + timeout: manager.timeout.get().clone().map(Box::new), pending, - timeout_vote: manager.timeout_vote.as_ref().map(Vote::lite), - fallback_vote: manager.fallback_vote.as_ref().map(Vote::lite), + timeout_vote: manager.timeout_vote.get().as_ref().map(Vote::lite), + fallback_vote: manager.fallback_vote.get().as_ref().map(Vote::lite), requested_confirmed: None, requested_validated: None, current_round, leader: manager.round_leader(current_round).cloned(), - round_timeout: manager.round_timeout, + round_timeout: *manager.round_timeout.get(), } } } impl ChainManagerInfo { /// Adds requested certificate values and proposals to the `ChainManagerInfo`. - pub fn add_values(&mut self, manager: &ChainManager) { - self.requested_proposed = manager.proposed.clone().map(Box::new); - self.requested_locked = manager.locked.clone().map(Box::new); + pub fn add_values(&mut self, manager: &ChainManager) + where + C: Context + Clone + Send + Sync + 'static, + C::Extra: ExecutionRuntimeContext, + { + self.requested_proposed = manager.proposed.get().clone().map(Box::new); + self.requested_locked = manager.locked.get().clone().map(Box::new); self.requested_confirmed = manager .confirmed_vote + .get() .as_ref() .map(|vote| Box::new(vote.value.clone())); self.requested_validated = manager .validated_vote + .get() .as_ref() .map(|vote| Box::new(vote.value.clone())); } diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index 307913b1f9f..98a1bb7feea 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -100,15 +100,14 @@ where actions, )); } - let old_round = self.state.chain.manager.get().current_round; + let old_round = self.state.chain.manager.current_round(); let timeout_chainid = certificate.inner().chain_id; let timeout_height = certificate.inner().height; self.state .chain .manager - .get_mut() .handle_timeout_certificate(certificate, self.state.storage.clock().current_time()); - let round = self.state.chain.manager.get().current_round; + let round = self.state.chain.manager.current_round(); if round > old_round { actions.notifications.push(Notification { chain_id: timeout_chainid, @@ -140,8 +139,8 @@ where BTreeMap::new() }; let key_pair = self.state.config.key_pair(); - let manager = self.state.chain.manager.get_mut(); - match manager.create_vote(proposal, executed_block, key_pair, local_time, blobs) { + let manager = &mut self.state.chain.manager; + match manager.create_vote(proposal, executed_block, key_pair, local_time, blobs)? { // Cache the value we voted on, so the client doesn't have to send it again. Some(Either::Left(vote)) => { self.state @@ -194,7 +193,6 @@ where self.state .chain .manager - .get() .check_validated_block(&certificate) .map(|outcome| outcome == manager::Outcome::Skip) }; @@ -216,16 +214,16 @@ where self.state .check_for_unneeded_blobs(&required_blob_ids, blobs)?; let blobs = self.state.get_required_blobs(executed_block, blobs).await?; - let old_round = self.state.chain.manager.get().current_round; - self.state.chain.manager.get_mut().create_final_vote( + let old_round = self.state.chain.manager.current_round(); + self.state.chain.manager.create_final_vote( certificate, self.state.config.key_pair(), self.state.storage.clock().current_time(), blobs, - ); + )?; let info = ChainInfoResponse::new(&self.state.chain, self.state.config.key_pair()); self.save().await?; - let round = self.state.chain.manager.get().current_round; + let round = self.state.chain.manager.current_round(); if round > old_round { actions.notifications.push(Notification { chain_id: self.state.chain_id(), @@ -523,8 +521,10 @@ where let height = chain.tip_state.get().next_block_height; let key_pair = self.state.config.key_pair(); let local_time = self.state.storage.clock().current_time(); - let manager = chain.manager.get_mut(); - if manager.vote_timeout(chain_id, height, *epoch, key_pair, local_time) { + if chain + .manager + .vote_timeout(chain_id, height, *epoch, key_pair, local_time) + { self.save().await?; } } @@ -549,8 +549,10 @@ where let chain_id = chain.chain_id(); let height = chain.tip_state.get().next_block_height; let key_pair = self.state.config.key_pair(); - let manager = chain.manager.get_mut(); - if manager.vote_fallback(chain_id, height, *epoch, key_pair) { + if chain + .manager + .vote_fallback(chain_id, height, *epoch, key_pair) + { self.save().await?; } } diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index abc0e08cb98..b6c7889dce7 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -296,14 +296,8 @@ where /// Returns the requested blob, if it belongs to the current locked block or pending proposal. pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { - let manager = self.chain.manager.get(); - manager - .proposed - .as_ref() - .and_then(|proposal| proposal.blobs.iter().find(|blob| blob.id() == blob_id)) - .or_else(|| manager.locked_blobs.get(&blob_id)) - .cloned() - .ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id])) + let maybe_blob = self.chain.manager.pending_blob(&blob_id).await?; + maybe_blob.ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id])) } /// Ensures that the current chain is active, returning an error otherwise. @@ -341,29 +335,21 @@ where blobs: &[Blob], ) -> Result, WorkerError> { let mut blob_ids = executed_block.required_blob_ids(); - let manager = self.chain.manager.get(); - let mut found_blobs = BTreeMap::new(); - for blob in manager - .proposed - .iter() - .flat_map(|proposal| &proposal.blobs) - .chain(blobs) - { + for blob in blobs { if blob_ids.remove(&blob.id()) { found_blobs.insert(blob.id(), blob.clone()); } } - blob_ids.retain(|blob_id| { - if let Some(blob) = manager.locked_blobs.get(blob_id) { - found_blobs.insert(*blob_id, blob.clone()); - false + let mut missing_blob_ids = Vec::new(); + for blob_id in blob_ids { + if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { + found_blobs.insert(blob_id, blob); } else { - true + missing_blob_ids.push(blob_id); } - }); - let missing_blob_ids = blob_ids.into_iter().collect::>(); + } let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?; let mut not_found_blob_ids = Vec::new(); for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) { @@ -507,7 +493,7 @@ where /// Returns true if there are no more outgoing messages in flight up to the given /// block height. pub async fn all_messages_to_tracked_chains_delivered_up_to( - &mut self, + &self, height: BlockHeight, ) -> Result { if self.chain.all_messages_delivered_up_to(height) { diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index 5b166729e15..554aad3cb33 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -191,7 +191,6 @@ where .0 .chain .manager - .get() .verify_owner(proposal) .ok_or(WorkerError::InvalidOwner)?; proposal.check_signature(public_key)?; @@ -205,7 +204,7 @@ where // Check if the chain is ready for this new block proposal. // This should always pass for nodes without voting key. self.0.chain.tip_state.get().verify_block_chaining(block)?; - if self.0.chain.manager.get().check_proposed_block(proposal)? == manager::Outcome::Skip { + if self.0.chain.manager.check_proposed_block(proposal)? == manager::Outcome::Skip { return Ok(None); } // Update the inboxes so that we can verify the provided hashed certificate values are @@ -340,7 +339,7 @@ where info.requested_received_log = chain.received_log.read(start..).await?; } if query.request_manager_values { - info.manager.add_values(chain.manager.get()); + info.manager.add_values(&chain.manager); } Ok(ChainInfoResponse::new(info, self.0.config.key_pair())) } diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 30bc6f69789..ec400e64376 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -1955,12 +1955,7 @@ where let maybe_blob = { let chain_state_view = self.chain_state_view().await?; - chain_state_view - .manager - .get() - .locked_blobs - .get(&blob_id) - .cloned() + chain_state_view.manager.locked_blobs.get(&blob_id).await? }; if let Some(blob) = maybe_blob { diff --git a/linera-core/src/data_types.rs b/linera-core/src/data_types.rs index 9f2a5507148..87e1771942b 100644 --- a/linera-core/src/data_types.rs +++ b/linera-core/src/data_types.rs @@ -275,7 +275,7 @@ where chain_id: view.chain_id(), epoch: *system_state.epoch.get(), description: *system_state.description.get(), - manager: Box::new(ChainManagerInfo::from(view.manager.get())), + manager: Box::new(ChainManagerInfo::from(&view.manager)), chain_balance: *system_state.balance.get(), block_hash: tip_state.block_hash, next_block_height: tip_state.next_block_height, diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 233ff636dd2..0334d79cad5 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -203,11 +203,14 @@ where chain_id: ChainId, ) -> Result>, LocalNodeError> { let chain = self.chain_state_view(chain_id).await?; - let manager = chain.manager.get(); - Ok(blob_ids - .iter() - .map(|blob_id| manager.locked_blobs.get(blob_id).cloned()) - .collect()) + let mut blobs = Vec::new(); + for blob_id in blob_ids { + match chain.manager.locked_blobs.get(blob_id).await? { + None => return Ok(None), + Some(blob) => blobs.push(blob), + } + } + Ok(Some(blobs)) } /// Writes the given blobs to storage if there is an appropriate blob state. diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index f501357aabb..4b1b5fbe409 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -454,8 +454,8 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); Ok(()) } @@ -504,8 +504,8 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); Ok(()) } @@ -617,8 +617,8 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); Ok(()) } @@ -672,8 +672,8 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); drop(chain); worker @@ -684,7 +684,6 @@ where assert_eq!( &chain .manager - .get() .validated_vote() .unwrap() .value() @@ -693,17 +692,11 @@ where .block, &block_proposal0.content.block ); // Multi-leader round - it's not confirmed yet. - assert!(chain.manager.get().confirmed_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); let block_certificate0 = make_certificate( &committee, &worker, - chain - .manager - .get() - .validated_vote() - .unwrap() - .value() - .clone(), + chain.manager.validated_vote().unwrap().value().clone(), ); drop(chain); @@ -715,7 +708,6 @@ where assert_eq!( &chain .manager - .get() .confirmed_vote() .unwrap() .value() @@ -724,7 +716,7 @@ where .block, &block_proposal0.content.block ); // Should be confirmed after handling the certificate. - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); drop(chain); worker @@ -742,7 +734,6 @@ where assert_eq!( &chain .manager - .get() .validated_vote() .unwrap() .value() @@ -751,7 +742,7 @@ where .block, &block_proposal1.content.block ); - assert!(chain.manager.get().confirmed_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); drop(chain); assert_matches!( worker.handle_block_proposal(block_proposal0).await, @@ -1166,8 +1157,8 @@ where ); let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); - assert!(chain.manager.get().validated_vote().is_none()); + assert!(chain.manager.confirmed_vote().is_none()); + assert!(chain.manager.validated_vote().is_none()); Ok(()) } @@ -1199,18 +1190,12 @@ where chain_info_response.check(&ValidatorName(worker.public_key()))?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().confirmed_vote().is_none()); // It was a multi-leader - // round. + assert!(chain.manager.confirmed_vote().is_none()); // It was a multi-leader + // round. let validated_certificate = make_certificate( &committee, &worker, - chain - .manager - .get() - .validated_vote() - .unwrap() - .value() - .clone(), + chain.manager.validated_vote().unwrap().value().clone(), ); drop(chain); @@ -1220,8 +1205,8 @@ where chain_info_response.check(&ValidatorName(worker.public_key()))?; let chain = worker.chain_state_view(ChainId::root(1)).await?; assert!(chain.is_active()); - assert!(chain.manager.get().validated_vote().is_none()); // Should be confirmed by now. - let pending_vote = chain.manager.get().confirmed_vote().unwrap().lite(); + assert!(chain.manager.validated_vote().is_none()); // Should be confirmed by now. + let pending_vote = chain.manager.confirmed_vote().unwrap().lite(); assert_eq!( chain_info_response.info.manager.pending.unwrap(), pending_vote @@ -2017,7 +2002,7 @@ where *recipient_chain.execution_state.system.balance.get(), Amount::from_tokens(4) ); - let ownership = &recipient_chain.manager.get().ownership; + let ownership = &recipient_chain.manager.ownership.get(); assert!( ownership .owners diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 2c3f74c7482..91ea3df8563 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -338,11 +338,11 @@ where // Obtain the missing blocks and the manager state from the local node. let range: Range = initial_block_height.try_into()?..target_block_height.try_into()?; - let (keys, manager) = { + let (keys, timeout) = { let chain = self.local_node.chain_state_view(chain_id).await?; ( chain.confirmed_log.read(range).await?, - chain.manager.get().clone(), + chain.manager.timeout.get().clone(), ) }; if !keys.is_empty() { @@ -353,7 +353,7 @@ where self.send_confirmed_certificate(cert, delivery).await?; } } - if let Some(cert) = manager.timeout { + if let Some(cert) = timeout { if cert.inner().chain_id == chain_id { // Timeouts are small and don't have blobs, so we can call `handle_certificate` // directly. diff --git a/linera-service-graphql-client/gql/service_requests.graphql b/linera-service-graphql-client/gql/service_requests.graphql index 05da43f6298..965026a596a 100644 --- a/linera-service-graphql-client/gql/service_requests.graphql +++ b/linera-service-graphql-client/gql/service_requests.graphql @@ -116,7 +116,16 @@ query Chain( blockHash nextBlockHeight } - manager + manager { + ownership + seed + lockedBlobs { + keys + } + roundTimeout + fallbackOwners + currentRound + } confirmedLog { entries } diff --git a/linera-service-graphql-client/gql/service_schema.graphql b/linera-service-graphql-client/gql/service_schema.graphql index b93fcb874f1..0ecd2f5322f 100644 --- a/linera-service-graphql-client/gql/service_schema.graphql +++ b/linera-service-graphql-client/gql/service_schema.graphql @@ -45,6 +45,16 @@ input ApplicationPermissions { closeChain: [ApplicationId!]! = [] } +""" +A blob of binary data, with its content-addressed blob ID. +""" +scalar Blob + +""" +A content-addressed blob ID i.e. the hash of the `BlobContent` +""" +scalar BlobId + """ A block containing operations to apply on a given chain, as well as the acknowledgment of a number of incoming messages from other chains. @@ -166,9 +176,39 @@ The unique identifier (UID) of a chain. This is currently computed as the hash v scalar ChainId """ -The state of the certification process for a chain's next block +The state of the certification process for a chain's next block. """ -scalar ChainManager +type ChainManager { + """ + The public keys, weights and types of the chain's owners. + """ + ownership: ChainOwnership! + """ + The seed for the pseudo-random number generator that determines the round leaders. + """ + seed: Int! + """ + These are blobs published or read by the locked block. + """ + lockedBlobs: MapView_BlobId_Blob_3711e760! + """ + The time after which we are ready to sign a timeout certificate for the current round. + """ + roundTimeout: Timestamp + """ + The owners that take over in fallback mode. + """ + fallbackOwners: JSONObject! + """ + Returns the lowest round where we can still vote to validate or confirm a block. This is + the round to which the timeout applies. + + Having a leader timeout certificate in any given round causes the next one to become + current. Seeing a validated block certificate or a valid proposal in any round causes that + round to become current, unless a higher one already is. + """ + currentRound: Round! +} """ Represents the owner(s) of a chain @@ -361,6 +401,14 @@ type Entry_AccountOwner_Amount_aaf96548 { value: Amount } +""" +A GraphQL-visible map item, complete with key. +""" +type Entry_BlobId_Blob_9f0b41f3 { + key: BlobId! + value: Blob +} + """ A GraphQL-visible map item, complete with key. """ @@ -506,6 +554,10 @@ input MapFilters_AccountOwner_d6668c53 { keys: [AccountOwner!] } +input MapFilters_BlobId_4d2a0555 { + keys: [BlobId!] +} + input MapFilters_ChannelFullName_3b59bf69 { keys: [ChannelFullName!] } @@ -522,6 +574,10 @@ input MapInput_AccountOwner_d6668c53 { filters: MapFilters_AccountOwner_d6668c53 } +input MapInput_BlobId_4d2a0555 { + filters: MapFilters_BlobId_4d2a0555 +} + input MapInput_ChannelFullName_3b59bf69 { filters: MapFilters_ChannelFullName_3b59bf69 } @@ -540,6 +596,12 @@ type MapView_AccountOwner_Amount_11ef1379 { entries(input: MapInput_AccountOwner_d6668c53): [Entry_AccountOwner_Amount_aaf96548!]! } +type MapView_BlobId_Blob_3711e760 { + keys(count: Int): [BlobId!]! + entry(key: BlobId!): Entry_BlobId_Blob_9f0b41f3! + entries(input: MapInput_BlobId_4d2a0555): [Entry_BlobId_Blob_9f0b41f3!]! +} + """ An message to be sent and possibly executed in the receiver's block. """ @@ -941,6 +1003,11 @@ input ResourceControlPolicy { maximumBytesWrittenPerBlock: Int! } +""" +A number to identify successive attempts to decide a value in a consensus protocol. +""" +scalar Round + """ An event stream ID. """ diff --git a/linera-service-graphql-client/src/service.rs b/linera-service-graphql-client/src/service.rs index a250adcb957..058d03671b5 100644 --- a/linera-service-graphql-client/src/service.rs +++ b/linera-service-graphql-client/src/service.rs @@ -4,10 +4,10 @@ use graphql_client::GraphQLQuery; use linera_base::{ crypto::CryptoHash, - data_types::{Amount, BlockHeight, OracleResponse, Timestamp}, + data_types::{Amount, BlockHeight, OracleResponse, Round, Timestamp}, identifiers::{ - Account, ChainDescription, ChainId, ChannelName, Destination, GenericApplicationId, Owner, - StreamName, + Account, BlobId, ChainDescription, ChainId, ChannelName, Destination, GenericApplicationId, + Owner, StreamName, }, }; diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index df431215695..661964f724c 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -208,8 +208,8 @@ pub trait Storage: Sized { let id = description.into(); let mut chain = self.load_chain(id).await?; assert!(!chain.is_active(), "Attempting to create a chain twice"); - chain.manager.get_mut().reset( - &ChainOwnership::single(public_key), + chain.manager.reset( + ChainOwnership::single(public_key), BlockHeight(0), self.clock().current_time(), committee.keys_and_weights(),