Skip to content

Commit

Permalink
Merging improved streaming suppport from master branch
Browse files Browse the repository at this point in the history
  • Loading branch information
orodeh committed Aug 28, 2017
1 parent 8ea095d commit 855cc3e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
25 changes: 19 additions & 6 deletions src/main/scala/dxWDL/CompilerNative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,15 @@ object CompilerNative {
|
| # setup any file streams. Keep track of background
| # processes in the 'background_pids' array.
| # We 'source' the sub-script here, because we
| # need to wait for the pids. This can only be done
| # for child processes (not grand-children).
| if [[ -e $${HOME}/execution/meta/setup_streams ]]; then
| cat $${HOME}/execution/meta/setup_streams
| $${HOME}/execution/meta/setup_streams
| source $${HOME}/execution/meta/setup_streams > $${HOME}/execution/meta/background_pids.txt
|
| # reads the file line by line, and converts into a bash array
| mapfile -t background_pids < $${HOME}/execution/meta/background_pids.txt
| echo "Background processes ids: $${background_pids[@]}"
| fi
|
| # Run the shell script generated by the prolog.
Expand All @@ -147,14 +153,21 @@ object CompilerNative {
| # 1) 'dx cat' returns zero status when a user reads only the beginning
| # of a file
| for pid in $${background_pids[@]}; do
| p_status=`ps --pid $$pid --no-headers | wc -l`
| p_status=0
| p_status=`ps --pid $$pid --no-headers | wc -l` || p_status=0
|
| if [[ $$p_status == 0 ]]; then
| # the process is already dead, check correct exit status
| wait $$pid
| echo "wait $$pid"
| rc=0
| wait $$pid || rc=$$?
| if [[ $$rc != 0 ]]; then
| echo "Background download process $$pid failed"
| exit $$rc
| fi
| else
| echo "Warning: background download process $$pid has not exited."
| echo "Warning: background download process $$pid is still running."
| echo "Perhaps the worker process did not read it."
| ps --pid $$pid -F
| fi
| done
|
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/dxWDL/RunnerTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,24 @@ case class RunnerTask(task:WdlTask,
// file on the instance.
private def handleStreamingFiles(inputs: Map[String, BValue])
: (Option[String], Map[String, BValue]) = {
// A file that needs to be stream-downloaded.
// Make a named pipe, and stream the file from the platform to the pipe.
// Keep track of the download process. We need to ensure pipes have
// different names, even if the file-names are the same.
// A file that needs to be stream-downloaded. Make a named
// pipe, and stream the file from the platform to the pipe.
// Ensure pipes have different names, even if the
// file-names are the same. Write the process ids of the download jobs,
// to stdout. The calling script will keep track of them, and check
// for abnormal termination.
//
// Note: all other files have already been downloaded.
// Note: at this point, all other files have already been downloaded.
var fifoCount = 0
def mkfifo(wvl: WdlVarLinks, path: String) : (WdlValue, String) = {
System.err.println(s"Creating named fifo for file ${path}, it will be streamed")
val filename = Paths.get(path).toFile.getName
val fifo:Path = Paths.get(Utils.DX_HOME, s"fifo_${fifoCount}_${filename}")
fifoCount += 1
val dxFileId = WdlVarLinks.getFileId(wvl)
val bashSnippet:String =
s"""|mkfifo ${fifo.toString}
|dx cat ${dxFileId} > ${fifo.toString} &
|background_pids+=($$!)
|echo $$!
|""".stripMargin
(WdlSingleFile(fifo.toString), bashSnippet)
}
Expand Down

0 comments on commit 855cc3e

Please sign in to comment.