Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

253 implement fault tolerance mechanisms eg retries dead letter queues circuit breakers e5 #280

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
68241da
adding the retry logic on the database
ndefokou Dec 3, 2024
be8b064
adding the retry logic on the different protocols
ndefokou Dec 4, 2024
e472be9
fixe(): formating files
ndefokou Dec 5, 2024
3384fc3
implementing the circuit breaker
ndefokou Dec 11, 2024
ffc396b
implementing the circuit breaker
ndefokou Dec 11, 2024
3539283
implementing the circuit breaker
ndefokou Dec 11, 2024
124547c
fixe() test
ndefokou Dec 11, 2024
ae52335
fixe() file formating
ndefokou Dec 11, 2024
6c1b4be
fixe() removing tuples
ndefokou Dec 12, 2024
6bc341b
fixe() removing un use crate
ndefokou Dec 16, 2024
3885322
fixe()integrating the circuit breaker on the different protocols
ndefokou Dec 18, 2024
8bf79d9
fixe()formating files
ndefokou Dec 18, 2024
01d5d36
fixe()Removing code duplication
ndefokou Dec 19, 2024
2a729e0
fixe() Resolving conflicts
ndefokou Dec 20, 2024
9eff984
fixe():forward protocol
ndefokou Jan 9, 2025
2faec2c
feat: implement and integrate circuit breaker logic across all handlers
Hermann-Core Jan 24, 2025
fd655be
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 24, 2025
dc7f5b4
fix(breaker): fix code documentation test
Hermann-Core Jan 24, 2025
527b15c
doc(breaker): added public documentation to call method of the Circui…
Hermann-Core Jan 24, 2025
be6ee4c
updated breaker impl
Hermann-Core Jan 28, 2025
313c72e
writing tests
Hermann-Core Jan 28, 2025
713cbfa
refactored breaker implementation
Hermann-Core Jan 28, 2025
b72b083
fix cargo clippy complains
Hermann-Core Jan 29, 2025
3e7f695
tests(breaker): write tests to validate circuit breaker behavior
Hermann-Core Jan 29, 2025
ad68d94
refactor circuit breaker implementation and integration
Hermann-Core Jan 29, 2025
c875859
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 29, 2025
96bb600
remove unused dependencies to the project
Hermann-Core Jan 29, 2025
28ff978
Replace HashMap with DashMap to improve concurrent access in circuit …
Hermann-Core Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ lazy_static = "1.5.0"
async-trait = "0.1.83"
dotenv-flow = "0.16.2"
serde_json = "1.0"
parking_lot = "0.12.3"
futures = "0.3"
dashmap = "6"
parking_lot = "0.12"
pin-project-lite = "0.2"
curve25519-dalek = "4.1.3"
ed25519-dalek = "2.1.1"
tracing-subscriber = "0.3.19"
Expand Down
1 change: 1 addition & 0 deletions crates/web-plugins/didcomm-messaging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tracing.workspace = true
once_cell.workspace = true
serde_json.workspace = true
thiserror.workspace = true
dashmap.workspace = true
http-body-util.workspace = true
tokio = { workspace = true, features = ["full"] }
hyper = { workspace = true, features = ["full"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ chrono.workspace = true
[dev-dependencies]
did-utils.workspace = true
keystore.workspace = true
dashmap.workspace = true
shared = { workspace = true, features = ["test-utils"] }
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod tests {
use super::*;
use axum::http::StatusCode;
use chrono::Utc;
use dashmap::DashMap;
use did_utils::didcore::Document;
use keystore::tests::MockKeyStore;
use serde_json::json;
Expand Down Expand Up @@ -91,8 +92,16 @@ mod tests {
message_repository: Arc::new(MockMessagesRepository::from(vec![])),
keystore: Arc::new(MockKeyStore::new(vec![])),
};
let state =
Arc::new(AppState::from(public_domain, diddoc, None, Some(repository)).unwrap());
let state = Arc::new(
AppState::from(
public_domain,
diddoc,
None,
Some(repository),
DashMap::new(),
)
.unwrap(),
);

let message = Message::build(
"id_alice".to_owned(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ uuid.workspace = true
thiserror.workspace = true
serde_json.workspace = true
async-trait.workspace = true
dashmap.workspace = true
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full", "rt"] }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn handle_query_request(

let queries = message.body.get("queries").and_then(|val| val.as_array());

if let Some(protocols) = &state.supported_protocols {
if let Some(protocols) = &state.disclose_protocols {
supported = protocols;
if let Some(queries) = queries {
for value in queries {
Expand Down Expand Up @@ -116,6 +116,7 @@ fn build_response(disclosed_protocols: HashSet<String>) -> Message {
mod test {

use crate::{constants::QUERY_FEATURE, model::Queries};
use dashmap::DashMap;
use did_utils::didcore::Document;
use didcomm::Message;
use keystore::tests::MockKeyStore;
Expand Down Expand Up @@ -149,6 +150,7 @@ mod test {
"https://didcomm.org/coordinate-mediation/2.0/mediate-request".to_string(),
]),
Some(repository),
DashMap::new(),
)
.unwrap(),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ database.workspace = true
message-api.workspace = true

mongodb.workspace = true
tracing.workspace = true
async-trait.workspace = true
serde_json.workspace = true
thiserror.workspace = true
futures.workspace = true
didcomm = { workspace = true, features = ["uniffi"] }
hyper = { workspace = true, features = ["full"] }
axum = { workspace = true, features = ["macros"] }
tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
keystore = { workspace = true, features = ["test-utils"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum ForwardError {
UncoordinatedSender,
#[error("Internal server error")]
InternalServerError,
#[error("Service unavailable")]
ServiceUnavailable,
}

impl IntoResponse for ForwardError {
Expand All @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError {
ForwardError::MalformedBody => StatusCode::BAD_REQUEST,
ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
ForwardError::ServiceUnavailable => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
126 changes: 88 additions & 38 deletions crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::error::ForwardError;
use crate::{constants::MEDIATE_FORWARD_2_0, error::ForwardError};
use database::Repository;
use didcomm::{AttachmentData, Message};
use futures::future::try_join_all;
use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
breaker::{CircuitBreaker, Error as BreakerError},
repository::entity::{Connection, RoutedMessage},
state::{AppState, AppStateRepository},
state::AppState,
};
use std::sync::Arc;

Expand All @@ -15,60 +17,108 @@ pub(crate) async fn mediator_forward_process(
state: Arc<AppState>,
message: Message,
) -> Result<Option<Message>, ForwardError> {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
// Check if the circuit breaker is open
state
.circuit_breaker
.get(MEDIATE_FORWARD_2_0)
.filter(|cb| cb.is_open())
.map_or(Ok(()), |_| Err(ForwardError::ServiceUnavailable))?;

let repository = state
.repository
.as_ref()
.ok_or(ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
};
let next = checks(
&message,
&repository.connection_repository,
state.circuit_breaker.get(MEDIATE_FORWARD_2_0).as_deref(),
)
.await
.map_err(|_| ForwardError::InternalServerError)?;

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
message_repository
.store(RoutedMessage {
let store_futures: Vec<_> = attachments
.into_iter()
.map(|attachment| async {
let attached = match attachment.data {
AttachmentData::Json { value } => value.json,
AttachmentData::Base64 { value } => json!(value.base64),
AttachmentData::Links { value } => json!(value.links),
};

let routed_message = RoutedMessage {
id: None,
message: attached,
recipient_did: next.as_ref().unwrap().to_owned(),
})
.await
.map_err(|_| ForwardError::InternalServerError)?;
}
recipient_did: next.clone(),
};

match state.circuit_breaker.get(MEDIATE_FORWARD_2_0) {
Some(cb) => cb
.call(|| {
repository
.message_repository
.store(routed_message.to_owned())
})
.await
.map_err(|err| match err {
BreakerError::CircuitOpen => ForwardError::ServiceUnavailable,
_ => ForwardError::InternalServerError,
}),
None => repository
.message_repository
.store(routed_message)
.await
.map_err(|_| ForwardError::InternalServerError),
}
})
.collect();

try_join_all(store_futures).await?;
Ok(None)
}

async fn checks(
message: &Message,
connection_repository: &Arc<dyn Repository<Connection>>,
circuit_breaker: Option<&CircuitBreaker>,
) -> Result<String, ForwardError> {
let next = message.body.get("next").and_then(Value::as_str);
match next {
Some(next) => next,
None => return Err(ForwardError::MalformedBody),
let next = match message.body.get("next") {
Some(Value::String(next)) => next.clone(),
_ => return Err(ForwardError::MalformedBody),
};

// Check if the client's did in mediator's keylist
let _connection = match connection_repository
.find_one_by(doc! {"keylist": doc!{ "$elemMatch": { "$eq": &next}}})
.await
.map_err(|_| ForwardError::InternalServerError)?
{
Some(connection) => connection,
None => return Err(ForwardError::UncoordinatedSender),
};
// Check if the client's did is in mediator's keylist
match circuit_breaker {
Some(cb) => cb
.call(|| {
connection_repository
.find_one_by(doc! {"keylist": doc!{ "$elemMatch": { "$eq": &next}}})
})
.await
.map_err(|err| match err {
BreakerError::CircuitOpen => ForwardError::ServiceUnavailable,
BreakerError::Inner(err) => {
tracing::error!("Failed to find connection: {err:?}");
ForwardError::InternalServerError
}
})?
.is_some()
.then_some(())
.ok_or(ForwardError::UncoordinatedSender)?,
None => connection_repository
.find_one_by(doc! {"keylist": doc!{ "$elemMatch": { "$eq": &next}}})
.await
.map_err(|err| {
tracing::error!("Failed to find connection: {err:?}");
ForwardError::InternalServerError
})?
.is_some()
.then_some(())
.ok_or(ForwardError::UncoordinatedSender)?,
}

Ok(next.unwrap().to_string())
Ok(next)
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl MessageHandler for ForwardHandler {
state: Arc<AppState>,
msg: Message,
) -> Result<Option<Message>, Response> {
// Pass the state, msg, and the circuit_breaker as arguments
crate::handler::mediator_forward_process(state, msg)
.await
.map_err(|e| e.into_response())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) enum MediationError {
UnexpectedMessageFormat,
#[error("internal server error")]
InternalServerError,
#[error("Service unavailable")]
ServiceUnavailable,
}

impl IntoResponse for MediationError {
Expand All @@ -26,6 +28,7 @@ impl IntoResponse for MediationError {
MediationError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
MediationError::UnexpectedMessageFormat => StatusCode::BAD_REQUEST,
MediationError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
MediationError::ServiceUnavailable => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Loading