Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

253 implement fault tolerance mechanisms eg retries dead letter queues circuit breakers e5 #280

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
68241da
adding the retry logic on the database
ndefokou Dec 3, 2024
be8b064
adding the retry logic on the different protocols
ndefokou Dec 4, 2024
e472be9
fixe(): formating files
ndefokou Dec 5, 2024
3384fc3
implementing the circuit breaker
ndefokou Dec 11, 2024
ffc396b
implementing the circuit breaker
ndefokou Dec 11, 2024
3539283
implementing the circuit breaker
ndefokou Dec 11, 2024
124547c
fixe() test
ndefokou Dec 11, 2024
ae52335
fixe() file formating
ndefokou Dec 11, 2024
6c1b4be
fixe() removing tuples
ndefokou Dec 12, 2024
6bc341b
fixe() removing un use crate
ndefokou Dec 16, 2024
3885322
fixe()integrating the circuit breaker on the different protocols
ndefokou Dec 18, 2024
8bf79d9
fixe()formating files
ndefokou Dec 18, 2024
01d5d36
fixe()Removing code duplication
ndefokou Dec 19, 2024
2a729e0
fixe() Resolving conflicts
ndefokou Dec 20, 2024
9eff984
fixe():forward protocol
ndefokou Jan 9, 2025
2faec2c
feat: implement and integrate circuit breaker logic across all handlers
Hermann-Core Jan 24, 2025
fd655be
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 24, 2025
dc7f5b4
fix(breaker): fix code documentation test
Hermann-Core Jan 24, 2025
527b15c
doc(breaker): added public documentation to call method of the Circui…
Hermann-Core Jan 24, 2025
be6ee4c
updated breaker impl
Hermann-Core Jan 28, 2025
313c72e
writing tests
Hermann-Core Jan 28, 2025
713cbfa
refactored breaker implementation
Hermann-Core Jan 28, 2025
b72b083
fix cargo clippy complains
Hermann-Core Jan 29, 2025
3e7f695
tests(breaker): write tests to validate circuit breaker behavior
Hermann-Core Jan 29, 2025
ad68d94
refactor circuit breaker implementation and integration
Hermann-Core Jan 29, 2025
c875859
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 29, 2025
96bb600
remove unused dependencies to the project
Hermann-Core Jan 29, 2025
28ff978
Replace HashMap with DashMap to improve concurrent access in circuit …
Hermann-Core Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ thiserror.workspace = true
didcomm = { workspace = true, features = ["uniffi"] }
hyper = { workspace = true, features = ["full"] }
axum = { workspace = true, features = ["macros"] }
tokio = "1.27.0"

[dev-dependencies]
keystore = { workspace = true, features = ["test-utils"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum ForwardError {
UncoordinatedSender,
#[error("Internal server error")]
InternalServerError,
#[error("Service unavailable")]
CircuitOpen,
}

impl IntoResponse for ForwardError {
Expand All @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError {
ForwardError::MalformedBody => StatusCode::BAD_REQUEST,
ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
ForwardError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
110 changes: 77 additions & 33 deletions crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,84 @@ use didcomm::{AttachmentData, Message};
use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
circuit_breaker::CircuitBreaker,
repository::entity::{Connection, RoutedMessage},
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Mediator receives forwarded messages, extract the next field in the message body, and the attachments in the message
/// then stores the attachment with the next field as key for pickup
pub(crate) async fn mediator_forward_process(
state: Arc<AppState>,
message: Message,
circuit_breaker: Arc<Mutex<CircuitBreaker>>,
) -> Result<Option<Message>, ForwardError> {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
};

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did: next.as_ref().unwrap().to_owned(),
})
.await
.map_err(|_| ForwardError::InternalServerError)?;
let mut cb = circuit_breaker.lock().await;

let result = cb
.call_async(|| {
let state = Arc::clone(&state);
let message = message.clone();
async move {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
}?;

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await
.map_err(|_| ForwardError::InternalServerError)?;
}
Ok::<Option<Message>, ForwardError>(None)
}
})
.await;

match result {
Some(Ok(None)) => Ok(None),
Some(Ok(Some(_))) => Err(ForwardError::InternalServerError),
Some(Err(err)) => Err(err),
None => Err(ForwardError::CircuitOpen),
}
Ok(None)
}

async fn checks(
Expand Down Expand Up @@ -83,6 +119,7 @@ mod test {
use keystore::Secrets;
use serde_json::json;
use shared::{
circuit_breaker,
repository::{
entity::Connection,
tests::{MockConnectionRepository, MockMessagesRepository},
Expand Down Expand Up @@ -166,9 +203,16 @@ mod test {
.await
.expect("Unable unpack");

let msg = mediator_forward_process(Arc::new(state.clone()), msg)
.await
.unwrap();
// Wrap the CircuitBreaker in Arc and Mutex
let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3));

let msg: Option<Message> = mediator_forward_process(
Arc::new(state.clone()),
msg,
Arc::new(circuit_breaker.into()),
)
.await
.unwrap();

println!("Mediator1 is forwarding message \n{:?}\n", msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use async_trait::async_trait;
use axum::response::{IntoResponse, Response};
use didcomm::Message;
use message_api::{MessageHandler, MessagePlugin, MessageRouter};
use shared::state::AppState;
use std::sync::Arc;
use shared::{circuit_breaker::CircuitBreaker, state::AppState};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

pub struct RoutingProtocol;

Expand All @@ -17,7 +18,13 @@ impl MessageHandler for ForwardHandler {
state: Arc<AppState>,
msg: Message,
) -> Result<Option<Message>, Response> {
crate::handler::mediator_forward_process(state, msg)
let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new(
2,
Duration::from_millis(5000),
)));

// Pass the state, msg, and the circuit_breaker as arguments
crate::handler::mediator_forward_process(state, msg, circuit_breaker)
.await
.map_err(|e| e.into_response())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) enum MediationError {
UnexpectedMessageFormat,
#[error("internal server error")]
InternalServerError,
#[error("service unavailable")]
CircuitOpen,
}

impl IntoResponse for MediationError {
Expand All @@ -26,6 +28,7 @@ impl IntoResponse for MediationError {
MediationError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
MediationError::UnexpectedMessageFormat => StatusCode::BAD_REQUEST,
MediationError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
MediationError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Loading
Loading