Skip to content

Commit

Permalink
chore: code review feedback.
Browse files Browse the repository at this point in the history
Changed:

* Ensured a dependency from the command section to every input in a task
  evaluation graph; this will be necessary to properly map input files that
  aren't referenced by the command but are otherwise treated as "used" (e.g. a `bai`
  file input). This is relevant for future backends that need to map files into
  containers.
* Made the working directory attempt-specific so that if a task fails and a
  retry is made, a new working directory is used while preserving the previous
  attempt's working directory.
* Expanded upon the comment for the scatter `concurrency` setting.
* Changed the task `retries` configuration option to mean the "default" number
  of retries to use if a task does not specify its own max retries requirement,
  which is also now respected in task execution.
* Added `attempt` to the `ProgressKind::TaskExecutionStarted` variant.
  • Loading branch information
peterhuene committed Jan 31, 2025
1 parent c1f45d6 commit ec15c62
Show file tree
Hide file tree
Showing 17 changed files with 200 additions and 95 deletions.
35 changes: 19 additions & 16 deletions wdl-analysis/src/document/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,23 +588,26 @@ fn add_task(config: DiagnosticsConfig, document: &mut DocumentData, definition:

// Check for unused input
if let Some(severity) = config.unused_input {
let name = decl.name();
// Don't warn for environment variables as they are always implicitly used
if decl.env().is_none()
&& graph
.edges_directed(index, Direction::Outgoing)
.next()
.is_none()
{
// Determine if the input is really used based on its name and type
if is_input_used(name.as_str(), &task.inputs[name.as_str()].ty) {
continue;
}
if decl.env().is_none() {
// For any input that isn't an environment variable, check to see if there's
// a single implicit dependency edge; if so, it might be unused
let mut edges = graph.edges_directed(index, Direction::Outgoing);

if !decl.syntax().is_rule_excepted(UNUSED_INPUT_RULE_ID) {
document.diagnostics.push(
unused_input(name.as_str(), name.span()).with_severity(severity),
);
if let (Some(true), None) = (edges.next().map(|e| e.weight()), edges.next())
{
let name = decl.name();

// Determine if the input is really used based on its name and type
if is_input_used(name.as_str(), &task.inputs[name.as_str()].ty) {
continue;
}

if !decl.syntax().is_rule_excepted(UNUSED_INPUT_RULE_ID) {
document.diagnostics.push(
unused_input(name.as_str(), name.span())
.with_severity(severity),
);
}
}
}
}
Expand Down
70 changes: 45 additions & 25 deletions wdl-analysis/src/eval/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,23 @@ pub struct TaskGraphBuilder {

impl TaskGraphBuilder {
/// Builds a new task evaluation graph.
///
/// The nodes are [`TaskGraphNode`] and the edges represent a reverse
/// dependency relationship (A -> B => "node A is depended on by B").
///
/// The edge data indicates whether or not the edge is an implicit edge
/// between the node and the command section.
///
/// Commands implicitly depend on all inputs, environment variables, the
/// requirements section, the runtime section, and the hints section.
///
/// Outputs implicitly depend on the command section.
pub fn build(
mut self,
version: SupportedVersion,
task: &TaskDefinition,
diagnostics: &mut Vec<Diagnostic>,
) -> DiGraph<TaskGraphNode, ()> {
) -> DiGraph<TaskGraphNode, bool> {
// Populate the declaration types and build a name reference graph
let mut graph = DiGraph::default();
let mut saw_inputs = false;
Expand Down Expand Up @@ -181,49 +192,55 @@ impl TaskGraphBuilder {
// Add name reference edges before adding the outputs
self.add_reference_edges(version, None, &mut graph, diagnostics);

// Add the outputs
let count = graph.node_count();
if let Some(section) = outputs {
for decl in section.declarations() {
if let Some(index) = self.add_named_node(
self.add_named_node(
decl.name(),
TaskGraphNode::Output(Decl::Bound(decl)),
&mut graph,
diagnostics,
) {
// Add an edge to the command node as all outputs depend on the command
if let Some(command) = self.command {
graph.update_edge(command, index, ());
}
}
);
}
}

// Add reference edges again, but only for the output declaration nodes
self.add_reference_edges(version, Some(count), &mut graph, diagnostics);

// Finally, add edges from the command to runtime/requirements/hints and
// environment variables
// Finally, add implicit edges to and from the command
if let Some(command) = self.command {
// The command section depends on the runtime section
if let Some(runtime) = self.runtime {
graph.update_edge(runtime, command, ());
graph.update_edge(runtime, command, true);
}

// The command section depends on the requirements section
if let Some(requirements) = self.requirements {
graph.update_edge(requirements, command, ());
graph.update_edge(requirements, command, true);
}

// The command section depends on the hints section
if let Some(hints) = self.hints {
graph.update_edge(hints, command, ());
graph.update_edge(hints, command, true);
}

// As environment variables are implicitly used by commands, add edges from the
// command to the environment variable declarations
// The command section depends on any input or environment variable declaration
// All outputs depend on the command
for index in self.names.values() {
match &graph[*index] {
TaskGraphNode::Input(decl) | TaskGraphNode::Decl(decl)
if decl.env().is_some() =>
{
graph.update_edge(*index, command, ());
TaskGraphNode::Input(_) => {
if !graph.contains_edge(*index, command) {
graph.update_edge(*index, command, true);
}
}
TaskGraphNode::Decl(decl) if decl.env().is_some() => {
if !graph.contains_edge(*index, command) {
graph.update_edge(*index, command, true);
}
}
TaskGraphNode::Output(_) => {
graph.update_edge(command, *index, true);
}
_ => continue,
}
Expand All @@ -238,7 +255,7 @@ impl TaskGraphBuilder {
&mut self,
name: Ident,
node: TaskGraphNode,
graph: &mut DiGraph<TaskGraphNode, ()>,
graph: &mut DiGraph<TaskGraphNode, bool>,
diagnostics: &mut Vec<Diagnostic>,
) -> Option<NodeIndex> {
// Check for conflicting nodes
Expand All @@ -265,7 +282,7 @@ impl TaskGraphBuilder {
from: NodeIndex,
descendants: impl Iterator<Item = SyntaxNode>,
allow_task_var: bool,
graph: &mut DiGraph<TaskGraphNode, ()>,
graph: &mut DiGraph<TaskGraphNode, bool>,
diagnostics: &mut Vec<Diagnostic>,
) {
// Add edges for any descendant name references
Expand All @@ -275,7 +292,7 @@ impl TaskGraphBuilder {
// Look up the name; we don't check for cycles here as decls can't
// reference a section.
if let Some(to) = self.names.get(name.as_str()) {
graph.update_edge(*to, from, ());
graph.update_edge(*to, from, false);
} else if name.as_str() != TASK_VAR_NAME || !allow_task_var {
diagnostics.push(unknown_name(name.as_str(), name.span()));
}
Expand All @@ -287,7 +304,7 @@ impl TaskGraphBuilder {
&mut self,
version: SupportedVersion,
skip: Option<usize>,
graph: &mut DiGraph<TaskGraphNode, ()>,
graph: &mut DiGraph<TaskGraphNode, bool>,
diagnostics: &mut Vec<Diagnostic>,
) {
// Populate edges for any nodes that reference other nodes by name
Expand Down Expand Up @@ -373,7 +390,7 @@ impl TaskGraphBuilder {
from: NodeIndex,
expr: Expr,
allow_task_var: bool,
graph: &mut DiGraph<TaskGraphNode, ()>,
graph: &mut DiGraph<TaskGraphNode, bool>,
diagnostics: &mut Vec<Diagnostic>,
) {
for r in expr.syntax().descendants().filter_map(NameRef::cast) {
Expand Down Expand Up @@ -408,7 +425,7 @@ impl TaskGraphBuilder {
continue;
}

graph.update_edge(*to, from, ());
graph.update_edge(*to, from, false);
} else if name.as_str() != TASK_VAR_NAME || !allow_task_var {
diagnostics.push(unknown_name(name.as_str(), name.span()));
}
Expand Down Expand Up @@ -537,6 +554,9 @@ pub struct WorkflowGraphBuilder {

impl WorkflowGraphBuilder {
/// Builds a new workflow evaluation graph.
///
/// The nodes are [`WorkflowGraphNode`] and the edges represent a reverse
/// dependency relationship (A -> B => "node A is depended on by B").
pub fn build(
mut self,
workflow: &WorkflowDefinition,
Expand Down
9 changes: 5 additions & 4 deletions wdl-engine/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct TaskExecutionConstraints {
}

/// Represents the root directory of a task execution.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct TaskExecutionRoot {
/// The path to the working directory for the execution.
work_dir: PathBuf,
Expand Down Expand Up @@ -96,11 +96,11 @@ impl TaskExecutionRoot {
})
}

/// Gets the working directory path for the task's execution.
/// Gets the working directory for the given attempt number.
///
/// The working directory will be created upon spawning the task.
pub fn work_dir(&self) -> &Path {
&self.work_dir
pub fn work_dir(&self, attempt: u64) -> PathBuf {
self.work_dir.join(format!("{attempt}"))
}

/// Gets the temporary directory path for the task's execution.
Expand Down Expand Up @@ -250,6 +250,7 @@ pub trait TaskExecutionBackend: Send + Sync {
fn spawn(
&self,
request: Arc<TaskSpawnRequest>,
attempt: u64,
spawned: oneshot::Sender<()>,
) -> Result<Receiver<Result<i32>>>;
}
21 changes: 8 additions & 13 deletions wdl-engine/src/backend/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ fn memory(requirements: &HashMap<String, Value>) -> Result<i64> {
struct LocalTaskSpawnRequest {
/// The inner task spawn request.
inner: Arc<TaskSpawnRequest>,
/// The attempt count for the task.
attempt: u64,
/// The requested CPU reservation for the task.
///
/// Note that CPU isn't actually reserved for the task process.
Expand Down Expand Up @@ -297,27 +299,18 @@ impl LocalTaskExecutionBackend {
let spawned = request.spawned.take().unwrap();
spawned.send(()).ok();

let result = Self::spawn_task(request.inner.clone()).await;
let result = Self::spawn_task(request.inner.clone(), request.attempt).await;
LocalTaskSpawnResponse { request, result }
});
}

/// Spawns the requested task.
///
/// Returns the status code of the process when it has exited.
async fn spawn_task(request: Arc<TaskSpawnRequest>) -> Result<i32> {
async fn spawn_task(request: Arc<TaskSpawnRequest>, attempt: u64) -> Result<i32> {
// Create the attempt-specific working directory (previous attempts' directories are preserved)
let work_dir = request.root.work_dir();
if work_dir.exists() {
fs::remove_dir_all(work_dir).with_context(|| {
format!(
"failed to remove directory `{path}`",
path = work_dir.display()
)
})?;
}

fs::create_dir_all(work_dir).with_context(|| {
let work_dir = request.root.work_dir(attempt);
fs::create_dir_all(&work_dir).with_context(|| {
format!(
"failed to create directory `{path}`",
path = work_dir.display()
Expand Down Expand Up @@ -450,6 +443,7 @@ impl TaskExecutionBackend for LocalTaskExecutionBackend {
fn spawn(
&self,
request: Arc<TaskSpawnRequest>,
attempt: u64,
spawned: oneshot::Sender<()>,
) -> Result<oneshot::Receiver<Result<i32>>> {
let (tx, rx) = oneshot::channel();
Expand All @@ -460,6 +454,7 @@ impl TaskExecutionBackend for LocalTaskExecutionBackend {
self.tx
.send(LocalTaskSpawnRequest {
inner: request,
attempt,
cpu,
memory,
spawned: Some(spawned),
Expand Down
44 changes: 42 additions & 2 deletions wdl-engine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,44 @@ pub struct ScatterConfig {
/// Lower values use less memory for evaluation and higher values may better
/// saturate the task execution backend with tasks to execute.
///
/// This setting does not change how many tasks an execution backend can run
/// concurrently, but may affect how many tasks are sent to the backend to
/// run at a time.
///
/// For example, if `concurrency` was set to 10 and we evaluate the
/// following scatters:
///
/// ```wdl
/// scatter (i in range(100)) {
/// call my_task
/// }
///
/// scatter (j in range(100)) {
/// call my_task as my_task2
/// }
/// ```
///
/// Here each scatter is independent and therefore there will be 20 calls
/// (10 for each scatter) made concurrently. If the task execution
/// backend can only execute 5 tasks concurrently, 5 tasks will execute
/// and 15 will be "ready" to execute and waiting for an executing task
/// to complete.
///
/// If instead we evaluate the following scatters:
///
/// ```wdl
/// scatter (i in range(100)) {
/// scatter (j in range(100)) {
/// call my_task
/// }
/// }
/// ```
///
/// Then there will be 100 calls (10*10 as 10 are made for each outer
/// element) made concurrently. If the task execution backend can only
/// execute 5 tasks concurrently, 5 tasks will execute and 95 will be
/// "ready" to execute and waiting for an executing task to complete.
///
/// <div class="warning">
/// Warning: nested scatter statements cause exponential memory usage based
/// on this value, as each scatter statement evaluation requires allocating
Expand All @@ -103,11 +141,13 @@ impl ScatterConfig {
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct TaskConfig {
/// The maximum number of retries to attempt if a task fails.
/// The default maximum number of retries to attempt if a task fails.
///
/// A task's `max_retries` requirement will override this value.
///
/// Defaults to 0 (no retries).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retries: Option<u8>,
pub retries: Option<u64>,
}

impl TaskConfig {
Expand Down
4 changes: 2 additions & 2 deletions wdl-engine/src/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl EvaluatedTask {
/// Constructs a new evaluated task.
///
/// Returns an error if the stdout or stderr paths are not UTF-8.
fn new(root: &TaskExecutionRoot, status_code: i32) -> anyhow::Result<Self> {
fn new(root: &TaskExecutionRoot, work_dir: &Path, status_code: i32) -> anyhow::Result<Self> {
let stdout = PrimitiveValue::new_file(root.stdout().to_str().with_context(|| {
format!(
"path to stdout file `{path}` is not UTF-8",
Expand All @@ -266,7 +266,7 @@ impl EvaluatedTask {

Ok(Self {
status_code,
work_dir: root.work_dir().into(),
work_dir: work_dir.into(),
temp_dir: root.temp_dir().into(),
command: root.command().into(),
stdout,
Expand Down
5 changes: 5 additions & 0 deletions wdl-engine/src/eval/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub enum ProgressKind<'a> {
TaskExecutionStarted {
/// The identifier of the task.
id: &'a str,
/// The attempt number for the task's execution, starting at 0 to
/// indicate the first attempt.
///
/// This value is incremented upon each retry.
attempt: u64,
},
/// A task with the given id has completed execution.
TaskExecutionCompleted {
Expand Down
Loading

0 comments on commit ec15c62

Please sign in to comment.