
Commit

Merge pull request #2 from phil3k3/fix_response
Fix test output
phil3k3 authored Dec 18, 2023
2 parents ece68bc + b5a64ce commit 0f0d956
Showing 9 changed files with 147 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cqrs-example-client/src/main.rs
@@ -126,7 +126,7 @@ async fn main() -> io::Result<()> {
client: Mutex::new(
CommandServiceClient::new(
&settings2.get_string("service_id").unwrap(),
- Arc::new(create_channel),
+ Arc::new(Mutex::new(Some(Box::new(create_channel)))),
Box::new(kafka_command_channel)
)
)
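
For context, the new line hands CommandServiceClient::new its channel factory wrapped as Arc<Mutex<Option<Box<...>>>>. A minimal standalone sketch of that pattern, using a hypothetical factory type rather than the crate's real one — the Option lets the boxed closure be shared between threads and then moved out exactly once:

    use std::sync::{Arc, Mutex};

    // Hypothetical stand-in for the real channel-factory type.
    type ChannelFactory = Box<dyn FnOnce() -> String + Send>;

    fn main() {
        let create_channel: ChannelFactory = Box::new(|| "channel".to_string());
        // Shared, thread-safe slot holding the factory until it is needed.
        let shared: Arc<Mutex<Option<ChannelFactory>>> =
            Arc::new(Mutex::new(Some(create_channel)));

        // Take the factory out of the slot (only the first caller gets it) and run it once.
        if let Some(factory) = shared.lock().unwrap().take() {
            let channel = factory();
            println!("created {channel}");
        }
    }
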
2 changes: 1 addition & 1 deletion cqrs-example-server/Cargo.toml
@@ -12,7 +12,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
log = "0.4.8"
env_logger = "0.10.0"
- tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time"] }
+ tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
typetag = "0.2"

config = "0.13.3"
2 changes: 1 addition & 1 deletion cqrs-example-server/src/main.rs
@@ -100,7 +100,7 @@ async fn main() {
debug!("No message!");
}
Some(mut message) => {
- command_service_server.consume_async(&mut message, &mut command_response_channel);
+ command_service_server.handle_message(&mut message, &mut command_response_channel);
}
}
}
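
For context, the renamed call sits inside the server's polling loop. A rough sketch of that loop shape; the server and channel types here are stand-ins, not the crate's actual API:

    // Stand-in types; only the call shape matches the diff.
    struct CommandServiceServer;
    struct CommandResponseChannel;

    impl CommandServiceServer {
        fn handle_message(&self, message: &mut String, _response_channel: &mut CommandResponseChannel) {
            println!("handling {message}");
        }
    }

    fn poll_once(
        server: &CommandServiceServer,
        response_channel: &mut CommandResponseChannel,
        next_message: Option<String>,
    ) {
        match next_message {
            None => {
                // Nothing consumed this round; the real code logs "No message!".
            }
            Some(mut message) => {
                server.handle_message(&mut message, response_channel);
            }
        }
    }

    fn main() {
        let server = CommandServiceServer;
        let mut channel = CommandResponseChannel;
        poll_once(&server, &mut channel, Some("test-command".to_string()));
        poll_once(&server, &mut channel, None);
    }
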
4 changes: 2 additions & 2 deletions cqrs-kafka/Cargo.toml
@@ -12,8 +12,8 @@ log = "0.4.8"
blockingqueue = "0.1.1"
futures = "0.3.28"


[dev-dependencies]
testcontainers = "0.15.0"
+ testcontainers-modules = { version = "0.2.0" , features = ["kafka"]}
env_logger = "0.10.0"
- tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time"] }
+ tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
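
For context, the added "sync" feature gates tokio's synchronization primitives (tokio::sync), which are not pulled in by the runtime features alone. A minimal sketch, assuming the tests need something like a oneshot channel on the multi-threaded runtime:

    use tokio::sync::oneshot; // requires the "sync" feature

    #[tokio::main(flavor = "multi_thread")]
    async fn main() {
        let (tx, rx) = oneshot::channel::<String>();
        tokio::spawn(async move {
            let _ = tx.send("reply".to_string());
        });
        // Await the reply sent from the spawned task.
        println!("{}", rx.await.unwrap());
    }
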
12 changes: 6 additions & 6 deletions cqrs-kafka/src/inbound.rs
@@ -51,11 +51,11 @@ impl KafkaInboundChannel {
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("isolation.level", "read_uncommitted")
.set("auto.offset.reset", "earliest");
// .set("debug", "consumer,cgrp,topic,fetch");
.set("auto.offset.reset", "earliest")
.set("debug", "consumer,cgrp,topic,fetch");

// all nodes of the same service are in a group and will get some partitions assigned
- // config.set_log_level(RDKafkaLogLevel::Debug);
+ config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(CustomContext {})
}
}
@@ -77,11 +77,11 @@ impl StreamKafkaInboundChannel {
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("isolation.level", "read_uncommitted")
.set("auto.offset.reset", "earliest");
// .set("debug", "consumer,cgrp,topic,fetch");
.set("auto.offset.reset", "earliest")
.set("debug", "consumer,cgrp,topic,fetch");

// all nodes of the same service are in a group and will get some partitions assigned
- // config.set_log_level(RDKafkaLogLevel::Debug);
+ config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(CustomContext {})
}

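
For context, librdkafka's "debug" property selects which contexts are traced and set_log_level controls client-log verbosity; after this change both are enabled unconditionally. A condensed sketch of a consumer configured this way (bootstrap address and group id are placeholders); the traces are routed through the log crate, so a logger such as env_logger still has to be initialised at debug level for them to show up:

    use rdkafka::ClientConfig;
    use rdkafka::config::RDKafkaLogLevel;
    use rdkafka::consumer::BaseConsumer;
    use rdkafka::error::KafkaError;

    fn create_debug_consumer() -> Result<BaseConsumer, KafkaError> {
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", "127.0.0.1:9092") // placeholder
            .set("group.id", "example-group")           // placeholder
            .set("enable.auto.commit", "true")
            .set("auto.offset.reset", "earliest")
            // Trace the consumer, consumer-group, topic and fetch contexts.
            .set("debug", "consumer,cgrp,topic,fetch");
        config.set_log_level(RDKafkaLogLevel::Debug);
        config.create()
    }

    fn main() {
        match create_debug_consumer() {
            Ok(_consumer) => println!("consumer created"),
            Err(e) => eprintln!("failed to create consumer: {e}"),
        }
    }
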
8 changes: 4 additions & 4 deletions cqrs-kafka/src/lib.rs
@@ -6,9 +6,9 @@ mod tests {
use std::sync::mpsc::channel;
use std::{thread};
use std::thread::JoinHandle;
- use futures::SinkExt;
use log::info;
- use testcontainers::{clients, images::kafka};
+ use testcontainers::clients;
+ use testcontainers_modules::kafka::{Kafka, KAFKA_PORT};
use cqrs_library::{InboundChannel, OutboundChannel};
use crate::inbound::KafkaInboundChannel;
use crate::outbound::KafkaOutboundChannel;
@@ -34,13 +34,13 @@ mod tests {
info!("Starting transmission test via Kafka");

let docker = clients::Cli::default();
- let kafka_node = docker.run(kafka::Kafka::default());
+ let kafka_node = docker.run(Kafka::default());

info!("Started Kafka container");

let bootstrap_servers = format!(
"127.0.0.1:{}",
- kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT)
+ kafka_node.get_host_port_ipv4(KAFKA_PORT)
);

info!("{}", bootstrap_servers);
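
For context, the Kafka image and its port constant now come from testcontainers-modules, while the Docker client still comes from testcontainers. A sketch condensed from the updated test showing how the broker address is derived:

    use testcontainers::clients;
    use testcontainers_modules::kafka::{Kafka, KAFKA_PORT};

    fn main() {
        // Start a throwaway Kafka broker in Docker for the test.
        let docker = clients::Cli::default();
        let kafka_node = docker.run(Kafka::default());

        // The mapped host port gives the bootstrap address for producers and consumers.
        let bootstrap_servers = format!("127.0.0.1:{}", kafka_node.get_host_port_ipv4(KAFKA_PORT));
        println!("{bootstrap_servers}");
    }
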
8 changes: 4 additions & 4 deletions cqrs-kafka/src/outbound.rs
@@ -40,16 +40,16 @@ fn create_producer(bootstrap_server: String) -> Result<ThreadedProducer<Producer
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", bootstrap_server)
.set("message.timeout.ms", "5000");
// .set("debug", "broker,topic,msg");
// config.set_log_level(RDKafkaLogLevel::Debug);
.set("message.timeout.ms", "5000")
.set("debug", "broker,topic,msg");
config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(ProducerCallbackLogger {})
}

fn create_admin_client(bootstrap_server: String) -> Result<AdminClient<ProducerCallbackLogger>, KafkaError> {
let mut config = ClientConfig::new();
config.set("bootstrap.servers", bootstrap_server);
- // config.set_log_level(RDKafkaLogLevel::Debug);
+ config.set_log_level(RDKafkaLogLevel::Debug);
config.create_with_context(ProducerCallbackLogger {})
}

6 changes: 4 additions & 2 deletions cqrs-library/Cargo.toml
@@ -17,8 +17,10 @@ lazy_static = "1.4.0"
dashmap = "5.3.3"
log = "0.4.17"
typetag = "0.2"
tokio = "1.35.0"
tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
config = "0.13.4"
env_logger = "0.10.1"

[build-dependencies]
prost-build = "0.9"
prost-build = "0.12.3"

