diff --git a/src/services/public_http_server/handlers/relay_webhook/mod.rs b/src/services/public_http_server/handlers/relay_webhook/mod.rs index 130232e6..66ae1730 100644 --- a/src/services/public_http_server/handlers/relay_webhook/mod.rs +++ b/src/services/public_http_server/handlers/relay_webhook/mod.rs @@ -128,48 +128,52 @@ pub async fn handler( let event = claims.evt; - state - .relay_mailbox_clearer_tx - .send(Receipt { - topic: event.topic.clone(), - message_id: event.message_id, - }) - .await - .expect("Batch receive channel should not be closed"); - - // Check these after the mailbox cleaner because these - // messages would actually be in the mailbox becuase - // the client ID (sub) matches, meaning we are the one - // that subscribed. However, aud and whu are not valid, - // that's a relay error. We should still clear the mailbox - // TODO check sub - info!("aud: {}", claims.basic.aud); - // TODO check whu - info!("whu: {}", claims.whu); - - let incoming_message = RelayIncomingMessage { - topic: event.topic, - message_id: get_message_id(&event.message).into(), - message: event.message, - tag: event.tag, - received_at: Utc::now(), - }; + // If the message was queued to the mailbox, remove it from the mailbox + // The message was already handle by the accepted event, so don't handle a second time here + if event.status == WatchStatus::Queued { + state + .relay_mailbox_clearer_tx + .send(Receipt { + topic: event.topic.clone(), + message_id: event.message_id, + }) + .await + .expect("Batch receive channel should not be closed"); + } else { + // Check these after the mailbox cleaner because these + // messages would actually be in the mailbox becuase + // the client ID (sub) matches, meaning we are the one + // that subscribed. However, aud and whu are not valid, + // that's a relay error. We should still clear the mailbox + // TODO check sub + info!("aud: {}", claims.basic.aud); + // TODO check whu + info!("whu: {}", claims.whu); + + let incoming_message = RelayIncomingMessage { + topic: event.topic, + message_id: get_message_id(&event.message).into(), + message: event.message, + tag: event.tag, + received_at: Utc::now(), + }; + + if claims.act != WatchAction::WatchEvent { + return Err(Error::Client(ClientError::WrongWatchAction(claims.act))); + } + if claims.typ != WatchType::Subscriber { + return Err(Error::Client(ClientError::WrongWatchType(claims.typ))); + } - if claims.act != WatchAction::WatchEvent { - return Err(Error::Client(ClientError::WrongWatchAction(claims.act))); - } - if claims.typ != WatchType::Subscriber { - return Err(Error::Client(ClientError::WrongWatchType(claims.typ))); - } + if event.status != WatchStatus::Accepted { + return Err(Error::Client(ClientError::WrongWatchStatus(event.status))); + } - if event.status != WatchStatus::Queued && event.status != WatchStatus::Accepted { - return Err(Error::Client(ClientError::WrongWatchStatus(event.status))); + handle_msg(incoming_message, &state) + .await + .map_err(Error::Server)?; } - handle_msg(incoming_message, &state) - .await - .map_err(Error::Server)?; - Ok(StatusCode::NO_CONTENT.into_response()) } diff --git a/src/services/relay_renewal_job/register_webhook.rs b/src/services/relay_renewal_job/register_webhook.rs index abac0938..71c0c4f0 100644 --- a/src/services/relay_renewal_job/register_webhook.rs +++ b/src/services/relay_renewal_job/register_webhook.rs @@ -31,7 +31,8 @@ pub async fn run( tags: INCOMING_TAGS.to_vec(), // Alternatively we could not care about the tag, as an incoming message is an incoming message // tags: (4000..4100).collect(), - statuses: vec![WatchStatus::Accepted], + // Accepted webhook to handle the message, Queued webhook to remove message from mailbox + statuses: vec![WatchStatus::Accepted, WatchStatus::Queued], ttl: Duration::from_secs(60 * 60 * 24 * 30), }, keypair, @@ -111,7 +112,7 @@ mod tests { ); assert_eq!(claims.typ, WatchType::Subscriber); assert_eq!(claims.act, WatchAction::Register); - assert_eq!(claims.sts, vec![WatchStatus::Accepted]); + assert_eq!(claims.sts, vec![WatchStatus::Accepted, WatchStatus::Queued]); const LEEWAY: i64 = 2; let expected_iat = Utc::now().timestamp(); assert!(claims.basic.iat <= expected_iat);