Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
phil3k3 committed Dec 21, 2023
1 parent 677f940 commit 11d492e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 46 deletions.
23 changes: 8 additions & 15 deletions cqrs-example-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,11 @@ async fn post_user(
user_id: Uuid::new_v4().to_string(),
name: String::from(user_id.into_inner()),
};
match command_service_client.client.lock().unwrap().send_command(&command).await {
Ok(result) => {
if result == CommandResponse::Ok {
HttpResponse::Ok().body(command.user_id)
} else {
HttpResponse::InternalServerError().body("Failed to process command, check server logs")
}
}
Err(_) => {
HttpResponse::InternalServerError().body("Failed to send command")
}
let result = command_service_client.client.lock().unwrap().send_command(&command).await;
if result == CommandResponse::Ok {
HttpResponse::Ok().body(command.user_id)
} else {
HttpResponse::InternalServerError().body("Failed to process command, check server logs")
}
}

Expand All @@ -77,7 +71,7 @@ fn create_channel(settings: Config) -> Box<KafkaInboundChannel> {
&settings.get_string("service_id").unwrap(),
&[&settings.get_string("response_topic").unwrap()],
&settings.get_string("bootstrap_server").unwrap(),
))
));
}

#[tokio::main]
Expand Down Expand Up @@ -126,8 +120,8 @@ async fn main() -> io::Result<()> {
client: Mutex::new(
CommandServiceClient::new(
&settings2.get_string("service_id").unwrap(),
Arc::new(Mutex::new(Some(Box::new(create_channel)))),
Box::new(kafka_command_channel)
Arc::new(tokio::sync::Mutex::new(Some(Box::new(create_channel)))),
Box::new(kafka_command_channel),
)
)
}
Expand All @@ -139,5 +133,4 @@ async fn main() -> io::Result<()> {
}).bind(("127.0.0.1", 8080))?
.run()
.await

}
69 changes: 38 additions & 31 deletions cqrs-library/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::Utc;
use config::Config;
use prost::Message;
use log::{debug, error, info};
use tokio::sync::oneshot::{channel, Sender, Receiver};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Mutex;
use tokio::sync::oneshot::{channel, Sender};
use tokio::task::JoinHandle;
use crate::envelope::{CommandResponseEnvelopeProto, DomainEventEnvelopeProto};
pub use crate::messages::{CommandMetadata, CommandResponse, CommandServerResult};
Expand Down Expand Up @@ -132,11 +133,11 @@ pub struct CommandServiceClient<T> {
service_instance_id: u32,
command_channel: Box<dyn OutboundChannel + Sync + Send>,
pending_responses_senders: Arc<Mutex<HashMap<String, Sender<CommandResponse>>>>,
pending_responses_receiver: Mutex<HashMap<String, Receiver<CommandResponse>>>,
// running: Arc<Mutex<bool>>,
channel_builder: InboundChannelBuilder<T>
}


type InboundChannelBuilder<T> = Arc<Mutex<Option<Box<dyn FnOnce(Config) -> Box<T> + Sync + Send>>>>;

pub struct EventListener {
Expand Down Expand Up @@ -202,7 +203,6 @@ impl<'a, T: InboundChannel + Send + Sync + 'static> CommandServiceClient<T> {
service_instance_id: 0u32,
command_channel,
pending_responses_senders: Arc::new(Mutex::new(HashMap::new())),
pending_responses_receiver: Mutex::new(HashMap::new()),
// running: Arc::new(Mutex::new(false)),
channel_builder
}
Expand All @@ -215,7 +215,7 @@ impl<'a, T: InboundChannel + Send + Sync + 'static> CommandServiceClient<T> {
let channel_reader = self.channel_builder.clone();

return tokio::task::spawn(async move {
let mut guard = channel_reader.lock().unwrap();
let mut guard = channel_reader.lock().await;
if let Some(func) = guard.take() {
let mut channel = func(settings);
loop {
Expand All @@ -225,30 +225,39 @@ impl<'a, T: InboundChannel + Send + Sync + 'static> CommandServiceClient<T> {
info!("No result");
}
Some(result) => {
if let Some(sender) = pending_responses_senders.lock().unwrap().remove(result.1.as_str()) {
debug!("Received response for {}: {}", result.1.as_str(), result.0);
sender.send(result.0).expect("Command could not be delivered");
} else {
error!("Sender not found");
let mut result1 = pending_responses_senders.lock().await;
{
if let Some(sender) = result1.remove(result.1.as_str()) {
debug!("Received response for {}: {}", result.1.as_str(), result.0);
sender.send(result.0).expect("Command could not be delivered");
} else {
error!("Sender not found");
}
}
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
})
}

pub async fn send_command<C: Command<'a>+?Sized>(&mut self, command: &C) -> Result<CommandResponse, RecvError> {
pub async fn send_command<C: Command<'a>+?Sized>(&mut self, command: &C) -> CommandResponse {
let command_id = Uuid::new_v4().to_string();
let serialized_command = serialize_command_to_protobuf(&command_id, command, String::from(&self.service_id), self.service_instance_id);
let (tx, rx) = channel();
self.pending_responses_receiver.get_mut().unwrap().insert(command_id.to_owned(), rx);
let mut result = self.pending_responses_senders.lock().unwrap();
result.insert(command_id.to_owned(), tx);

{
let mut result1 = self.pending_responses_senders.lock().await;
result1.insert(command_id.to_owned(), tx);
}

self.command_channel.send(command.get_subject().as_bytes().to_vec(),serialized_command);

return self.pending_responses_receiver.get_mut().unwrap().get_mut(command_id.clone().as_str()).unwrap().await
match rx.await {
Ok(k) => k,
Err(_) => panic!("error")
}
}

pub fn send_command_async<C: Command<'a>+?Sized>(&mut self, command: &C, command_channel: &mut (dyn OutboundChannel + Send + Sync)) {
Expand Down Expand Up @@ -443,7 +452,7 @@ struct CommandResponseResult {
#[cfg(test)]
mod tests {
use std::env;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use config::Config;
use crate::{CommandAccessor,
CommandStore,
Expand All @@ -458,7 +467,7 @@ mod tests {
EventProducerImpl};
use serde::{Serialize, Deserialize};

use log::{debug};
use log::debug;
use tokio::sync::oneshot;
use tokio::sync::oneshot::{Receiver, Sender};

Expand Down Expand Up @@ -526,11 +535,12 @@ mod tests {

impl InboundChannel for TokioInboundChannel {
fn consume(&mut self) -> Option<Vec<u8>> {
if let Some(receiver) = self.receiver.take() {
match receiver.blocking_recv() {
if let Some(mut receiver) = self.receiver.take() {
let result = match receiver.try_recv() {
Ok(k) => Some(k),
Err(_) => None
}
};
return result;
} else {
None
}
Expand Down Expand Up @@ -592,7 +602,6 @@ mod tests {


#[tokio::test]
#[ignore]
async fn test_serialize_command_response() {

let command = TestCreateUserCommand {
Expand All @@ -611,7 +620,7 @@ mod tests {
let (tx_event, _rx_event) : (Sender<Vec<u8>>, Receiver<Vec<u8>>) = oneshot::channel();


tokio::task::spawn(async move {
let handle = tokio::task::spawn(async move {
let mut command_store = CommandStore::new("COMMAND-SERVER");
command_store.register_handler("CreateUserCommand", verify_handle_create_user);

Expand All @@ -632,20 +641,18 @@ mod tests {
});

let mut command_service_client = CommandServiceClient::new(
"COMMAND-CLIENT", Arc::new(Mutex::new(Some(Box::new(|_config| {
"COMMAND-CLIENT", Arc::new(tokio::sync::Mutex::new(Some(Box::new(|_config| {
return Box::new(TokioInboundChannel {
receiver: Some(rx_command_response)
});
})))),
Box::new(TokioOutboundChannel::new(tx_command))
);
let config = Config::builder().set_default("a", "b").unwrap().build();
command_service_client.start(config.unwrap());
match command_service_client.send_command(&command).await {
Ok(respose) => {
assert_eq!(respose, CommandResponse::Ok)
}
Err(_) => assert!(false)
};
let handle2 = command_service_client.start(config.unwrap());
let response = command_service_client.send_command(&command).await;
assert_eq!(response, CommandResponse::Ok);
handle2.abort();
handle.abort();
}
}

0 comments on commit 11d492e

Please sign in to comment.