Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

253 implement fault tolerance mechanisms eg retries dead letter queues circuit breakers e5 #280

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
68241da
adding the retry logic on the database
ndefokou Dec 3, 2024
be8b064
adding the retry logic on the different protocols
ndefokou Dec 4, 2024
e472be9
fixe(): formating files
ndefokou Dec 5, 2024
3384fc3
implementing the circuit breaker
ndefokou Dec 11, 2024
ffc396b
implementing the circuit breaker
ndefokou Dec 11, 2024
3539283
implementing the circuit breaker
ndefokou Dec 11, 2024
124547c
fixe() test
ndefokou Dec 11, 2024
ae52335
fixe() file formating
ndefokou Dec 11, 2024
6c1b4be
fixe() removing tuples
ndefokou Dec 12, 2024
6bc341b
fixe() removing un use crate
ndefokou Dec 16, 2024
3885322
fixe()integrating the circuit breaker on the different protocols
ndefokou Dec 18, 2024
8bf79d9
fixe()formating files
ndefokou Dec 18, 2024
01d5d36
fixe()Removing code duplication
ndefokou Dec 19, 2024
2a729e0
fixe() Resolving conflicts
ndefokou Dec 20, 2024
9eff984
fixe():forward protocol
ndefokou Jan 9, 2025
2faec2c
feat: implement and integrate circuit breaker logic across all handlers
Hermann-Core Jan 24, 2025
fd655be
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 24, 2025
dc7f5b4
fix(breaker): fix code documentation test
Hermann-Core Jan 24, 2025
527b15c
doc(breaker): added public documentation to call method of the Circui…
Hermann-Core Jan 24, 2025
be6ee4c
updated breaker impl
Hermann-Core Jan 28, 2025
313c72e
writing tests
Hermann-Core Jan 28, 2025
713cbfa
refactored breaker implementation
Hermann-Core Jan 28, 2025
b72b083
fix cargo clippy complains
Hermann-Core Jan 29, 2025
3e7f695
tests(breaker): write tests to validate circuit breaker behavior
Hermann-Core Jan 29, 2025
ad68d94
refactor circuit breaker implementation and integration
Hermann-Core Jan 29, 2025
c875859
Merge remote-tracking branch 'origin/main' into 253-implement-fault-t…
Hermann-Core Jan 29, 2025
96bb600
remove unused dependencies to the project
Hermann-Core Jan 29, 2025
28ff978
Replace HashMap with DashMap to improve concurrent access in circuit …
Hermann-Core Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum ForwardError {
UncoordinatedSender,
#[error("Internal server error")]
InternalServerError,
#[error("Service unavailable")]
CircuitOpen,
}

impl IntoResponse for ForwardError {
Expand All @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError {
ForwardError::MalformedBody => StatusCode::BAD_REQUEST,
ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
ForwardError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
repository::entity::{Connection, RoutedMessage},
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
};
use std::sync::Arc;
use std::time::Duration;

/// The mediator receives a forwarded message, extracts the `next` field from the message body and the attachments from the message,
/// then stores each attachment with the `next` field as the key for pickup
Expand All @@ -24,6 +26,11 @@ pub(crate) async fn mediator_forward_process(
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let circuit_breaker = state.circuit_breaker.clone();
if circuit_breaker.is_open() {
return Err(ForwardError::CircuitOpen);
}
ndefokou marked this conversation as resolved.
Show resolved Hide resolved

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
Expand All @@ -36,15 +43,38 @@ pub(crate) async fn mediator_forward_process(
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did: next.as_ref().unwrap().to_owned(),
})
.await
.map_err(|_| ForwardError::InternalServerError)?;

let result = retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.as_ref().unwrap().to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await;
ndefokou marked this conversation as resolved.
Show resolved Hide resolved

match result {
Ok(_) => circuit_breaker.record_success(),
Err(_) => {
circuit_breaker.record_failure();
return Err(ForwardError::InternalServerError);
}
};
}

Ok(None)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
use shared::{
midlw::ensure_transport_return_route_is_decorated_all,
repository::entity::Connection,
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
CircuitBreaker::CircuitBreaker,

Check warning on line 26 in crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs

View workflow job for this annotation

GitHub Actions / Build and test

unused import: `CircuitBreaker::CircuitBreaker`

Check warning on line 26 in crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs

View workflow job for this annotation

GitHub Actions / Build and test

unused import: `CircuitBreaker::CircuitBreaker`
ndefokou marked this conversation as resolved.
Show resolved Hide resolved
};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use uuid::Uuid;

/// Process a DIDComm mediate request
Expand Down Expand Up @@ -53,10 +55,24 @@
.ok_or(MediationError::InternalServerError)?;

// If there is already mediation, send mediate deny
if let Some(_connection) = connection_repository
.find_one_by(doc! { "client_did": sender_did})
.await
.map_err(|_| MediationError::InternalServerError)?
if let Some(_connection) = retry_async(
|| {
let sender_did = sender_did.clone();
let connection_repository = connection_repository.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": sender_did })
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await
.map_err(|_| MediationError::InternalServerError)?
{
tracing::info!("Sending mediate deny.");
return Ok(Some(
Expand Down Expand Up @@ -85,15 +101,24 @@
.as_ref()
.ok_or(MediationError::InternalServerError)?;

let diddoc = state
.did_resolver
.resolve(&routing_did)
.await
.map_err(|err| {
tracing::error!("Failed to resolve DID: {:?}", err);
MediationError::InternalServerError
})?
.ok_or(MediationError::InternalServerError)?;
let diddoc = retry_async(
|| {
let did_resolver = state.did_resolver.clone();
let routing_did = routing_did.clone();

async move { did_resolver.resolve(&routing_did).await.map_err(|_| ()) }
},
ndefokou marked this conversation as resolved.
Show resolved Hide resolved
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to resolve DID: {:?}", err);
MediationError::InternalServerError
})?
.ok_or(MediationError::InternalServerError)?;

let agreem_keys_jwk: Jwk = agreem_keys.try_into().unwrap();

Expand Down Expand Up @@ -229,11 +254,29 @@

// Find connection for this keylist update

let connection = connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.unwrap()
.ok_or_else(|| MediationError::UncoordinatedSender)?;
let connection = retry_async(
|| {
let connection_repository = connection_repository.clone();
let sender = sender.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.map_err(|_| ())
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to find connection after retries: {:?}", err);
MediationError::InternalServerError
})?
.ok_or_else(|| MediationError::UncoordinatedSender)?;

// Prepare handles to relevant collections

Expand Down Expand Up @@ -347,11 +390,29 @@
.as_ref()
.ok_or(MediationError::InternalServerError)?;

let connection = connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.unwrap()
.ok_or_else(|| MediationError::UncoordinatedSender)?;
let connection = retry_async(
|| {
let connection_repository = connection_repository.clone();
let sender = sender.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.map_err(|_| ())
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to find connection after retries: {:?}", err);
MediationError::InternalServerError
})?
.ok_or_else(|| MediationError::UncoordinatedSender)?;

println!("keylist: {:?}", connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#[error("Malformed request. {0}")]
MalformedRequest(String),

#[error("Service unavailable")]
CircuitOpen,

Check warning on line 19 in crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs

View workflow job for this annotation

GitHub Actions / Build and test

variant `CircuitOpen` is never constructed

Check warning on line 19 in crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs

View workflow job for this annotation

GitHub Actions / Build and test

variant `CircuitOpen` is never constructed
}

impl IntoResponse for PickupError {
Expand All @@ -22,6 +25,7 @@
PickupError::MissingSenderDID | PickupError::MalformedRequest(_) => {
StatusCode::BAD_REQUEST
}
PickupError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
PickupError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PickupError::MissingClientConnection => StatusCode::UNAUTHORIZED,
};
Expand Down
Loading
Loading