diff --git a/src/main/scala/dxWDL/CompilerNative.scala b/src/main/scala/dxWDL/CompilerNative.scala index da779d87..d839d532 100644 --- a/src/main/scala/dxWDL/CompilerNative.scala +++ b/src/main/scala/dxWDL/CompilerNative.scala @@ -122,9 +122,15 @@ object CompilerNative { | | # setup any file streams. Keep track of background | # processes in the 'background_pids' array. + | # We 'source' the sub-script here, because we + | # need to wait for the pids. This can only be done + | # for child processes (not grand-children). | if [[ -e $${HOME}/execution/meta/setup_streams ]]; then - | cat $${HOME}/execution/meta/setup_streams - | $${HOME}/execution/meta/setup_streams + | source $${HOME}/execution/meta/setup_streams > $${HOME}/execution/meta/background_pids.txt + | + | # reads the file line by line, and converts into a bash array + | mapfile -t background_pids < $${HOME}/execution/meta/background_pids.txt + | echo "Background processes ids: $${background_pids[@]}" | fi | | # Run the shell script generated by the prolog. @@ -147,14 +153,21 @@ object CompilerNative { | # 1) 'dx cat' returns zero status when a user reads only the beginning | # of a file | for pid in $${background_pids[@]}; do - | p_status=`ps --pid $$pid --no-headers | wc -l` + | p_status=0 + | p_status=`ps --pid $$pid --no-headers | wc -l` || p_status=0 + | | if [[ $$p_status == 0 ]]; then | # the process is already dead, check correct exit status - | wait $$pid + | echo "wait $$pid" + | rc=0 + | wait $$pid || rc=$$? + | if [[ $$rc != 0 ]]; then + | echo "Background download process $$pid failed" + | exit $$rc + | fi | else - | echo "Warning: background download process $$pid has not exited." + | echo "Warning: background download process $$pid is still running." | echo "Perhaps the worker process did not read it." - | ps --pid $$pid -F | fi | done | diff --git a/src/main/scala/dxWDL/RunnerTask.scala b/src/main/scala/dxWDL/RunnerTask.scala index 0b62ebad..c11f35fd 100644 --- a/src/main/scala/dxWDL/RunnerTask.scala +++ b/src/main/scala/dxWDL/RunnerTask.scala @@ -238,15 +238,16 @@ case class RunnerTask(task:WdlTask, // file on the instance. private def handleStreamingFiles(inputs: Map[String, BValue]) : (Option[String], Map[String, BValue]) = { - // A file that needs to be stream-downloaded. - // Make a named pipe, and stream the file from the platform to the pipe. - // Keep track of the download process. We need to ensure pipes have - // different names, even if the file-names are the same. + // A file that needs to be stream-downloaded. Make a named + // pipe, and stream the file from the platform to the pipe. + // Ensure pipes have different names, even if the + // file-names are the same. Write the process ids of the download jobs, + // to stdout. The calling script will keep track of them, and check + // for abnormal termination. // - // Note: all other files have already been downloaded. + // Note: at this point, all other files have already been downloaded. var fifoCount = 0 def mkfifo(wvl: WdlVarLinks, path: String) : (WdlValue, String) = { - System.err.println(s"Creating named fifo for file ${path}, it will be streamed") val filename = Paths.get(path).toFile.getName val fifo:Path = Paths.get(Utils.DX_HOME, s"fifo_${fifoCount}_${filename}") fifoCount += 1 @@ -254,7 +255,7 @@ case class RunnerTask(task:WdlTask, val bashSnippet:String = s"""|mkfifo ${fifo.toString} |dx cat ${dxFileId} > ${fifo.toString} & - |background_pids+=($$!) + |echo $$! |""".stripMargin (WdlSingleFile(fifo.toString), bashSnippet) }