Skip to content

Commit

Permalink
Merge pull request #1 from phil3k3/fix_response
Browse files Browse the repository at this point in the history
Refactor command response to be handled with tokio::oneshot
  • Loading branch information
phil3k3 authored Dec 14, 2023
2 parents 6f5dbc4 + 590bdb1 commit ece68bc
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 108 deletions.
2 changes: 1 addition & 1 deletion cqrs-example-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ actix-web = "4"
log = "0.4.8"
env_logger = "0.10.0"
tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
uuid = { version = "0.8.2", features = ["v4"] }
uuid = { version = "1.6.1", features = ["v4"] }
4 changes: 2 additions & 2 deletions cqrs-example-client/src/Settings.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
service_id = "CQRS-EXAMPLE-CLIENT"
command_topic = "TEST"
service_subscriptions = "TEST"
service_subscriptions = "TEST_EVENTS"
response_topic = "TEST_RESPONSE"
bootstrap_server = "127.0.0.1:9092"
log_level = "debug"
log_level = "info"
56 changes: 30 additions & 26 deletions cqrs-example-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::{env, io};
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use config::Config;
use cqrs_library::{CommandServiceClient, Command, EventListener, Event};
use cqrs_library::{CommandServiceClient, Command, EventListener, Event, CommandResponse};
use cqrs_kafka::inbound::{KafkaInboundChannel, StreamKafkaInboundChannel};
use cqrs_kafka::outbound::KafkaOutboundChannel;
use serde::{Deserialize, Serialize};
use actix_web::{App, error, HttpResponse, HttpServer, post, Responder, web};
use actix_web::http::header::ContentType;
use actix_web::{App, HttpResponse, HttpServer, post, Responder, web};
use log::info;
use uuid::Uuid;

Expand All @@ -17,7 +16,7 @@ struct CreateUserCommand {
}

struct AppState {
client: Mutex<CommandServiceClient>,
client: Mutex<CommandServiceClient<KafkaInboundChannel>>,
}

impl Command<'_> for CreateUserCommand {
Expand All @@ -32,7 +31,7 @@ impl Command<'_> for CreateUserCommand {
#[derive(Debug, Deserialize, Serialize)]
struct UserCreatedEvent {
user_id: String,
user_name: String,
name: String,
}

#[typetag::serde]
Expand All @@ -47,30 +46,38 @@ impl Event for UserCreatedEvent {
}

fn handle_event(event: &dyn Event) {
dbg!(event);
print!("user created received");
info!("{:?}", event);
}

#[post("/users/{user_id}")]
async fn post_user(
command_service_client: web::Data<AppState>,
user_id: web::Path<String>) -> impl Responder {
print!("Creating user {}", user_id);
info!("Creating user {}", user_id);
let command = CreateUserCommand {
user_id: Uuid::new_v4().to_string(),
name: String::from(user_id.into_inner()),
};
match command_service_client.client.lock().unwrap().send_command(&command).await {
Ok(result) => {
if result == CommandResponse::Ok {
HttpResponse::Ok().body(command.user_id)
} else {
HttpResponse::InternalServerError().body("Failed to process command, check server logs")
}
}
Err(_) => {
HttpResponse::InternalServerError().body("Failed to send command")
}
}
}

let result = web::block(move || {
command_service_client.client
.lock()
.unwrap()
.send_command(&command);
}).await.map_err(error::ErrorInternalServerError);

HttpResponse::Ok()
.content_type(ContentType::plaintext())
.body(result.unwrap())
fn create_channel(settings: Config) -> Box<KafkaInboundChannel> {
return Box::new(KafkaInboundChannel::new(
&settings.get_string("service_id").unwrap(),
&[&settings.get_string("response_topic").unwrap()],
&settings.get_string("bootstrap_server").unwrap(),
))
}

#[tokio::main]
Expand Down Expand Up @@ -114,26 +121,23 @@ async fn main() -> io::Result<()> {
&settings2.get_string("bootstrap_server").unwrap(),
);

let kafka_command_response_channel = KafkaInboundChannel::new(
&settings2.get_string("service_id").unwrap(),
&[&settings2.get_string("response_topic").unwrap()],
&settings2.get_string("bootstrap_server").unwrap(),
);
let command_service_client_data = web::Data::new(
AppState {
client: Mutex::new(
CommandServiceClient::new(
&settings2.get_string("service_id").unwrap(),
Box::new(kafka_command_channel),
Box::new(kafka_command_response_channel),
Arc::new(create_channel),
Box::new(kafka_command_channel)
)
)
}
);
command_service_client_data.client.lock().unwrap().start(settings2);
App::new()
.app_data(command_service_client_data.clone())
.service(post_user)
}).bind(("127.0.0.1", 8080))?
.run()
.await

}
4 changes: 2 additions & 2 deletions cqrs-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ edition = "2021"

[dependencies]
cqrs-library = { path = "../cqrs-library" }
rdkafka = { version = "0.36.0", features = ["cmake-build"] }
rdkafka = { version = "0.35.0", features = ["cmake-build"] }
log = "0.4.8"
blockingqueue = "0.1.1"
futures = "0.3.28"


[dev-dependencies]
testcontainers = "0.14.0"
testcontainers = "0.15.0"
env_logger = "0.10.0"
tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
60 changes: 26 additions & 34 deletions cqrs-kafka/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ type LoggingConsumer = BaseConsumer<CustomContext>;
type LoggingStreamingConsumer = StreamConsumer<CustomContext>;

pub struct KafkaInboundChannel {
consumer: BaseConsumer<CustomContext>
consumer: BaseConsumer<CustomContext>,
}

pub struct StreamKafkaInboundChannel {
consumer: StreamConsumer<CustomContext>
consumer: StreamConsumer<CustomContext>,
}

impl KafkaInboundChannel {
pub fn new(service_id: &str,topics: &[&str], bootstrap_server: &str) -> KafkaInboundChannel {
pub fn new(service_id: &str, topics: &[&str], bootstrap_server: &str) -> KafkaInboundChannel {
let channel = KafkaInboundChannel {
consumer: KafkaInboundChannel::create_consumer(bootstrap_server.to_string(), service_id.to_string()).unwrap()
};
Expand All @@ -51,11 +51,11 @@ impl KafkaInboundChannel {
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("isolation.level", "read_uncommitted")
.set("auto.offset.reset", "earliest")
.set("debug", "consumer,cgrp,topic,fetch");
.set("auto.offset.reset", "earliest");
// .set("debug", "consumer,cgrp,topic,fetch");

// all nodes of the same service are in a group and will get some partitions assigned
config.set_log_level(RDKafkaLogLevel::Debug);
// config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(CustomContext {})
}
}
Expand All @@ -77,11 +77,11 @@ impl StreamKafkaInboundChannel {
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("isolation.level", "read_uncommitted")
.set("auto.offset.reset", "earliest")
.set("debug", "consumer,cgrp,topic,fetch");
.set("auto.offset.reset", "earliest");
// .set("debug", "consumer,cgrp,topic,fetch");

// all nodes of the same service are in a group and will get some partitions assigned
config.set_log_level(RDKafkaLogLevel::Debug);
// config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(CustomContext {})
}

Expand All @@ -93,28 +93,13 @@ impl StreamKafkaInboundChannel {
}
}

pub async fn consume_async_blocking(&self, event_listener: &EventListener) {

let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "test_group")
.set("bootstrap.servers", "localhost:9092")
.set("enable.auto.commit", "true")
.create()
.expect("Consumer creation failed");
return consumer.stream().try_for_each(|borrowed_message| {
pub async fn consume_async_blocking(&self, event_listener: &EventListener) {
return self.consumer.stream().try_for_each(|borrowed_message| {
async move {
let payload = match borrowed_message.payload_view::<str>() {
Some(Ok(s)) => s,
Some(Err(e)) => {
println!("Error while deserializing message payload: {:?}", e);
""
},
None => "<no payload>",
};
event_listener.consume(borrowed_message.payload().unwrap());

println!("Key: '{:?}', Payload: '{}', Topic: '{}', Partition: {}, Offset: {}",
borrowed_message.key(), payload, borrowed_message.topic(), borrowed_message.partition(), borrowed_message.offset());
let key = String::from_utf8_lossy(borrowed_message.key().unwrap());
println!("Key: '{:?}', Topic: '{}', Partition: {}, Offset: {}",
key, borrowed_message.topic(), borrowed_message.partition(), borrowed_message.offset());
Ok(()) // Important to return Ok(()) for successful processing
}
}).await.expect("Stream processing failed");
Expand All @@ -124,10 +109,17 @@ impl StreamKafkaInboundChannel {
impl InboundChannel for KafkaInboundChannel {
fn consume(&mut self) -> Option<Vec<u8>> {
return self.consumer.poll(Duration::from_secs(1))
.and_then(|x| match x {
Ok(t) => t.payload().and_then(|y| Some(y.to_vec())),
Err(_v) => None
}
);
.and_then(|x| {
match x {
Ok(t) => {
let option = t.payload();
option.and_then(|y| {
Some(y.to_vec())
})
},
Err(_v) => None
}
}
);
}
}
8 changes: 4 additions & 4 deletions cqrs-kafka/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ fn create_producer(bootstrap_server: String) -> Result<ThreadedProducer<Producer
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", bootstrap_server)
.set("message.timeout.ms", "5000")
.set("debug", "broker,topic,msg");
config.set_log_level(RDKafkaLogLevel::Debug);
.set("message.timeout.ms", "5000");
// .set("debug", "broker,topic,msg");
// config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(ProducerCallbackLogger {})
}

fn create_admin_client(bootstrap_server: String) -> Result<AdminClient<ProducerCallbackLogger>, KafkaError> {
let mut config = ClientConfig::new();
config.set("bootstrap.servers", bootstrap_server);
config.set_log_level(RDKafkaLogLevel::Debug);
// config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(ProducerCallbackLogger {})
}

Expand Down
8 changes: 5 additions & 3 deletions cqrs-library/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
prost = "0.9"
prost = "0.12.3"
bytes = "1.1.0"
prost-types = "0.9"
prost-types = "0.12.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "1.6.1", features = ["serde", "v4"] }
chrono = "0.4"
lazy_static = "1.4.0"
dashmap = "5.3.3"
log = "0.4.17"
typetag = "0.2"
tokio = "1.35.0"
config = "0.13.4"

[build-dependencies]
prost-build = "0.9"
Loading

0 comments on commit ece68bc

Please sign in to comment.