Skip to content

Commit

Permalink
Contracts can emit events, record them in blocks. (linera-io#2241)
Browse files Browse the repository at this point in the history
* Add an emit method to the runtime.

* Add events to BlockExecutionOutcome.

* Test that events get added.

* Add StreamId type.

* Make events have a key and a value.

* Enforce a maximum key length.

* Limit stream name length.
  • Loading branch information
afck authored Jul 17, 2024
1 parent f79a1d8 commit 85c3121
Show file tree
Hide file tree
Showing 21 changed files with 375 additions and 24 deletions.
7 changes: 6 additions & 1 deletion examples/meta-counter/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![cfg_attr(target_arch = "wasm32", no_main)]

use linera_sdk::{
base::{ApplicationId, WithContractAbi},
base::{ApplicationId, StreamName, WithContractAbi},
Contract, ContractRuntime, Resources,
};
use meta_counter::{Message, MetaCounterAbi, Operation};
Expand Down Expand Up @@ -40,6 +40,11 @@ impl Contract for MetaCounterContract {
// Send a no-op message to ourselves. This is only for testing contracts that send messages
// on initialization. Since the value is 0 it does not change the counter value.
let this_chain = self.runtime.chain_id();
self.runtime.emit(
StreamName(b"announcements".to_vec()),
b"updates",
b"instantiated",
);
self.runtime.send_message(this_chain, Message::Increment(0));
}

Expand Down
67 changes: 65 additions & 2 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use anyhow::{anyhow, Context};
use async_graphql::SimpleObject;
use linera_witty::{WitLoad, WitStore, WitType};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -211,7 +212,21 @@ pub struct ApplicationId<A = ()> {
}

/// A unique identifier for an application.
#[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash, Debug, Serialize, Deserialize)]
#[derive(
Eq,
PartialEq,
Ord,
PartialOrd,
Copy,
Clone,
Hash,
Debug,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
)]
pub enum GenericApplicationId {
/// The system application.
System,
Expand Down Expand Up @@ -263,6 +278,46 @@ pub struct BytecodeId<Abi = (), Parameters = (), InstantiationArgument = ()> {
)]
pub struct ChannelName(#[serde(with = "serde_bytes")] Vec<u8>);

/// The name of an event stream.
#[derive(
Clone,
Debug,
Eq,
Hash,
Ord,
PartialEq,
PartialOrd,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
)]
pub struct StreamName(#[serde(with = "serde_bytes")] pub Vec<u8>);

/// An event stream ID.
#[derive(
Clone,
Debug,
Eq,
Hash,
Ord,
PartialEq,
PartialOrd,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
SimpleObject,
)]
pub struct StreamId {
/// The application that can add events to this stream.
pub application_id: GenericApplicationId,
/// The name of this stream: an application can have multiple streams with different names.
pub stream_name: StreamName,
}

/// The destination of a message, relative to a particular application.
#[derive(
Clone,
Expand Down Expand Up @@ -317,7 +372,14 @@ impl From<Vec<u8>> for ChannelName {
}

impl ChannelName {
/// Turns the channel into bytes.
/// Turns the channel name into bytes.
pub fn into_bytes(self) -> Vec<u8> {
self.0
}
}

impl StreamName {
/// Turns the stream name into bytes.
pub fn into_bytes(self) -> Vec<u8> {
self.0
}
Expand Down Expand Up @@ -808,6 +870,7 @@ doc_scalar!(
ChainDescription."
);
doc_scalar!(ChannelName, "The name of a subscription channel");
doc_scalar!(StreamName, "The name of an event stream");
bcs_scalar!(MessageId, "The index of a message in a chain");
doc_scalar!(
Owner,
Expand Down
35 changes: 28 additions & 7 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linera_base::{
crypto::CryptoHash,
data_types::{Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp},
ensure,
identifiers::{ChainId, Destination, GenericApplicationId, MessageId},
identifiers::{ChainId, Destination, GenericApplicationId, MessageId, StreamId},
};
use linera_execution::{
system::SystemMessage, ExecutionOutcome, ExecutionRequest, ExecutionRuntimeContext,
Expand All @@ -35,8 +35,8 @@ use {linera_base::identifiers::BytecodeId, linera_execution::BytecodeLocation};

use crate::{
data_types::{
Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, Event, IncomingMessage,
MessageAction, MessageBundle, Origin, OutgoingMessage, Target,
Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, Event, EventRecord,
IncomingMessage, MessageAction, MessageBundle, Origin, OutgoingMessage, Target,
},
inbox::{Cursor, InboxError, InboxStateView},
manager::ChainManager,
Expand Down Expand Up @@ -805,6 +805,7 @@ where
);
let mut oracle_responses = oracle_responses.map(Vec::into_iter);
let mut new_oracle_responses = Vec::new();
let mut events = Vec::new();
let mut next_message_index = 0;
for (index, message) in block.incoming_messages.iter().enumerate() {
#[cfg(with_metrics)]
Expand Down Expand Up @@ -912,7 +913,7 @@ where
}
};
new_oracle_responses.push(oracle_responses);
let messages_out = self
let (messages_out, new_events) = self
.process_execution_outcomes(context.height, outcomes)
.await?;
if let MessageAction::Accept = message.action {
Expand All @@ -927,6 +928,7 @@ where
next_message_index +=
u32::try_from(messages_out.len()).map_err(|_| ArithmeticError::Overflow)?;
messages.push(messages_out);
events.push(new_events);
}
// Second, execute the operations in the block and remember the recipients to notify.
for (index, operation) in block.operations.iter().enumerate() {
Expand Down Expand Up @@ -961,7 +963,7 @@ where
.await
.map_err(|err| ChainError::ExecutionError(err, chain_execution_context))?;
new_oracle_responses.push(oracle_responses);
let messages_out = self
let (messages_out, new_events) = self
.process_execution_outcomes(context.height, outcomes)
.await?;
resource_controller
Expand All @@ -979,6 +981,7 @@ where
next_message_index +=
u32::try_from(messages_out.len()).map_err(|_| ArithmeticError::Overflow)?;
messages.push(messages_out);
events.push(new_events);
}

// Finally, charge for the block fee, except if the chain is closed. Closed chains should
Expand Down Expand Up @@ -1033,22 +1036,25 @@ where
messages,
state_hash,
oracle_responses: new_oracle_responses,
events,
})
}

async fn process_execution_outcomes(
&mut self,
height: BlockHeight,
results: Vec<ExecutionOutcome>,
) -> Result<Vec<OutgoingMessage>, ChainError> {
) -> Result<(Vec<OutgoingMessage>, Vec<EventRecord>), ChainError> {
let mut messages = Vec::new();
let mut events = Vec::new();
for result in results {
match result {
ExecutionOutcome::System(result) => {
self.process_raw_execution_outcome(
GenericApplicationId::System,
Message::System,
&mut messages,
&mut events,
height,
result,
)
Expand All @@ -1062,27 +1068,42 @@ where
bytes,
},
&mut messages,
&mut events,
height,
result,
)
.await?;
}
}
}
Ok(messages)
Ok((messages, events))
}

async fn process_raw_execution_outcome<E, F>(
&mut self,
application_id: GenericApplicationId,
lift: F,
messages: &mut Vec<OutgoingMessage>,
events: &mut Vec<EventRecord>,
height: BlockHeight,
raw_outcome: RawExecutionOutcome<E, Amount>,
) -> Result<(), ChainError>
where
F: Fn(E) -> Message,
{
events.extend(
raw_outcome
.events
.into_iter()
.map(|(stream_name, key, value)| EventRecord {
stream_id: StreamId {
application_id,
stream_name,
},
key,
value,
}),
);
let max_stream_queries = self.context().max_stream_queries();
// Record the messages of the execution. Messages are understood within an
// application.
Expand Down
15 changes: 15 additions & 0 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use linera_base::{
doc_scalar, ensure,
identifiers::{
Account, BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner,
StreamId,
},
};
use linera_execution::{
Expand Down Expand Up @@ -272,9 +273,23 @@ pub struct ExecutedBlock {
pub struct BlockExecutionOutcome {
/// The list of outgoing messages for each transaction.
pub messages: Vec<Vec<OutgoingMessage>>,
/// The hash of the chain's execution state after this block.
pub state_hash: CryptoHash,
/// The record of oracle responses for each transaction.
pub oracle_responses: Vec<Vec<OracleResponse>>,
/// The list of events produced by each transaction.
pub events: Vec<Vec<EventRecord>>,
}

/// An event recorded in an executed block.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct EventRecord {
/// The ID of the stream this event belongs to.
pub stream_id: StreamId,
/// The event key.
pub key: Vec<u8>,
/// The payload data.
pub value: Vec<u8>,
}

/// A statement to be certified by the validators.
Expand Down
2 changes: 2 additions & 0 deletions linera-chain/src/unit_tests/data_types_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn test_signed_values() {
messages: vec![Vec::new()],
state_hash: CryptoHash::test_hash("state"),
oracle_responses: vec![Vec::new()],
events: vec![Vec::new()],
}
.with(block);
let value = HashedCertificateValue::new_confirmed(executed_block);
Expand Down Expand Up @@ -47,6 +48,7 @@ fn test_certificates() {
messages: vec![Vec::new()],
state_hash: CryptoHash::test_hash("state"),
oracle_responses: vec![Vec::new()],
events: vec![Vec::new()],
}
.with(block);
let value = HashedCertificateValue::new_confirmed(executed_block);
Expand Down
23 changes: 20 additions & 3 deletions linera-core/src/unit_tests/wasm_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use async_graphql::Request;
use counter::CounterAbi;
use linera_base::{
data_types::{Amount, HashedBlob, OracleResponse},
identifiers::{AccountOwner, ApplicationId, ChainDescription, ChainId, Destination, Owner},
identifiers::{
AccountOwner, ApplicationId, ChainDescription, ChainId, Destination, Owner, StreamId,
StreamName,
},
ownership::{ChainOwnership, TimeoutConfig},
};
use linera_chain::data_types::{CertificateValue, MessageAction, OutgoingMessage};
use linera_chain::data_types::{CertificateValue, EventRecord, MessageAction, OutgoingMessage};
use linera_execution::{
Bytecode, Message, MessageKind, Operation, ResourceControlPolicy, SystemMessage,
UserApplicationDescription, WasmRuntime,
Expand Down Expand Up @@ -367,7 +370,7 @@ where
.await
.unwrap()
.unwrap();
let (application_id2, _) = creator
let (application_id2, certificate) = creator
.create_application(
bytecode_id2,
&application_id1,
Expand All @@ -377,6 +380,20 @@ where
.await
.unwrap()
.unwrap();
assert_eq!(
certificate.value().executed_block().unwrap().outcome.events,
vec![
Vec::new(),
vec![EventRecord {
stream_id: StreamId {
application_id: application_id2.forget_abi().into(),
stream_name: StreamName(b"announcements".to_vec()),
},
key: b"updates".to_vec(),
value: b"instantiated".to_vec(),
}]
]
);

let mut operation = meta_counter::Operation::increment(receiver_id, 5);
operation.fuel_grant = 1000000;
Expand Down
Loading

0 comments on commit 85c3121

Please sign in to comment.