diff --git a/cqrs-example-client/src/main.rs b/cqrs-example-client/src/main.rs
index fabe384..35d2a56 100644
--- a/cqrs-example-client/src/main.rs
+++ b/cqrs-example-client/src/main.rs
@@ -126,7 +126,7 @@ async fn main() -> io::Result<()> {
             client: Mutex::new(
                 CommandServiceClient::new(
                     &settings2.get_string("service_id").unwrap(),
-                    Arc::new(create_channel),
+                    Arc::new(Mutex::new(Some(Box::new(create_channel)))),
                     Box::new(kafka_command_channel)
                 )
             )
diff --git a/cqrs-example-server/Cargo.toml b/cqrs-example-server/Cargo.toml
index 5b13783..490f8ba 100644
--- a/cqrs-example-server/Cargo.toml
+++ b/cqrs-example-server/Cargo.toml
@@ -12,7 +12,7 @@
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 log = "0.4.8"
 env_logger = "0.10.0"
-tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time"] }
+tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
 typetag = "0.2"
 config = "0.13.3"
diff --git a/cqrs-example-server/src/main.rs b/cqrs-example-server/src/main.rs
index 8f0d1ca..8d7a647 100644
--- a/cqrs-example-server/src/main.rs
+++ b/cqrs-example-server/src/main.rs
@@ -100,7 +100,7 @@ async fn main() {
                 debug!("No message!");
             }
             Some(mut message) => {
-                command_service_server.consume_async(&mut message, &mut command_response_channel);
+                command_service_server.handle_message(&mut message, &mut command_response_channel);
             }
         }
     }
diff --git a/cqrs-kafka/Cargo.toml b/cqrs-kafka/Cargo.toml
index 3d881e3..30bc6d8 100644
--- a/cqrs-kafka/Cargo.toml
+++ b/cqrs-kafka/Cargo.toml
@@ -12,8 +12,8 @@
 log = "0.4.8"
 blockingqueue = "0.1.1"
 futures = "0.3.28"
-
 [dev-dependencies]
 testcontainers = "0.15.0"
+testcontainers-modules = { version = "0.2.0" , features = ["kafka"]}
 env_logger = "0.10.0"
-tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
+tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
diff --git a/cqrs-kafka/src/inbound.rs b/cqrs-kafka/src/inbound.rs
index ecfae0b..b47b6b0 100644
--- a/cqrs-kafka/src/inbound.rs
+++ b/cqrs-kafka/src/inbound.rs
@@ -51,11 +51,11 @@ impl KafkaInboundChannel {
             .set("session.timeout.ms", "6000")
             .set("enable.auto.commit", "true")
             .set("isolation.level", "read_uncommitted")
-            .set("auto.offset.reset", "earliest");
-        // .set("debug", "consumer,cgrp,topic,fetch");
+            .set("auto.offset.reset", "earliest")
+            .set("debug", "consumer,cgrp,topic,fetch");
 
         // all nodes of the same service are in a group and will get some partitions assigned
-        // config.set_log_level(RDKafkaLogLevel::Debug);
+        config.set_log_level(RDKafkaLogLevel::Debug);
         config.create_with_context(CustomContext {})
     }
 }
@@ -77,11 +77,11 @@ impl StreamKafkaInboundChannel {
             .set("session.timeout.ms", "6000")
             .set("enable.auto.commit", "true")
             .set("isolation.level", "read_uncommitted")
-            .set("auto.offset.reset", "earliest");
-        // .set("debug", "consumer,cgrp,topic,fetch");
+            .set("auto.offset.reset", "earliest")
+            .set("debug", "consumer,cgrp,topic,fetch");
 
         // all nodes of the same service are in a group and will get some partitions assigned
-        // config.set_log_level(RDKafkaLogLevel::Debug);
+        config.set_log_level(RDKafkaLogLevel::Debug);
 
         config.create_with_context(CustomContext {})
     }
diff --git a/cqrs-kafka/src/lib.rs b/cqrs-kafka/src/lib.rs
index d13b597..f451402 100644
--- a/cqrs-kafka/src/lib.rs
+++ b/cqrs-kafka/src/lib.rs
@@ -6,9 +6,9 @@ mod tests {
     use std::sync::mpsc::channel;
     use std::{thread};
     use std::thread::JoinHandle;
-    use futures::SinkExt;
     use log::info;
-    use testcontainers::{clients, images::kafka};
+    use testcontainers::clients;
+    use testcontainers_modules::kafka::{Kafka, KAFKA_PORT};
     use cqrs_library::{InboundChannel, OutboundChannel};
     use crate::inbound::KafkaInboundChannel;
     use crate::outbound::KafkaOutboundChannel;
@@ -34,13 +34,13 @@
         info!("Starting transmission test via Kafka");
 
         let docker = clients::Cli::default();
-        let kafka_node = docker.run(kafka::Kafka::default());
+        let kafka_node = docker.run(Kafka::default());
 
         info!("Started Kafka container");
 
         let bootstrap_servers = format!(
             "127.0.0.1:{}",
-            kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT)
+            kafka_node.get_host_port_ipv4(KAFKA_PORT)
         );
 
         info!("{}", bootstrap_servers);
diff --git a/cqrs-kafka/src/outbound.rs b/cqrs-kafka/src/outbound.rs
index 5599ac5..0718419 100644
--- a/cqrs-kafka/src/outbound.rs
+++ b/cqrs-kafka/src/outbound.rs
@@ -40,16 +40,16 @@ fn create_producer(bootstrap_server: String) -> Result
 Result, KafkaError> {
     let mut config = ClientConfig::new();
     config.set("bootstrap.servers", bootstrap_server);
-    // config.set_log_level(RDKafkaLogLevel::Debug);
+    config.set_log_level(RDKafkaLogLevel::Debug);
     config.create_with_context(ProducerCallbackLogger {})
 }
diff --git a/cqrs-library/Cargo.toml b/cqrs-library/Cargo.toml
index e9972f9..06efc7d 100644
--- a/cqrs-library/Cargo.toml
+++ b/cqrs-library/Cargo.toml
@@ -17,8 +17,10 @@
 lazy_static = "1.4.0"
 dashmap = "5.3.3"
 log = "0.4.17"
 typetag = "0.2"
-tokio = "1.35.0"
+tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
 config = "0.13.4"
+env_logger = "0.10.1"
 [build-dependencies]
-prost-build = "0.9"
+prost-build = "0.12.3"
+
diff --git a/cqrs-library/src/lib.rs b/cqrs-library/src/lib.rs
index 2177a00..4a578cc 100644
--- a/cqrs-library/src/lib.rs
+++ b/cqrs-library/src/lib.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::error::Error;
 use std::fmt::Debug;
 use std::io::Cursor;
 use std::ops::Deref;
@@ -47,7 +46,7 @@ impl<'a> CommandStore {
 
 pub struct EventProducerImpl {
     service_id: String,
-    event_channel: Box
+    event_channel: Box
 }
 
 pub trait EventProducer {
@@ -64,7 +63,7 @@ impl EventProducer for EventProducerImpl {
 
 impl<'e> EventProducerImpl {
 
-    pub fn new(service_id: &str, event_channel: Box) -> EventProducerImpl {
+    pub fn new(service_id: &str, event_channel: Box) -> EventProducerImpl {
         EventProducerImpl { service_id: String::from(service_id), event_channel }
     }
 
@@ -134,11 +133,11 @@ pub struct CommandServiceClient {
     command_channel: Box<dyn OutboundChannel>,
     pending_responses_senders: Arc<Mutex<HashMap<String, Sender<CommandResponse>>>>,
     pending_responses_receiver: Mutex<HashMap<String, Receiver<CommandResponse>>>,
-    running: Arc<Mutex<bool>>,
+    // running: Arc<Mutex<bool>>,
     channel_builder: InboundChannelBuilder
 }
 
-type InboundChannelBuilder = Arc<dyn Fn(Config) -> Box<dyn InboundChannel> + Sync + Send>;
+type InboundChannelBuilder = Arc<Mutex<Option<Box<dyn FnOnce(Config) -> Box<dyn InboundChannel> + Sync + Send>>>>;
 pub struct EventListener {
     handlers: HashMap>
 }
@@ -204,31 +203,34 @@ impl<'a, T: InboundChannel + Send + Sync + 'static> CommandServiceClient {
             command_channel,
             pending_responses_senders: Arc::new(Mutex::new(HashMap::new())),
             pending_responses_receiver: Mutex::new(HashMap::new()),
-            running: Arc::new(Mutex::new(false)),
+            // running: Arc::new(Mutex::new(false)),
             channel_builder
         }
     }
 
     pub fn start(&mut self, settings: Config) -> JoinHandle<()> {
         // Clone Arc handles to move into the async block
-        let running = self.running.clone();
+        // let running = self.running.clone();
         let pending_responses_senders = self.pending_responses_senders.clone();
         let channel_reader = self.channel_builder.clone();
 
         return tokio::task::spawn(async move {
-            let mut channel = channel_reader(settings.clone());
-            loop {
-                let response = CommandServiceClient::read_response(&mut channel);
-                match response {
-                    None => {
-                        info!("No result");
-                    }
-                    Some(result) => {
-                        if let Some(sender) = pending_responses_senders.lock().unwrap().remove(result.1.as_str()) {
-                            debug!("Received response for {}: {}", result.1.as_str(), result.0);
-                            sender.send(result.0).expect("Command could not be delivered");
-                        } else {
-                            error!("Sender not found");
+            let mut guard = channel_reader.lock().unwrap();
+            if let Some(func) = guard.take() {
+                let mut channel = func(settings);
+                loop {
+                    let response = CommandServiceClient::read_response(&mut channel);
+                    match response {
+                        None => {
+                            info!("No result");
+                        }
+                        Some(result) => {
+                            if let Some(sender) = pending_responses_senders.lock().unwrap().remove(result.1.as_str()) {
+                                debug!("Received response for {}: {}", result.1.as_str(), result.0);
+                                sender.send(result.0).expect("Command could not be delivered");
+                            } else {
+                                error!("Sender not found");
+                            }
                         }
                     }
                 }
@@ -255,7 +257,7 @@ impl<'a, T: InboundChannel + Send + Sync + 'static> CommandServiceClient {
         command_channel.send(command.get_subject().as_bytes().to_vec(),serialized_command);
     }
 
-    fn read_response(mut command_response_channel: &mut Box<dyn InboundChannel>) -> Option<(CommandResponse, String)> {
+    fn read_response(command_response_channel: &mut Box<dyn InboundChannel>) -> Option<(CommandResponse, String)> {
         let serialized_message = command_response_channel.consume();
         return match serialized_message {
             None => {
@@ -320,7 +322,7 @@ impl<'a> CommandServiceServer<'a> {
         }
     }
 
-    pub fn consume_async(&mut self, message: &mut Vec<u8>, command_response_channel: &mut dyn OutboundChannel) {
+    pub fn handle_message(&mut self, message: &mut Vec<u8>, command_response_channel: &mut dyn OutboundChannel) {
         let command_response = handle_command(&message, &self.command_store, &mut self.event_producer);
         match command_response {
             None => {
@@ -440,11 +442,25 @@ struct CommandResponseResult {
 
 
 #[cfg(test)]
 mod tests {
-
-    use crate::{CommandAccessor, CommandStore, CommandResponse, CommandServiceClient, OutboundChannel, InboundChannel, CommandServiceServer, Command, EventProducer, Event};
-    use serde::{Serialize, Deserialize, Deserializer, Serializer};
+    use std::env;
+    use std::sync::{Arc, Mutex};
+    use config::Config;
+    use crate::{CommandAccessor,
+                CommandStore,
+                CommandResponse,
+                CommandServiceClient,
+                OutboundChannel,
+                InboundChannel,
+                CommandServiceServer,
+                Command,
+                EventProducer,
+                Event,
+                EventProducerImpl};
+    use serde::{Serialize, Deserialize};
     use log::{debug};
+    use tokio::sync::oneshot;
+    use tokio::sync::oneshot::{Receiver, Sender};
 
 
     #[derive(Debug, Deserialize, Serialize)]
@@ -469,7 +485,7 @@ mod tests {
     }
 
     #[typetag::serde]
-    impl Event<'_> for UserCreatedEvent {
+    impl Event for UserCreatedEvent {
         fn get_id(&self) -> String {
             self.user_id.to_owned()
         }
@@ -483,13 +499,45 @@ mod tests {
         messages: Vec<Vec<u8>>
     }
 
-    impl OutboundChannel for CapturingChannel {
-        fn send(&mut self, _key: Vec<u8>, command: Vec<u8>) {
-            debug!("Adding message");
-            self.messages.push(command);
+
+    struct TokioOutboundChannel {
+        sender: Option<Sender<Vec<u8>>>
+    }
+
+    impl TokioOutboundChannel {
+        fn new(sender: Sender<Vec<u8>>) -> Self {
+            return TokioOutboundChannel {
+                sender: Some(sender)
+            }
+        }
+    }
+
+    impl OutboundChannel for TokioOutboundChannel {
+        fn send(&mut self, _key: Vec<u8>, message: Vec<u8>) {
+            if let Some(sender) = self.sender.take() {
+                sender.send(message).expect("Message sending failed");
+            }
+        }
+    }
+
+    struct TokioInboundChannel {
+        receiver: Option<Receiver<Vec<u8>>>
+    }
+
+    impl InboundChannel for TokioInboundChannel {
+        fn consume(&mut self) -> Option<Vec<u8>> {
+            if let Some(receiver) = self.receiver.take() {
+                match receiver.blocking_recv() {
+                    Ok(k) => Some(k),
+                    Err(_) => None
+                }
+            } else {
+                None
+            }
         }
     }
+
 
     impl InboundChannel for CapturingChannel {
         fn consume(&mut self) -> Option<Vec<u8>> {
             debug!("Removing message");
@@ -542,35 +590,62 @@ mod tests {
         CommandResponse::Ok
     }
 
-    #[test]
-    fn test_serialize_command_response() {
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_serialize_command_response() {
         let command = TestCreateUserCommand {
             user_id: String::from("user_id"),
             name: String::from("user_name")
         };
 
-        let mut command_store = CommandStore::new("COMMAND-SERVER");
-        command_store.register_handler("CreateUserCommand", verify_handle_create_user);
+        if env::var("RUST_LOG").is_err() {
+            env::set_var("RUST_LOG", "debug")
+        }
 
-        let mut command_channel = CapturingChannel { messages: Vec::new() };
-        let mut command_response_channel = CapturingChannel { messages: Vec::new() };
-        let event_channel = CapturingChannel { messages: Vec::new() };
+        env_logger::init();
+
+        let (tx_command, rx_command) : (Sender<Vec<u8>>, Receiver<Vec<u8>>) = oneshot::channel();
+        let (tx_command_response, rx_command_response) : (Sender<Vec<u8>>, Receiver<Vec<u8>>) = oneshot::channel();
+        let (tx_event, _rx_event) : (Sender<Vec<u8>>, Receiver<Vec<u8>>) = oneshot::channel();
 
-        let event_producer = EventProducer::new(&event_channel, "COMMAND-SERVER");
-        let mut command_service_client = CommandServiceClient::new(
-            "COMMAND-SERVERIMPORT", || {
-                return Box::new(command_response_channel);
-            },
-            Box::new(command_channel)
-        );
-        let mut command_service_server = CommandServiceServer::new(&command_store, &event_producer);
-        command_service_client.send_command(&command);
+        tokio::task::spawn(async move {
+            let mut command_store = CommandStore::new("COMMAND-SERVER");
+            command_store.register_handler("CreateUserCommand", verify_handle_create_user);
 
-        command_service_server.consume(&mut command_channel, &mut command_response_channel);
-        let command_response = command_service_client.read_response();
+            let mut event_producer = EventProducerImpl::new("COMMAND-SERVER", Box::new(TokioOutboundChannel::new(tx_event)));
+            let mut command_service_server = CommandServiceServer::new(&command_store, &mut event_producer);
 
-        assert_eq!(command_response, CommandResponse::Ok);
+            match rx_command.await {
+                Ok(mut k) => {
+                    let mut channel1 = TokioOutboundChannel {
+                        sender: Some(tx_command_response)
+                    };
+                    command_service_server.handle_message(&mut k, &mut channel1)
+                }
+                Err(_) => {
+                    assert!(false);
+                }
+            }
+        });
+
+        let mut command_service_client = CommandServiceClient::new(
+            "COMMAND-CLIENT", Arc::new(Mutex::new(Some(Box::new(|_config| {
+                return Box::new(TokioInboundChannel {
+                    receiver: Some(rx_command_response)
+                });
+            })))),
+            Box::new(TokioOutboundChannel::new(tx_command))
+        );
+        let config = Config::builder().set_default("a", "b").unwrap().build();
+        command_service_client.start(config.unwrap());
+        match command_service_client.send_command(&command).await {
+            Ok(response) => {
+                assert_eq!(response, CommandResponse::Ok)
+            }
+            Err(_) => assert!(false)
+        };
     }
 }
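
Note (not part of the patch): the central change above replaces the client's shared channel-builder closure with a take-once builder behind Arc<Mutex<Option<Box<...>>>>, which CommandServiceClient::start locks, take()s and invokes a single time. The standalone sketch below only illustrates that pattern under stated assumptions: it uses an invented ChannelBuilder alias with a plain String in place of config::Config, a toy InboundChannel trait and DummyChannel type instead of the cqrs-library ones, and a plain thread instead of a tokio task.

use std::sync::{Arc, Mutex};
use std::thread;

// Toy stand-ins for the real types (cqrs_library::InboundChannel, config::Config).
trait InboundChannel {
    fn consume(&mut self) -> Option<Vec<u8>>;
}

struct DummyChannel;

impl InboundChannel for DummyChannel {
    fn consume(&mut self) -> Option<Vec<u8>> {
        None
    }
}

// Same shape as the patched builder alias: the Mutex<Option<..>> lets a
// background worker take ownership of the boxed FnOnce and call it exactly once.
type ChannelBuilder =
    Arc<Mutex<Option<Box<dyn FnOnce(String) -> Box<dyn InboundChannel> + Send>>>>;

fn main() {
    let builder: ChannelBuilder = Arc::new(Mutex::new(Some(Box::new(
        |_settings: String| Box::new(DummyChannel) as Box<dyn InboundChannel>,
    ))));

    let worker = {
        let builder = builder.clone();
        thread::spawn(move || {
            // take() empties the Option, so the builder can only ever run once.
            if let Some(build) = builder.lock().unwrap().take() {
                let mut channel = build(String::from("settings"));
                let _ = channel.consume();
            }
        })
    };
    worker.join().unwrap();

    // A second start would find the Option empty instead of re-building the channel.
    assert!(builder.lock().unwrap().is_none());
}

The Option is what makes an FnOnce usable here: take() moves the boxed closure out from under the lock, so a repeated start() simply finds None rather than trying to call a consumed closure.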