Skip to content

Commit 8c6f394

Browse files
authored
Stream docker
Fix streaming for tasks that use docker
1 parent acd350b commit 8c6f394

File tree

8 files changed

+156
-67
lines changed

8 files changed

+156
-67
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Release Notes
22

3+
## 0.39
4+
- Streaming works with tasks that use docker
5+
36
## 0.38
47
- Minor fixes, and better testing, for file streaming
58

reference_stanza.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
dxWDL {
2-
version = "0.38"
2+
version = "0.39"
33
asset_ids = []
44
}

src/main/scala/dxWDL/RunnerTask.scala

Lines changed: 110 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ case class RunnerTask(task:Task,
217217
Utils.writeFileContent(jobOutputPath, ast_pp)
218218
}
219219

220-
private def writeSubmitBashScript(env: Map[String, WdlValue]) : Unit = {
220+
// Figure out if a docker image is specified. If so, return it as a string.
221+
private def dockerImage(env: Map[String, WdlValue]) : Option[String] = {
221222
def lookup(varName : String) : WdlValue = {
222223
env.get(varName) match {
223224
case Some(x) => x
@@ -235,37 +236,16 @@ case class RunnerTask(task:Task,
235236
}
236237
// Figure out if docker is used. If so, it is specified by an
237238
// expression that requires evaluation.
238-
val docker: Option[String] =
239-
task.runtimeAttributes.attrs.get("docker") match {
240-
case None => None
241-
case Some(expr) => Some(evalStringExpr(expr))
242-
}
243-
docker match {
244-
case None => ()
245-
case Some(imgName) =>
246-
// The user wants to use a docker container with the
247-
// image [imgName]. We implement this with dx-docker.
248-
// There may be corner cases where the image will run
249-
// into permission limitations due to security.
250-
//
251-
// Map the home directory into the container, so that
252-
// we can reach the result files, and upload them to
253-
// the platform.
254-
val DX_HOME = Utils.DX_HOME
255-
val dockerRunPath = getMetaDir().resolve("script.submit")
256-
val dockerRunScript =
257-
s"""|#!/bin/bash -ex
258-
|dx-docker run --entrypoint /bin/bash -v ${DX_HOME}:${DX_HOME} ${imgName} $${HOME}/execution/meta/script""".stripMargin.trim
259-
System.err.println(s"writing docker run script to ${dockerRunPath}")
260-
Utils.writeFileContent(dockerRunPath, dockerRunScript)
261-
dockerRunPath.toFile.setExecutable(true)
239+
task.runtimeAttributes.attrs.get("docker") match {
240+
case None => None
241+
case Some(expr) => Some(evalStringExpr(expr))
262242
}
263243
}
264244

265245
// Each file marked "stream", is converted into a special fifo
266246
// file on the instance.
267247
private def handleStreamingFiles(inputs: Map[String, BValue])
268-
: (String, String, Map[Declaration, WdlValue]) = {
248+
: (Option[(String, String)], Map[String, BValue]) = {
269249
// A file that needs to be stream-downloaded.
270250
// Make a named pipe, and stream the file from the platform to the pipe.
271251
// Keep track of the download process. We need to ensure pipes have
@@ -281,55 +261,76 @@ case class RunnerTask(task:Task,
281261
val bashSnippet:String =
282262
s"""|mkfifo ${fifo.toString}
283263
|dx cat ${dxFileId} > ${fifo.toString} &
284-
|download_stream_pids+=($$!)
285264
|""".stripMargin
286265
(WdlSingleFile(fifo.toString), bashSnippet)
287266
}
288267

289-
val m:Map[Declaration, (WdlValue, String)] = inputs.map{
290-
case (_, BValue(_, _, None)) => throw new Exception("Sanity")
291-
case (_, BValue(wvl, wdlValue, Some(decl))) =>
292-
wdlValue match {
268+
val m:Map[String, (String, BValue)] = inputs.map{
269+
case (varName, BValue(wvl, wdlValue, declOpt)) =>
270+
val (wdlValueRewrite,bashSnippet) = wdlValue match {
293271
case WdlSingleFile(path) if wvl.attrs.stream =>
294-
decl -> mkfifo(wvl, path)
272+
mkfifo(wvl, path)
295273
case WdlOptionalValue(_,Some(WdlSingleFile(path))) if wvl.attrs.stream =>
296-
decl -> mkfifo(wvl, path)
274+
mkfifo(wvl, path)
297275
case _ =>
298276
// everything else
299-
decl -> (wdlValue, "")
277+
(wdlValue,"")
300278
}
301-
}
279+
val bVal:BValue = BValue(wvl, wdlValueRewrite, declOpt)
280+
varName -> (bashSnippet, bVal)
281+
}.toMap
302282

303283
// set up all the named pipes
304284
val snippets = m.collect{
305-
case (_, (_, bashSnippet)) if !bashSnippet.isEmpty => bashSnippet
285+
case (_, (bashSnippet,_)) if !bashSnippet.isEmpty => bashSnippet
306286
}.toVector
307-
val bashProlog = ("download_stream_pids=()" +:
287+
val bashProlog = ("background_pids=()" +:
308288
snippets).mkString("\n")
309289

310-
// Wait for all download processes to complete. It is legal
290+
// Wait for all background processes to complete. It is legal
311291
// for the user job to read only the beginning of the
312292
// file. This causes the download streams to close
313-
// prematurely, which can be show up as an error. We need to tolerate this
314-
// case.
315-
val bashEpilog:String = "wait ${download_stream_pids[@]}"
316-
val inputsWithPipes = m.map{ case (decl, (wdlValue, _)) => decl -> wdlValue }.toMap
317-
(bashProlog, bashEpilog, inputsWithPipes)
293+
// prematurely, which can be show up as an error. We need to
294+
// tolerate this case.
295+
val bashEpilog = ""
296+
// "wait ${background_pids[@]}"
297+
/* """|echo "robust wait for ${background_pids[@]}"
298+
|for pid in ${background_pids[@]}; do
299+
| while [[ ( -d /proc/$pid ) && ( -z `grep zombie /proc/$pid/status` ) ]]; do
300+
| sleep 10
301+
| echo "waiting for $pid"
302+
| done
303+
|done
304+
|""".stripMargin.trim + "\n" */
305+
val inputsWithPipes = m.map{ case (varName, (_,bValue)) => varName -> bValue }.toMap
306+
val bashPrologEpilog =
307+
if (fifoCount == 0) {
308+
// No streaming files
309+
None
310+
} else {
311+
// There are some streaming files
312+
Some((bashProlog, bashEpilog))
313+
}
314+
(bashPrologEpilog, inputsWithPipes)
318315
}
319316

320-
private def writeBashScript(inputs: Map[String, BValue]) : Unit = {
317+
// Write the core bash script into a file. In some cases, we
318+
// need to run some shell setup statements before and after this
319+
// script. Returns these as two strings (prolog, epilog).
320+
private def writeBashScript(inputs: Map[String, BValue],
321+
bashPrologEpilog: Option[(String, String)]) : Unit = {
321322
val metaDir = getMetaDir()
322323
val scriptPath = metaDir.resolve("script")
323324
val stdoutPath = metaDir.resolve("stdout")
324325
val stderrPath = metaDir.resolve("stderr")
325326
val rcPath = metaDir.resolve("rc")
326327

327-
// deal with files
328-
val (bashProlog, bashEpilog, inputsWithPipes) = handleStreamingFiles(inputs)
329-
330328
// instantiate the command
331-
val taskCmd : String = task.instantiateCommand(inputsWithPipes, DxFunctions).get
332-
val shellCmd = List(bashProlog, taskCmd, bashEpilog).mkString("\n")
329+
val env: Map[Declaration, WdlValue] = inputs.map {
330+
case (_, BValue(_,wdlValue,Some(decl))) => decl -> wdlValue
331+
case (_, BValue(varName,_,None)) => throw new Exception("missing declaration")
332+
}.toMap
333+
val shellCmd : String = task.instantiateCommand(env, DxFunctions).get
333334

334335
// This is based on Cromwell code from
335336
// [BackgroundAsyncJobExecutionActor.scala]. Generate a bash
@@ -349,12 +350,17 @@ case class RunnerTask(task:Task,
349350
|echo 0 > ${rcPath}
350351
|""".stripMargin.trim + "\n"
351352
} else {
353+
val cdHome = s"cd ${Utils.DX_HOME}"
354+
var cmdLines: List[String] = bashPrologEpilog match {
355+
case None =>
356+
List(cdHome, shellCmd)
357+
case Some((bashProlog, bashEpilog)) =>
358+
List(cdHome, bashProlog, shellCmd, bashEpilog)
359+
}
360+
val cmd = cmdLines.mkString("\n")
352361
s"""|#!/bin/bash
353362
|(
354-
|if [ -d ${Utils.DX_HOME} ]; then
355-
| cd ${Utils.DX_HOME}
356-
|fi
357-
|${shellCmd}
363+
|${cmd}
358364
|) \\
359365
| > >( tee ${stdoutPath} ) \\
360366
| 2> >( tee ${stderrPath} >&2 )
@@ -365,6 +371,40 @@ case class RunnerTask(task:Task,
365371
Utils.writeFileContent(scriptPath, script)
366372
}
367373

374+
private def writeDockerSubmitBashScript(env: Map[String, WdlValue],
375+
imgName: String,
376+
bashPrologEpilog: Option[(String, String)]) : Unit = {
377+
// The user wants to use a docker container with the
378+
// image [imgName]. We implement this with dx-docker.
379+
// There may be corner cases where the image will run
380+
// into permission limitations due to security.
381+
//
382+
// Map the home directory into the container, so that
383+
// we can reach the result files, and upload them to
384+
// the platform.
385+
val DX_HOME = Utils.DX_HOME
386+
val dockerCmd = s"""|dx-docker run --entrypoint /bin/bash
387+
|-v ${DX_HOME}:${DX_HOME}
388+
|${imgName}
389+
|$${HOME}/execution/meta/script""".stripMargin.replaceAll("\n", " ")
390+
val dockerRunPath = getMetaDir().resolve("script.submit")
391+
val dockerRunScript = bashPrologEpilog match {
392+
case None =>
393+
s"""|#!/bin/bash -ex
394+
|${dockerCmd}""".stripMargin
395+
case Some((bashProlog, bashEpilog)) =>
396+
List("#!/bin/bash -ex",
397+
bashProlog,
398+
dockerCmd,
399+
bashEpilog
400+
).mkString("\n")
401+
}
402+
System.err.println(s"writing docker run script to ${dockerRunPath}")
403+
Utils.writeFileContent(dockerRunPath,
404+
dockerRunScript)
405+
dockerRunPath.toFile.setExecutable(true)
406+
}
407+
368408
// Calculate the input variables for the task, download the input files,
369409
// and build a shell script to run the command.
370410
def prolog(jobInputPath : Path,
@@ -385,17 +425,25 @@ case class RunnerTask(task:Task,
385425
}.toMap
386426

387427
// evaluate the top declarations
388-
val decls: Map[String, BValue] = evalDeclarations(task.declarations, inputWvls)
389-
390-
// Write shell script to a file. It will be executed by the dx-applet code
391-
writeBashScript(decls)
392-
393-
// write the script that launches the shell script. It could be a docker
394-
// image.
395-
val env:Map[String, WdlValue] = decls.map{
428+
val inputs: Map[String, BValue] = evalDeclarations(task.declarations, inputWvls)
429+
val env:Map[String, WdlValue] = inputs.map{
396430
case (varName, BValue(_,wdlValue,_)) => varName -> wdlValue
397431
}.toMap
398-
writeSubmitBashScript(env)
432+
val docker = dockerImage(env)
433+
434+
// deal with files that need streaming
435+
val (bashPrologEpilog, inputsWithPipes) = handleStreamingFiles(inputs)
436+
437+
// Write shell script to a file. It will be executed by the dx-applet code
438+
docker match {
439+
case None =>
440+
writeBashScript(inputsWithPipes, bashPrologEpilog)
441+
case Some(img) =>
442+
// write a script that launches the actual command inside a docker image.
443+
// Streamed files are set up before launching docker.
444+
writeBashScript(inputsWithPipes, None)
445+
writeDockerSubmitBashScript(env, img, bashPrologEpilog)
446+
}
399447

400448
// serialize the environment, so we don't have to calculate it again in
401449
// the epilog

test/files.wdl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,6 @@ workflow files {
121121
File f1
122122
File f2
123123

124-
call lib.Colocation as colocation {
125-
input : A=f1, B=f2
126-
}
127-
128124
# Try an applet that streams two files
129125
call lib.diff as diff1 {
130126
input: a=f, b=f
@@ -133,6 +129,10 @@ workflow files {
133129
input: a=f, b=f2
134130
}
135131

132+
call lib.Colocation as colocation {
133+
input : A=f1, B=f2
134+
}
135+
136136
call z_Copy as Copy { input : src=f, basename="tearFrog" }
137137
call z_Copy as Copy2 { input : src=Copy.outf, basename="mixing" }
138138
call z_FindFiles as FindFiles

test/library_sys_call.wdl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ task diff {
5959
a : "stream"
6060
b : "stream"
6161
}
62+
runtime {
63+
docker: "ubuntu:16.04"
64+
}
6265
command {
6366
diff ${a} ${b} | wc -l
6467
}

test/quick_diff.wdl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
task diff {
2+
File a
3+
File b
4+
5+
parameter_meta {
6+
a : "stream"
7+
b : "stream"
8+
}
9+
runtime {
10+
docker: "ubuntu:16.04"
11+
}
12+
command {
13+
diff ${a} ${b} | wc -l
14+
}
15+
output {
16+
Int result = read_int(stdout())
17+
}
18+
}
19+
20+
workflow quick_diff {
21+
File x
22+
File y
23+
24+
call diff {
25+
input: a=x, b=y
26+
}
27+
output {
28+
diff.result
29+
}
30+
}

test/quick_diff_input.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"quick_diff.x" : "dx://dxWDL_playground:/test_data/fileA",
3+
"quick_diff.y" : "dx://dxWDL_playground:/test_data/fileB"
4+
}

test/quick_diff_results.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}

0 commit comments

Comments
 (0)