Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/examples/cosmos-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn serve(kolme: Kolme<CosmosBridgeApp>, bind: SocketAddr) -> Result<()
let processor = Processor::new(kolme.clone(), my_secret_key().clone());
set.spawn(processor.run());
let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Cosmos));
set.spawn(async move { listener.run(ChainName::Cosmos).await.map_err(Into::into) });
let approver = Approver::new(kolme.clone(), my_secret_key().clone());
set.spawn(approver.run());
let submitter = Submitter::new_cosmos(
Expand All @@ -208,7 +208,7 @@ pub async fn serve(kolme: Kolme<CosmosBridgeApp>, bind: SocketAddr) -> Result<()
);
set.spawn(submitter.run());
let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand All @@ -218,7 +218,7 @@ pub async fn serve(kolme: Kolme<CosmosBridgeApp>, bind: SocketAddr) -> Result<()
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down
6 changes: 3 additions & 3 deletions packages/examples/kademlia-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ pub async fn new_version_node(api_server_port: u16) -> Result<()> {
)
.await?;

let mut set = JoinSet::new();
let mut set: JoinSet<std::result::Result<(), KolmeError>> = JoinSet::new();

let processor = Processor::new(kolme.clone(), my_secret_key().clone());
// Processor consumes mempool transactions and add new transactions into blockchain storage.
Expand Down Expand Up @@ -246,7 +246,7 @@ pub async fn new_version_node(api_server_port: u16) -> Result<()> {
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down Expand Up @@ -323,7 +323,7 @@ pub async fn validators(
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down
6 changes: 3 additions & 3 deletions packages/examples/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub async fn processor() -> Result<()> {
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand All @@ -148,7 +148,7 @@ pub async fn api_server(bind: SocketAddr) -> Result<()> {
let gossip = GossipBuilder::new().build(kolme.clone())?;
set.spawn(gossip.run());
let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand All @@ -158,7 +158,7 @@ pub async fn api_server(bind: SocketAddr) -> Result<()> {
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down
17 changes: 12 additions & 5 deletions packages/examples/six-sigma/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl<App> KolmeDataRequest<App> for OddsSource {
}

pub struct Tasks {
pub set: JoinSet<Result<()>>,
pub set: JoinSet<Result<(), KolmeError>>,
pub processor: Option<AbortHandle>,
pub listener: Option<AbortHandle>,
pub approver: Option<AbortHandle>,
Expand All @@ -340,7 +340,10 @@ impl Tasks {
let chain = self.kolme.get_app().chain;
let listener = Listener::new(self.kolme.clone(), my_secret_key().clone());

self.listener = Some(self.set.spawn(listener.run(chain.name())));
self.listener = Some(
self.set
.spawn(async move { listener.run(chain.name()).await.map_err(Into::into) }),
);
}

pub fn spawn_approver(&mut self) {
Expand All @@ -357,7 +360,11 @@ impl Tasks {

pub fn spawn_api_server(&mut self) {
let api_server = ApiServer::new(self.kolme.clone());
self.api_server = Some(self.set.spawn(api_server.run(self.bind)));
let bind = self.bind;
self.api_server = Some(
self.set
.spawn(async move { api_server.run(bind).await.map_err(Into::into) }),
);
}
}

Expand Down Expand Up @@ -385,7 +392,7 @@ pub async fn serve(
}
Ok(Err(e)) => {
tasks.set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down Expand Up @@ -477,7 +484,7 @@ impl TxLogger {
Self { kolme, path }
}

async fn run(self) -> Result<()> {
async fn run(self) -> Result<(), KolmeError> {
let mut file = File::create(self.path)?;
let mut height = BlockHeight::start();
loop {
Expand Down
8 changes: 4 additions & 4 deletions packages/examples/solana-cosmos-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ pub async fn serve(
set.spawn(processor.run());

let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Cosmos));
set.spawn(async move { listener.run(ChainName::Cosmos).await.map_err(Into::into) });

let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Solana));
set.spawn(async move { listener.run(ChainName::Solana).await.map_err(Into::into) });

let approver = Approver::new(kolme.clone(), my_secret_key().clone());
set.spawn(approver.run());
Expand All @@ -192,7 +192,7 @@ pub async fn serve(
set.spawn(submitter.run());

let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand All @@ -202,7 +202,7 @@ pub async fn serve(
}
Ok(Err(e)) => {
set.abort_all();
return Err(e);
return Err(e.into());
}
Ok(Ok(())) => (),
}
Expand Down
67 changes: 59 additions & 8 deletions packages/kolme-store/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
use merkle_map::{MerkleSerialError, Sha256Hash};

#[derive(thiserror::Error, Debug)]
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum StorageBackend {
Fjall,
Postgres,
InMemory,
}

impl std::fmt::Display for StorageBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StorageBackend::Fjall => write!(f, "Fjall"),
StorageBackend::Postgres => write!(f, "Postgres"),
StorageBackend::InMemory => write!(f, "InMemory"),
}
}
}

#[derive(thiserror::Error, Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KolmeStoreError {
#[error(transparent)]
Custom(Box<dyn std::error::Error + Send + Sync>),
#[error(transparent)]
Merkle(#[from] MerkleSerialError),
#[error("Custom error: {0}")]
Custom(String),

#[error("Merkle error: {0}")]
Merkle(String),

#[error("Block not found in storage: {height}")]
BlockNotFound { height: u64 },
#[error("KolmeStore::delete_block is not supported by this store: {0}")]
UnsupportedDeleteOperation(&'static str),

#[error("KolmeStore::delete_block is not supported by this store: {backend}")]
UnsupportedDeleteOperation { backend: StorageBackend },

// kolme#144 - Reports a diverging hash with same height
#[error("Block with height {height} in database with different hash {existing}, trying to add {adding}")]
ConflictingBlockInDb {
Expand All @@ -20,14 +41,44 @@ pub enum KolmeStoreError {
// kolme#144 - Reports a double insert (Block already exists with same hash and insert)
#[error("Block already in database: {height}")]
MatchingBlockAlreadyInserted { height: u64 },

#[error("Transaction is already present in database: {txhash}")]
TxAlreadyInDb { txhash: Sha256Hash },

#[error("get_height_for_tx: invalid height bytes in Fjall store for tx {txhash:?}, bytes: {bytes:?}, reason: {reason}")]
InvalidHeightInFjall {
txhash: Sha256Hash,
bytes: Vec<u8>,
reason: String,
},

#[error("Fjall error: {0}")]
Fjall(String),

#[error("{0}")]
Other(String),
}

impl KolmeStoreError {
pub fn custom<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self::Custom(Box::new(e))
Self::Custom(e.to_string())
}
}

impl From<fjall::Error> for KolmeStoreError {
fn from(e: fjall::Error) -> Self {
KolmeStoreError::Fjall(e.to_string())
}
}

impl From<anyhow::Error> for KolmeStoreError {
fn from(e: anyhow::Error) -> Self {
KolmeStoreError::Other(format!("Anyhow Error: {e}"))
}
}

impl From<MerkleSerialError> for KolmeStoreError {
fn from(e: MerkleSerialError) -> Self {
KolmeStoreError::Other(format!("Merkle Serial Error: {e}"))
}
}
21 changes: 16 additions & 5 deletions packages/kolme-store/src/fjall.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
r#trait::KolmeBackingStore, KolmeConstructLock, KolmeStoreError, StorableBlock,
DEFAULT_CACHE_SIZE,
error::StorageBackend, r#trait::KolmeBackingStore, KolmeConstructLock, KolmeStoreError,
StorableBlock, DEFAULT_CACHE_SIZE,
};
use anyhow::Context;
use lru::LruCache;
Expand Down Expand Up @@ -57,7 +57,9 @@ impl KolmeBackingStore for Store {
}

async fn delete_block(&self, _height: u64) -> Result<(), KolmeStoreError> {
Err(KolmeStoreError::UnsupportedDeleteOperation("Fjall"))
Err(KolmeStoreError::UnsupportedDeleteOperation {
backend: StorageBackend::Fjall,
})
}

async fn take_construct_lock(&self) -> Result<crate::KolmeConstructLock, KolmeStoreError> {
Expand All @@ -76,13 +78,22 @@ impl KolmeBackingStore for Store {
}
}

async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>> {
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> core::result::Result<Option<u64>, KolmeStoreError> {
let Some(height) = self.merkle.handle.get(tx_key(txhash))? else {
return Ok(None);
};
let height = match <[u8; 8]>::try_from(&*height) {
Ok(height) => u64::from_be_bytes(height),
Err(e) => anyhow::bail!("get_height_for_tx: invalid height in Fjall store: {e}"),
Err(e) => {
return Err(KolmeStoreError::InvalidHeightInFjall {
txhash,
bytes: height.to_vec(),
reason: e.to_string(),
});
}
};
Ok(Some(height))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/kolme-store/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl KolmeBackingStore for Store {
Ok(self.0.read().await.blocks.contains_key(&height))
}

async fn get_height_for_tx(&self, txhash: Sha256Hash) -> Result<Option<u64>, anyhow::Error> {
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> Result<Option<u64>, KolmeStoreError> {
Ok(self.0.read().await.txhashes.get(&txhash).copied())
}

Expand Down
11 changes: 7 additions & 4 deletions packages/kolme-store/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
r#trait::KolmeBackingStore, BlockHashes, HasBlockHashes, KolmeConstructLock, KolmeStoreError,
StorableBlock, DEFAULT_CACHE_SIZE,
error::StorageBackend, r#trait::KolmeBackingStore, BlockHashes, HasBlockHashes,
KolmeConstructLock, KolmeStoreError, StorableBlock, DEFAULT_CACHE_SIZE,
};
use anyhow::Context as _;
use lru::LruCache;
Expand Down Expand Up @@ -152,8 +152,11 @@ impl KolmeBackingStore for Store {
.map_err(KolmeStoreError::custom)
.inspect_err(|err| tracing::error!("{err:?}"))
}

async fn delete_block(&self, _height: u64) -> Result<(), KolmeStoreError> {
Err(KolmeStoreError::UnsupportedDeleteOperation("Postgres"))
Err(KolmeStoreError::UnsupportedDeleteOperation {
backend: StorageBackend::Postgres,
})
}

async fn take_construct_lock(&self) -> Result<KolmeConstructLock, KolmeStoreError> {
Expand Down Expand Up @@ -196,7 +199,7 @@ impl KolmeBackingStore for Store {
merkle.load_by_hashes(&[hash], &mut dest).await?;
Ok(dest.remove(&hash))
}
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>> {
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> Result<Option<u64>, KolmeStoreError> {
let txhash = txhash.as_array().as_slice();
let height =
sqlx::query_scalar!("SELECT height FROM blocks WHERE txhash=$1 LIMIT 1", txhash)
Expand Down
5 changes: 4 additions & 1 deletion packages/kolme-store/src/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub trait KolmeBackingStore {
&self,
hash: Sha256Hash,
) -> Result<Option<MerkleLayerContents>, MerkleSerialError>;
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>>;
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> core::result::Result<Option<u64>, KolmeStoreError>;

async fn load_latest_block(&self) -> Result<Option<u64>, KolmeStoreError>;
async fn load_block<Block>(
Expand Down
7 changes: 7 additions & 0 deletions packages/kolme-test/src/key_rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ async fn test_total_replace_inner(testtasks: TestTasks, (): ()) {
processor.add_secret(new_processor.clone());
testtasks.try_spawn_persistent(processor.run());

// The genesis event hasn't completed, which causes this test to fail.
// We need to investigate why this is happening.
// Adding a short delay (sleep) as shown below allows the test to pass.

tracing::info!("Waiting for genesis event...");
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

// Swap out the approver and listener right away. Since there's only one
// key being used, we don't need to do any approving.
let expected_new_set = ValidatorSet {
Expand Down
4 changes: 1 addition & 3 deletions packages/kolme-test/src/max_tx_height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ async fn max_tx_height_inner(testtasks: TestTasks, (): ()) {
let e: KolmeError = kolme
.sign_propose_await_transaction(&secret, tx_builder)
.await
.unwrap_err()
.downcast()
.unwrap();
.unwrap_err();
match e {
KolmeError::PastMaxHeight {
txhash: _,
Expand Down
2 changes: 1 addition & 1 deletion packages/kolme-test/src/validations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_invalid_hashes_inner(testtasks: TestTasks, (): ()) {
kolme: &Kolme<SampleKolmeApp>,
mut block: Block<SampleMessage>,
f: impl FnOnce(&mut Block<SampleMessage>),
) -> anyhow::Result<()> {
) -> Result<(), KolmeError> {
f(&mut block);
let signed = TaggedJson::new(block)
.unwrap()
Expand Down
Loading