improve benchmarking tools (#4450)
rubenfiszel committed Sep 27, 2024
1 parent 3fac66a commit 654c6bf
Showing 12 changed files with 288 additions and 376 deletions.
59 changes: 59 additions & 0 deletions backend/plot.py
@@ -0,0 +1,59 @@
import json
import matplotlib.pyplot as plt

# Function to load JSON data from a file
def load_json_data(filepath):
    with open(filepath, 'r') as file:
        data = json.load(file)
    return data

# Function to plot two arrays of subarrays with tuples (step_name, duration)
def plot_two_arrays_of_subarrays(arrays1, arrays2):
    # Function to calculate sum of durations for each step
    def calculate_sums(arrays):
        steps = [step for step, _ in arrays[0]]  # Extract steps from the first iteration
        sums = {step: 0 for step in steps}  # Initialize sums dictionary with step names

        # Sum up the durations for each step across all subarrays
        for subarray in arrays:
            for step_name, duration in subarray:
                sums[step_name] += duration

        # Convert the sums dictionary to two lists (for plotting)
        step_names = list(sums.keys())
        durations = list(sums.values())
        return step_names, durations

    # Calculate sums for both arrays of subarrays
    step_names1, sums1 = calculate_sums(arrays1)
    step_names2, sums2 = calculate_sums(arrays2)

    # Create two subplots, one on top of the other
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

    # First plot (top) for the first array of subarrays
    ax1.plot(step_names1, sums1, marker='o', linestyle='-', color='b')
    ax1.set_title('Total Duration per Step - Main Loop')
    ax1.set_xlabel('Step Name')
    ax1.set_ylabel('Total Duration')
    ax1.grid(True)

    # Second plot (bottom) for the second array of subarrays
    ax2.plot(step_names2, sums2, marker='o', linestyle='-', color='r')
    ax2.set_title('Total Duration per Step - Result Processor')
    ax2.set_xlabel('Step Name')
    ax2.set_ylabel('Total Duration')
    ax2.grid(True)

    # Adjust layout so the plots don't overlap
    plt.tight_layout()

    # Display the plot
    plt.show()

# Load arrays from the JSON files
arrays1 = load_json_data('/tmp/windmill/profiling_main.json')
arrays2 = load_json_data('/tmp/windmill/profiling_result_processor.json')

# Plot the data
plot_two_arrays_of_subarrays(arrays1, arrays2)
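A note on the expected input, inferred from this script together with the serializer in bench.rs further down: each JSON file holds one array per benchmark iteration, and each iteration is an array of [step_name, duration] pairs with durations in nanoseconds, e.g. [[["pull", 1200], ["execute", 3400]], [["pull", 980], ["execute", 3100]]] (step names here are illustrative). The script assumes every iteration records the same steps as the first one, and it requires matplotlib (pip install matplotlib) to display the two stacked plots.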
2 changes: 1 addition & 1 deletion backend/src/monitor.rs
@@ -525,7 +525,7 @@ fn read_log_counters(ts_str: String) -> (usize, usize) {
             ok_lines = counter.non_error_count;
             err_lines = counter.error_count;
         } else {
-            println!("no counter found for {ts_str}");
+            // println!("no counter found for {ts_str}");
         }
     } else {
         println!("Error reading log counters 2");
1 change: 1 addition & 0 deletions backend/windmill-api/src/flows.rs
@@ -242,6 +242,7 @@ pub async fn get_hub_flow_by_id(
 
 #[derive(Deserialize)]
 pub struct ToggleWorkspaceErrorHandler {
+    #[cfg(feature = "enterprise")]
     pub muted: Option<bool>,
 }
 
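Gating the muted field rather than the whole handler (the same pattern recurs in scripts.rs and workspaces.rs below) keeps the route signatures identical in both editions: with the feature disabled the field does not exist on the struct, and since serde ignores unknown keys by default, a community-edition build simply drops muted from the request body instead of failing to deserialize.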
2 changes: 1 addition & 1 deletion backend/windmill-api/src/jobs.rs
@@ -70,7 +70,7 @@ use windmill_common::{
     oauth2::HmacSha256,
     scripts::{ScriptHash, ScriptLang},
     users::username_to_permissioned_as,
-    utils::{not_found_if_none, now_from_db, paginate, require_admin, Pagination, StripPath},
+    utils::{not_found_if_none, now_from_db, paginate, paginate_without_limits, require_admin, Pagination, StripPath},
 };
 
 #[cfg(all(feature = "enterprise", feature = "parquet"))]
1 change: 1 addition & 0 deletions backend/windmill-api/src/scripts.rs
@@ -967,6 +967,7 @@ async fn list_paths(
 
 #[derive(Deserialize)]
 pub struct ToggleWorkspaceErrorHandler {
+    #[cfg(feature = "enterprise")]
     pub muted: Option<bool>,
 }
 
8 changes: 6 additions & 2 deletions backend/windmill-api/src/workspaces.rs
@@ -42,7 +42,9 @@ use windmill_common::schedule::Schedule;
 use windmill_common::users::username_to_permissioned_as;
 use windmill_common::variables::build_crypt;
 use windmill_common::worker::{to_raw_value, CLOUD_HOSTED};
-use windmill_common::workspaces::{WorkspaceDeploymentUISettings, WorkspaceGitSyncSettings};
+#[cfg(feature = "enterprise")]
+use windmill_common::workspaces::WorkspaceDeploymentUISettings;
+use windmill_common::workspaces::WorkspaceGitSyncSettings;
 use windmill_common::{
     error::{to_anyhow, Error, JsonResult, Result},
     flows::Flow,
@@ -991,6 +993,7 @@ async fn edit_large_file_storage_config(
 
 #[derive(Deserialize)]
 pub struct EditGitSyncConfig {
+    #[cfg(feature = "enterprise")]
     pub git_sync_settings: Option<WorkspaceGitSyncSettings>,
 }
 
@@ -1056,6 +1059,7 @@ async fn edit_git_sync_config(
 
 #[derive(Deserialize)]
 struct EditDeployUIConfig {
+    #[cfg(feature = "enterprise")]
     deploy_ui_settings: Option<WorkspaceDeploymentUISettings>,
 }
 
@@ -1064,7 +1068,6 @@ async fn edit_deploy_ui_config(
     _authed: ApiAuthed,
     Extension(_db): Extension<DB>,
     Path(_w_id): Path<String>,
-    Json(_new_config): Json<EditDeployUIConfig>,
 ) -> Result<String> {
     return Err(Error::BadRequest(
         "Deployment UI is only available on Windmill Enterprise Edition".to_string(),
@@ -1122,6 +1125,7 @@ async fn edit_deploy_ui_config(
 
 #[derive(Deserialize)]
 pub struct EditDefaultApp {
+    #[cfg(feature = "enterprise")]
     pub default_app_path: Option<String>,
 }
 
25 changes: 18 additions & 7 deletions backend/windmill-queue/src/jobs.rs
@@ -37,8 +37,7 @@ use ulid::Ulid;
 use uuid::Uuid;
 use windmill_audit::audit_ee::{audit_log, AuditAuthor};
 use windmill_audit::ActionKind;
-#[cfg(not(feature = "enterprise"))]
-use windmill_common::worker::PriorityTags;
+
 use windmill_common::{
     auth::{fetch_authed_from_permissioned_as, permissioned_as_to_username},
     db::{Authed, UserDB},
@@ -62,9 +61,12 @@ use windmill_common::{
         to_raw_value, DEFAULT_TAGS_PER_WORKSPACE, DEFAULT_TAGS_WORKSPACES, NO_LOGS, WORKER_CONFIG,
         WORKER_PULL_QUERIES, WORKER_SUSPENDED_PULL_QUERY,
     },
-    BASE_URL, DB, METRICS_ENABLED,
+    DB, METRICS_ENABLED,
 };
 
+#[cfg(feature = "enterprise")]
+use windmill_common::BASE_URL;
+
 #[cfg(feature = "cloud")]
 use windmill_common::users::SUPERADMIN_SYNC_EMAIL;
 
@@ -125,10 +127,12 @@ const MAX_FREE_CONCURRENT_RUNS: i32 = 30;
 
 const ERROR_HANDLER_USERNAME: &str = "error_handler";
 const SCHEDULE_ERROR_HANDLER_USERNAME: &str = "schedule_error_handler";
+#[cfg(feature = "enterprise")]
 const SCHEDULE_RECOVERY_HANDLER_USERNAME: &str = "schedule_recovery_handler";
 const ERROR_HANDLER_USER_GROUP: &str = "g/error_handler";
 const ERROR_HANDLER_USER_EMAIL: &str = "[email protected]";
 const SCHEDULE_ERROR_HANDLER_USER_EMAIL: &str = "[email protected]";
+#[cfg(feature = "enterprise")]
 const SCHEDULE_RECOVERY_HANDLER_USER_EMAIL: &str = "[email protected]";
 
 #[derive(Clone, Debug)]
@@ -477,6 +481,7 @@ where
 
 #[derive(Deserialize)]
 struct RawFlowFailureModule {
+    #[cfg(feature = "enterprise")]
     failure_module: Option<Box<RawValue>>,
 }
 
@@ -671,6 +676,7 @@ pub async fn add_completed_job<
         }
     }
     // tracing::error!("Added completed job {:#?}", queued_job);
+    #[cfg(feature = "enterprise")]
    let mut skip_downstream_error_handlers = false;
     tx = delete_job(tx, &queued_job.workspace_id, job_id).await?;
     // tracing::error!("3 {:?}", start.elapsed());
@@ -716,7 +722,10 @@ pub async fn add_completed_job<
     .await?;
 
     if let Some(schedule) = schedule {
-        skip_downstream_error_handlers = schedule.ws_error_handler_muted;
+        #[cfg(feature = "enterprise")]
+        {
+            skip_downstream_error_handlers = schedule.ws_error_handler_muted;
+        }
 
         // script or flow that failed on start and might not have been rescheduled
         let schedule_next_tick = !queued_job.is_flow()
@@ -749,6 +758,7 @@ pub async fn add_completed_job<
             };
         }
 
+        #[cfg(feature = "enterprise")]
         if let Err(err) = apply_schedule_handlers(
             rsmq.clone(),
             db,
@@ -1324,6 +1334,8 @@ struct CompletedJobSubset {
     result: Option<sqlx::types::Json<Box<RawValue>>>,
     started_at: chrono::DateTime<chrono::Utc>,
 }
+
+#[cfg(feature = "enterprise")]
 async fn apply_schedule_handlers<
     'a,
     'c,
@@ -1342,7 +1354,6 @@ async fn apply_schedule_handlers<
     job_priority: Option<i16>,
 ) -> windmill_common::error::Result<()> {
     if !success {
-        #[cfg(feature = "enterprise")]
         if let Some(on_failure_path) = schedule.on_failure.clone() {
             let times = schedule.on_failure_times.unwrap_or(1).max(1);
             let exact = schedule.on_failure_exact.unwrap_or(false);
@@ -1392,7 +1403,6 @@ async fn apply_schedule_handlers<
             .await?;
         }
     } else {
-        #[cfg(feature = "enterprise")]
         if let Some(ref on_success_path) = schedule.on_success {
             handle_successful_schedule(
                 db,
@@ -1410,7 +1420,6 @@ async fn apply_schedule_handlers<
             .await?;
         }
 
-        #[cfg(feature = "enterprise")]
         if let Some(ref on_recovery_path) = schedule.on_recovery.clone() {
             let tx: QueueTransaction<'_, R> = (rsmq.clone(), db.begin().await?).into();
             let times = schedule.on_recovery_times.unwrap_or(1).max(1);
@@ -1579,6 +1588,7 @@ fn sanitize_result<T: Serialize + Send + Sync>(result: Json<&T>) -> HashMap<Stri
 // is_flow: boolean,
 // extra_args: serde_json::Value
 // }
+#[cfg(feature = "enterprise")]
 async fn handle_recovered_schedule<
     'a,
     'c,
@@ -1671,6 +1681,7 @@ async fn handle_recovered_schedule<
     Ok(())
 }
 
+#[cfg(feature = "enterprise")]
 async fn handle_successful_schedule<
     'a,
     'c,
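Taken together, these hunks move schedule error/recovery handling entirely behind the enterprise feature: apply_schedule_handlers, handle_recovered_schedule, handle_successful_schedule and the recovery-handler constants are now compiled (and called) only in enterprise builds, which also makes the old per-statement #[cfg] attributes inside apply_schedule_handlers redundant. The skip_downstream_error_handlers flag shows the statement-level version of the pattern: because its declaration is feature-gated, the later assignment has to sit inside a #[cfg]-gated block too, otherwise a community build would reference a variable that does not exist.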
111 changes: 111 additions & 0 deletions backend/windmill-worker/src/bench.rs
@@ -0,0 +1,111 @@
use serde::Serialize;
use tokio::time::Instant;
use windmill_common::{
    worker::{write_file, TMP_DIR},
    DB,
};

pub struct BenchmarkInfo {
    iters: u64,
    timings: Vec<BenchmarkIter>,
}

impl Serialize for BenchmarkInfo {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let timings: Vec<Vec<(String, u32)>> = self
            .timings
            .iter()
            .map(|x| x.timings.clone())
            .collect::<Vec<Vec<(String, u32)>>>();
        // serialize timings as a vec of vecs of (step_name, duration) tuples
        timings.serialize(serializer)
    }
}

impl BenchmarkInfo {
    pub fn new() -> Self {
        BenchmarkInfo { iters: 0, timings: vec![] }
    }

    pub fn add_iter(&mut self, bench: BenchmarkIter) {
        self.iters += 1;
        self.timings.push(bench);
    }

    pub fn write_to_file(&self, path: &str) -> anyhow::Result<()> {
        println!("Writing benchmark {path}");
        write_file(TMP_DIR, path, &serde_json::to_string(&self).unwrap()).expect("write profiling");
        Ok(())
    }
}

pub struct BenchmarkIter {
    last_instant: Instant,
    timings: Vec<(String, u32)>,
}

impl BenchmarkIter {
    pub fn new() -> Self {
        BenchmarkIter { last_instant: Instant::now(), timings: vec![] }
    }

    pub fn add_timing(&mut self, name: &str) {
        let elapsed = self.last_instant.elapsed().as_nanos() as u32;
        self.timings.push((name.to_string(), elapsed));
        self.last_instant = Instant::now();
    }
}

pub async fn benchmark_init(is_dedicated_worker: bool, db: &DB) {
    use windmill_common::{jobs::JobKind, scripts::ScriptLang};

    let benchmark_jobs: i32 = std::env::var("BENCHMARK_JOBS_AT_INIT")
        .unwrap_or("5000".to_string())
        .parse::<i32>()
        .unwrap();
    if is_dedicated_worker {
        // you need to create the script first, check https://github.com/windmill-labs/windmill/blob/b76a92cfe454c686f005c65f534e29e039f3c706/benchmarks/lib.ts#L47
        let hash = sqlx::query_scalar!(
            "SELECT hash FROM script WHERE path = $1 AND workspace_id = $2",
            "f/benchmarks/dedicated",
            "admins"
        )
        .fetch_one(db)
        .await
        .unwrap_or_else(|_e| panic!("failed to fetch hash of dedicated benchmark script"));
        sqlx::query!("INSERT INTO queue (id, script_hash, script_path, job_kind, language, tag, created_by, permissioned_as, email, scheduled_for, workspace_id) (SELECT gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 FROM generate_series(1, $11))",
            hash,
            "f/benchmarks/dedicated",
            JobKind::Script as JobKind,
            ScriptLang::Bun as ScriptLang,
            "admins:f/benchmarks/dedicated",
            "admin",
            "u/admin",
            "[email protected]",
            chrono::Utc::now(),
            "admins",
            benchmark_jobs
        )
        .execute(db)
        .await
        .unwrap_or_else(|_e| panic!("failed to insert dedicated jobs"));
    } else {
        sqlx::query!("INSERT INTO queue (id, script_hash, script_path, job_kind, language, tag, created_by, permissioned_as, email, scheduled_for, workspace_id) (SELECT gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 FROM generate_series(1, $11))",
            None::<i64>,
            None::<String>,
            JobKind::Noop as JobKind,
            ScriptLang::Deno as ScriptLang,
            "deno",
            "admin",
            "u/admin",
            "[email protected]",
            chrono::Utc::now(),
            "admins",
            benchmark_jobs
        )
        .execute(db)
        .await
        .unwrap_or_else(|_e| panic!("failed to insert noop jobs"));
    }
}
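To make the flow between bench.rs and this commit's plot.py concrete, here is a minimal sketch of how the two types compose. The loop and function name are hypothetical (only the bench items are from this commit), and it assumes TMP_DIR resolves to /tmp/windmill, the directory plot.py reads from:

use crate::bench::{BenchmarkInfo, BenchmarkIter};

// Hypothetical worker loop, for illustration only.
fn example_worker_loop() -> anyhow::Result<()> {
    let mut main_loop = BenchmarkInfo::new();
    for _ in 0..1000 {
        let mut iter = BenchmarkIter::new();
        // ... pull a job from the queue ...
        iter.add_timing("pull"); // nanoseconds since BenchmarkIter::new()
        // ... run the job ...
        iter.add_timing("execute"); // nanoseconds since the previous add_timing
        main_loop.add_iter(iter);
    }
    // Written relative to TMP_DIR, so this produces the
    // profiling_main.json that plot.py loads.
    main_loop.write_to_file("profiling_main.json")
}

benchmark_init, for its part, pre-fills the queue so a loop like the one above has work to measure: BENCHMARK_JOBS_AT_INIT jobs (default 5000), either no-op Deno jobs or, for a dedicated worker, executions of an f/benchmarks/dedicated script that must be created beforehand (see the linked benchmarks/lib.ts).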
1 change: 0 additions & 1 deletion backend/windmill-worker/src/handle_child.rs
@@ -13,7 +13,6 @@ use windmill_common::error::{self, Error};
 
 use windmill_common::worker::{get_windmill_memory_usage, get_worker_memory_usage, CLOUD_HOSTED};
 
-use anyhow::Result;
 use windmill_queue::{append_logs, CanceledBy};
 
 #[cfg(any(target_os = "linux", target_os = "macos"))]
2 changes: 2 additions & 0 deletions backend/windmill-worker/src/lib.rs
@@ -7,6 +7,8 @@ mod snowflake_executor;
 
 mod ansible_executor;
 mod bash_executor;
+#[cfg(feature = "benchmark")]
+mod bench;
 mod bun_executor;
 pub mod common;
 mod config;
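Since bench is declared behind the benchmark feature, none of this instrumentation exists in a normal build; the worker presumably has to be compiled with that cargo feature enabled (e.g. cargo build --features benchmark) for the profiling files consumed by plot.py to be written at all.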