Add concurrent verb execution load tests
Spams a running server with brute-force verb executions in an attempt to measure throughput.
rdaum committed Dec 7, 2024
1 parent 8b18963 commit f447cfb
Showing 5 changed files with 424 additions and 45 deletions.
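The verb-dispatch load test binary itself (the fifth changed file, presumably src/verb-dispatch-load-test.rs per the Cargo.toml change below) does not render in this view. As a rough illustration of the brute-force approach the commit message describes, the sketch below spawns many concurrent tokio workers, each issuing verb executions in a loop, and reports throughput. It is illustrative only: `dispatch_verb` is a hypothetical stub standing in for the real rpc-async-client round trip to the daemon, and none of this is the code added by the commit.

```rust
// Illustrative sketch only (assumed names): spawn many concurrent workers,
// each issuing verb executions in a loop, and report executions per second.
// `dispatch_verb` is a hypothetical stand-in for the real RPC round trip.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

async fn dispatch_verb(_request_id: u64) {
    // Stand-in for a round-trip verb invocation against the running daemon.
    tokio::task::yield_now().await;
}

#[tokio::main]
async fn main() {
    let completed = Arc::new(AtomicU64::new(0));
    let start = Instant::now();
    let workers: Vec<_> = (0..32u64)
        .map(|w| {
            let completed = completed.clone();
            tokio::spawn(async move {
                for i in 0..1_000u64 {
                    dispatch_verb(w * 1_000 + i).await;
                    completed.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.await.expect("worker task panicked");
    }
    let elapsed = start.elapsed().as_secs_f64();
    let total = completed.load(Ordering::Relaxed);
    println!(
        "{} verb executions in {:.2}s ({:.0}/s)",
        total,
        elapsed,
        total as f64 / elapsed
    );
}
```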
1 change: 1 addition & 0 deletions bacon.toml
@@ -11,6 +11,7 @@ default_job = "daemon"
command = [
"cargo",
"run",
"--release",
"-p",
"moor-daemon",
"--",
4 changes: 4 additions & 0 deletions crates/testing/load-tools/Cargo.toml
@@ -15,6 +15,10 @@ description = "Load testing and transaction model checking"
name = "moor-model-checker"
path = "src/tx-list-append.rs"

[[bin]]
name = "load-test"
path = "src/verb-dispatch-load-test.rs"

[dependencies]
moor-values = { path = "../../common" }
rpc-async-client = { path = "../../rpc/rpc-async-client" }
60 changes: 52 additions & 8 deletions crates/testing/load-tools/src/setup.rs
@@ -12,24 +12,26 @@
// this program. If not, see <https://www.gnu.org/licenses/>.
//

use eyre::bail;
use eyre::{anyhow, bail};
use moor_values::model::ObjectRef;
use moor_values::tasks::VerbProgramError;
use moor_values::{Obj, Symbol, SYSTEM_OBJECT};
use rpc_async_client::pubsub_client::broadcast_recv;
use moor_values::{Obj, Symbol, Var, SYSTEM_OBJECT};
use rpc_async_client::pubsub_client::{broadcast_recv, events_recv};
use rpc_async_client::rpc_client::RpcSendClient;
use rpc_async_client::{ListenersClient, ListenersMessage};
use rpc_common::HostClientToDaemonMessage::ConnectionEstablish;
use rpc_common::{
AuthToken, ClientToken, ClientsBroadcastEvent, DaemonToClientReply, HostClientToDaemonMessage,
HostType, ReplyResult, VerbProgramResponse, CLIENT_BROADCAST_TOPIC,
AuthToken, ClientEvent, ClientToken, ClientsBroadcastEvent, DaemonToClientReply,
HostClientToDaemonMessage, HostType, ReplyResult, VerbProgramResponse, CLIENT_BROADCAST_TOPIC,
};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Instant, SystemTime};
use tmq::subscribe::Subscribe;
use tmq::{request, subscribe};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use uuid::Uuid;
@@ -301,6 +303,7 @@ pub async fn initialization_session(
info!("Initialization script executed successfully");

for (verb_name, verb_code) in verbs {
info!("Compiling {} verb", verb_name);
compile(
&mut rpc_client,
client_id,
@@ -321,6 +324,47 @@
}

pub struct ExecutionContext {
pub(crate) zmq_ctx: tmq::Context,
pub(crate) kill_switch: Arc<std::sync::atomic::AtomicBool>,
pub zmq_ctx: tmq::Context,
pub kill_switch: Arc<std::sync::atomic::AtomicBool>,
}

pub async fn listen_responses(
client_id: Uuid,
mut events_sub: Subscribe,
ks: Arc<AtomicBool>,
event_listen_task_results: Arc<Mutex<HashMap<usize, Result<Var, eyre::Error>>>>,
) {
tokio::spawn(async move {
let start_time = Instant::now();
info!("Waiting for events...");
loop {
if ks.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let msg = events_recv(client_id, &mut events_sub).await;
match msg {
Ok(ClientEvent::TaskSuccess(tid, v)) => {
let mut tasks = event_listen_task_results.lock().await;
tasks.insert(tid, Ok(v));
}
Ok(ClientEvent::TaskError(tid, e)) => {
let mut tasks = event_listen_task_results.lock().await;
tasks.insert(tid, Err(anyhow!("Task error: {:?}", e)));
}
Ok(_) => {}
Err(e) => {
panic!("Error in event recv: {}", e);
}
}
}
let seconds_since_start = start_time.elapsed().as_secs();
if seconds_since_start % 5 == 0 {
let tasks = event_listen_task_results.lock().await;
info!(
"Event listener running for {} seconds with {} tasks",
seconds_since_start,
tasks.len()
);
}
});
}
47 changes: 10 additions & 37 deletions crates/testing/load-tools/src/tx-list-append.rs
@@ -20,7 +20,9 @@
mod setup;

use crate::setup::{broadcast_handle, create_user_session, initialization_session};
use crate::setup::{
broadcast_handle, create_user_session, initialization_session, listen_responses,
};
use clap::Parser;
use clap_derive::Parser;
use edn_format::{Keyword, Value};
@@ -423,42 +425,13 @@ async fn list_append_workload(
.await?;

let task_results = Arc::new(Mutex::new(HashMap::new()));

let event_listen_task_results = task_results.clone();
let ks = kill_switch.clone();
tokio::spawn(async move {
let start_time = Instant::now();
info!("Waiting for events...");
loop {
if ks.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let msg = events_recv(client_id, &mut events_sub).await;
match msg {
Ok(ClientEvent::TaskSuccess(tid, v)) => {
let mut tasks = event_listen_task_results.lock().await;
tasks.insert(tid, Ok(v));
}
Ok(ClientEvent::TaskError(tid, e)) => {
let mut tasks = event_listen_task_results.lock().await;
tasks.insert(tid, Err(anyhow!("Task error: {:?}", e)));
}
Ok(_) => {}
Err(e) => {
panic!("Error in event recv: {}", e);
}
}
}
let seconds_since_start = start_time.elapsed().as_secs();
if seconds_since_start % 5 == 0 {
let tasks = event_listen_task_results.lock().await;
info!(
"Event listener running for {} seconds with {} tasks",
seconds_since_start,
tasks.len()
);
}
});
listen_responses(
client_id,
events_sub,
kill_switch.clone(),
task_results.clone(),
)
.await;

info!("Starting {} workloads", args.num_concurrent_workloads);
let mut workload_futures = FuturesUnordered::new();
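The driver above creates a FuturesUnordered for its concurrent workloads; the elided portion of the diff presumably pushes one future per workload and polls them to completion. A minimal, self-contained sketch of that pattern (with a trivial stand-in workload, not the real list-append workload) might look like this:

```rust
// Sketch of the FuturesUnordered pattern (assumed shape): push one future per
// workload, then poll them to completion in whatever order they finish.
// `run_workload` is a trivial stand-in for the actual workload logic.
use futures::stream::{FuturesUnordered, StreamExt};

async fn run_workload(id: usize) -> usize {
    // In the real tool each workload issues many verb executions here.
    id
}

#[tokio::main]
async fn main() {
    let mut workload_futures = FuturesUnordered::new();
    for i in 0..8usize {
        workload_futures.push(run_workload(i));
    }
    // Results arrive in completion order, not submission order.
    while let Some(finished) = workload_futures.next().await {
        println!("workload {} finished", finished);
    }
}
```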
(Diff for the fifth changed file not shown.)
