diff --git a/hail/src/main/scala/is/hail/backend/BackendServer.scala b/hail/src/main/scala/is/hail/backend/BackendServer.scala deleted file mode 100644 index db23bcb310fe..000000000000 --- a/hail/src/main/scala/is/hail/backend/BackendServer.scala +++ /dev/null @@ -1,123 +0,0 @@ -package is.hail.backend - -import is.hail.types.virtual.Kinds.{BlockMatrix, Matrix, Table, Value} -import is.hail.utils._ -import is.hail.utils.ExecutionTimer.Timings - -import java.io.Closeable -import java.net.InetSocketAddress -import java.util.concurrent._ - -import com.google.api.client.http.HttpStatusCodes -import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} -import org.json4s._ -import org.json4s.jackson.{JsonMethods, Serialization} - -class BackendServer(backend: Backend) extends Closeable { - // 0 => let the OS pick an available port - private[this] val httpServer = HttpServer.create(new InetSocketAddress(0), 10) - - private[this] val thread = { - // This HTTP server *must not* start non-daemon threads because such threads keep the JVM - // alive. A living JVM indicates to Spark that the job is incomplete. This does not manifest - // when you run jobs in a local pyspark (because you'll Ctrl-C out of Python regardless of the - // JVM's state) nor does it manifest in a Notebook (again, you'll kill the Notebook kernel - // explicitly regardless of the JVM). It *does* manifest when submitting jobs with - // - // gcloud dataproc submit ... - // - // or - // - // spark-submit - // - // setExecutor(null) ensures the server creates no new threads: - // - // > If this method is not called (before start()) or if it is called with a null Executor, then - // > a default implementation is used, which uses the thread which was created by the start() - // > method. 
- // - /* Source: - * https://docs.oracle.com/en/java/javase/11/docs/api/jdk.httpserver/com/sun/net/httpserver/HttpServer.html#setExecutor(java.util.concurrent.Executor) */ - // - httpServer.createContext("/", Handler) - httpServer.setExecutor(null) - val t = Executors.defaultThreadFactory().newThread(new Runnable() { - def run(): Unit = - httpServer.start() - }) - t.setDaemon(true) - t - } - - def port: Int = httpServer.getAddress.getPort - - def start(): Unit = - thread.start() - - override def close(): Unit = - httpServer.stop(10) - - private case class Request(exchange: HttpExchange, payload: JValue) - - private[this] object Handler extends HttpHandler with HttpLikeBackendRpc[Request] { - - override def handle(exchange: HttpExchange): Unit = { - val payload = using(exchange.getRequestBody)(JsonMethods.parse(_)) - runRpc(Request(exchange, payload)) - } - - implicit override protected object Ask extends Routing { - - import Routes._ - - override def route(a: Request): Route = - a.exchange.getRequestURI.getPath match { - case "/value/type" => TypeOf(Value) - case "/table/type" => TypeOf(Table) - case "/matrixtable/type" => TypeOf(Matrix) - case "/blockmatrix/type" => TypeOf(BlockMatrix) - case "/execute" => Execute - case "/vcf/metadata/parse" => ParseVcfMetadata - case "/fam/import" => ImportFam - case "/references/load" => LoadReferencesFromDataset - case "/references/from_fasta" => LoadReferencesFromFASTA - } - - override def payload(a: Request): JValue = a.payload - } - - implicit override protected object Write extends Write[Request] with ErrorHandling { - - override def timings(req: Request)(t: Timings): Unit = { - val ts = Serialization.write(Map("timings" -> t)) - req.exchange.getResponseHeaders.add("X-Hail-Timings", ts) - } - - override def result(req: Request)(result: Array[Byte]): Unit = - respond(req)(HttpStatusCodes.STATUS_CODE_OK, result) - - override def error(req: Request)(t: Throwable): Unit = - respond(req)( - HttpStatusCodes.STATUS_CODE_SERVER_ERROR, - jsonToBytes { - val (shortMessage, expandedMessage, errorId) = handleForPython(t) - JObject( - "short" -> JString(shortMessage), - "expanded" -> JString(expandedMessage), - "error_id" -> JInt(errorId), - ) - }, - ) - - private[this] def respond(req: Request)(code: Int, payload: Array[Byte]): Unit = { - req.exchange.sendResponseHeaders(code, payload.length) - using(req.exchange.getResponseBody)(_.write(payload)) - } - } - - implicit override protected object Context extends Context[Request] { - override def scoped[A](req: Request)(f: ExecuteContext => A): (A, Timings) = - backend.withExecuteContext(f) - } - } -} diff --git a/hail/src/main/scala/is/hail/backend/api/Py4JBackendApi.scala b/hail/src/main/scala/is/hail/backend/api/Py4JBackendApi.scala new file mode 100644 index 000000000000..3ef77598fbe0 --- /dev/null +++ b/hail/src/main/scala/is/hail/backend/api/Py4JBackendApi.scala @@ -0,0 +1,154 @@ +package is.hail.backend.py4j + +import com.google.api.client.http.HttpStatusCodes +import com.sun.net.{httpserver => http} +import is.hail.HailFeatureFlags +import is.hail.asm4s.HailClassLoader +import is.hail.backend.caching.BlockMatrixCache +import is.hail.backend.{Backend, BackendContext, ExecuteContext, HttpLikeBackendRpc, TempFileManager} +import is.hail.expr.ir.LoweredTableReader.LoweredTableReaderCoercer +import is.hail.expr.ir.lowering.IrMetadata +import is.hail.expr.ir.{BaseIR, CodeCacheKey, CompiledFunction} +import is.hail.linalg.BlockMatrix +import is.hail.types.virtual.Kinds.{BlockMatrix, Matrix, Table, Value} 
+import is.hail.utils.ExecutionTimer.Timings +import is.hail.utils._ +import is.hail.variant.ReferenceGenome +import org.json4s._ +import org.json4s.jackson.{JsonMethods, Serialization} + +import java.io.Closeable +import java.net.InetSocketAddress +import java.util.concurrent._ +import scala.collection.mutable + +final class Py4JBackendApi(override val backend: Backend) extends Py4JBackendExtensions with Closeable { + + override val references: mutable.Map[String, ReferenceGenome] = + mutable.Map(ReferenceGenome.builtinReferences().toSeq: _*) + + override val flags: HailFeatureFlags = HailFeatureFlags.fromEnv() + override def longLifeTempFileManager: TempFileManager = null + + private[this] val theHailClassLoader = new HailClassLoader(getClass.getClassLoader) + private[this] val bmCache = new BlockMatrixCache() + private[this] val codeCache = new Cache[CodeCacheKey, CompiledFunction[_]](50) + private[this] val persistedIr: mutable.Map[Int, BaseIR] = mutable.Map() + private[this] val coercerCache = new Cache[Any, LoweredTableReaderCoercer](32) + + + override def close(): Unit = { + HttpServer.close() + backend.close() + } + + object HttpServer extends HttpLikeBackendRpc[http.HttpExchange] with Closeable { + // 0 => let the OS pick an available port + private[this] val httpServer = http.HttpServer.create(new InetSocketAddress(0), 10) + + private[this] val thread = { + // This HTTP server *must not* start non-daemon threads because such threads keep the JVM + // alive. A living JVM indicates to Spark that the job is incomplete. This does not manifest + // when you run jobs in a local pyspark (because you'll Ctrl-C out of Python regardless of the + // JVM's state) nor does it manifest in a Notebook (again, you'll kill the Notebook kernel + // explicitly regardless of the JVM). It *does* manifest when submitting jobs with + // + // gcloud dataproc submit ... + // + // or + // + // spark-submit + // + // setExecutor(null) ensures the server creates no new threads: + // + /* > If this method is not called (before start()) or if it is called with a null Executor, + * then */ + // > a default implementation is used, which uses the thread which was created by the start() + // > method. 
+      //
+      // Source:
+      // https://docs.oracle.com/en/java/javase/11/docs/api/jdk.httpserver/com/sun/net/httpserver/HttpServer.html#setExecutor(java.util.concurrent.Executor)
+      //
+      httpServer.createContext("/", runRpc(_: http.HttpExchange))
+      httpServer.setExecutor(null)
+      val t = Executors.defaultThreadFactory().newThread(() => httpServer.start())
+      t.setDaemon(true)
+      t
+    }
+
+    def port: Int = httpServer.getAddress.getPort
+
+    override def close(): Unit =
+      httpServer.stop(10)
+
+    thread.start()
+
+    implicit private object Handler
+        extends Routing with Write[http.HttpExchange] with Context[http.HttpExchange]
+        with ErrorHandling {
+
+      override def route(req: http.HttpExchange): Route =
+        req.getRequestURI.getPath match {
+          case "/value/type" => Routes.TypeOf(Value)
+          case "/table/type" => Routes.TypeOf(Table)
+          case "/matrixtable/type" => Routes.TypeOf(Matrix)
+          case "/blockmatrix/type" => Routes.TypeOf(BlockMatrix)
+          case "/execute" => Routes.Execute
+          case "/vcf/metadata/parse" => Routes.ParseVcfMetadata
+          case "/fam/import" => Routes.ImportFam
+          case "/references/load" => Routes.LoadReferencesFromDataset
+          case "/references/from_fasta" => Routes.LoadReferencesFromFASTA
+        }
+
+      override def payload(req: http.HttpExchange): JValue =
+        using(req.getRequestBody)(JsonMethods.parse(_))
+
+      override def timings(req: http.HttpExchange)(t: Timings): Unit = {
+        val ts = Serialization.write(Map("timings" -> t))
+        req.getResponseHeaders.add("X-Hail-Timings", ts)
+      }
+
+      override def result(req: http.HttpExchange)(result: Array[Byte]): Unit =
+        respond(req)(HttpStatusCodes.STATUS_CODE_OK, result)
+
+      override def error(req: http.HttpExchange)(t: Throwable): Unit =
+        respond(req)(
+          HttpStatusCodes.STATUS_CODE_SERVER_ERROR,
+          jsonToBytes {
+            val (shortMessage, expandedMessage, errorId) = handleForPython(t)
+            JObject(
+              "short" -> JString(shortMessage),
+              "expanded" -> JString(expandedMessage),
+              "error_id" -> JInt(errorId),
+            )
+          },
+        )
+
+      private[this] def respond(req: http.HttpExchange)(code: Int, payload: Array[Byte]): Unit = {
+        req.sendResponseHeaders(code, payload.length)
+        using(req.getResponseBody)(_.write(payload))
+      }
+
+      // Delegate ExecuteContext creation and timing to the backend.
+      override def scoped[A](req: http.HttpExchange)(f: ExecuteContext => A): (A, Timings) =
+        backend.withExecuteContext(f)
+    }
+  }
+}
diff --git a/hail/src/main/scala/is/hail/backend/api/ServiceBackendApi.scala b/hail/src/main/scala/is/hail/backend/api/ServiceBackendApi.scala
new file mode 100644
index 000000000000..36fa994ef5b9
--- /dev/null
+++ b/hail/src/main/scala/is/hail/backend/api/ServiceBackendApi.scala
@@ -0,0 +1,196 @@
+package is.hail.backend.api
+
+import is.hail.asm4s.HailClassLoader
+import is.hail.backend.caching.NoCaching
+import is.hail.backend.service._
+import is.hail.backend.{ExecuteContext, HttpLikeBackendRpc}
+import is.hail.expr.ir.lowering.IrMetadata
+import is.hail.io.fs.{CloudStorageFSConfig, RouterFS}
+import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver}
+import is.hail.services._
+import is.hail.types.virtual.Kinds
+import is.hail.utils.ExecutionTimer.Timings
+import is.hail.utils.{ErrorHandling, ExecutionTimer, HailWorkerException, Logging, using}
+import is.hail.variant.ReferenceGenome
+import is.hail.{HailContext, HailFeatureFlags}
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods
+
+import java.nio.file.Path
+import scala.annotation.switch
+import scala.collection.mutable
+
+object ServiceBackendAPI extends HttpLikeBackendRpc[Request] with Logging {
+
+  def main(argv: Array[String]): Unit = {
+    assert(argv.length == 7, argv.toFastSeq)
+
+    val scratchDir = argv(0)
+    // val logFile = argv(1)
+    val jarLocation = argv(2)
+    val kind = argv(3)
+    assert(kind == Main.DRIVER)
+    val name = argv(4)
+    val inputURL = argv(5)
+    val outputURL = argv(6)
+
+    val deployConfig = DeployConfig.fromConfigFile("/deploy-config/deploy-config.json")
+    DeployConfig.set(deployConfig)
+    sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)
+
+    var fs = RouterFS.buildRoutes(
+      CloudStorageFSConfig.fromFlagsAndEnv(
+        Some(Path.of(scratchDir, "secrets/gsa-key/key.json")),
+        HailFeatureFlags.fromEnv(),
+      )
+    )
+
+    val (rpcConfig, jobConfig, action, payload) =
+      using(fs.openNoCompression(inputURL)) { is =>
+        val input = JsonMethods.parse(is)
+        (
+          (input \ "rpc_config").extract[ServiceBackendRPCPayload],
+          (input \ "job_config").extract[BatchJobConfig],
+          (input \ "action").extract[Int],
+          input \ "payload",
+        )
+      }
+
+    // requester pays config is conveyed in feature flags currently
+    val featureFlags = HailFeatureFlags.fromEnv(rpcConfig.flags)
+    fs = RouterFS.buildRoutes(
+      CloudStorageFSConfig.fromFlagsAndEnv(
+        Some(Path.of(scratchDir, "secrets/gsa-key/key.json")),
+        featureFlags,
+      )
+    )
+
+    val references: Map[String, ReferenceGenome] =
+      ReferenceGenome.builtinReferences() ++
+        rpcConfig.custom_references.map(ReferenceGenome.fromJSON).map(rg => rg.name -> rg)
+
+    rpcConfig.liftovers.foreach { case (sourceGenome, liftoversForSource) =>
+      liftoversForSource.foreach { case (destGenome, chainFile) =>
+        references(sourceGenome).addLiftover(references(destGenome), LiftOver(fs, chainFile))
+      }
+    }
+
+    rpcConfig.sequences.foreach { case (rg, seq) =>
+      references(rg).addSequence(IndexedFastaSequenceFile(fs, seq.fasta, seq.index))
+    }
+
+    val backend = new ServiceBackend(
+      name,
+      BatchClient(deployConfig, Path.of(scratchDir, "secrets/gsa-key/key.json")),
+      jarLocation,
+      BatchConfig.fromConfigFile(Path.of(scratchDir, "batch-config/batch-config.json")),
+      jobConfig,
+    )
+
+    log.info("ServiceBackend allocated.")
+    if (HailContext.isInitialized) {
+      HailContext.get.backend = backend
+      log.info("Default references added to already initialized HailContext.")
+    } else {
+      HailContext(backend, 50, 3)
+      log.info("HailContext initialized.")
+    }
+
+    // FIXME: when can the classloader be shared? (optimizer benefits!)
+ runRpc( + Request( + backend, + featureFlags, + new HailClassLoader(getClass.getClassLoader), + rpcConfig, + fs, + references, + outputURL, + action, + payload, + ) + ) + } + + implicit override protected object Handler + extends Routing with Write[Request] with Context[Request] with ErrorHandling { + import Routes._ + + override def route(a: Request): Route = + (a.action: @switch) match { + case 1 => TypeOf(Kinds.Value) + case 2 => TypeOf(Kinds.Table) + case 3 => TypeOf(Kinds.Matrix) + case 4 => TypeOf(Kinds.BlockMatrix) + case 5 => Execute + case 6 => ParseVcfMetadata + case 7 => ImportFam + case 8 => LoadReferencesFromDataset + case 9 => LoadReferencesFromFASTA + } + + override def payload(a: Request): JValue = a.payload + + // service backend doesn't support sending timings back to the python client + override def timings(env: Request)(t: Timings): Unit = + () + + override def result(env: Request)(result: Array[Byte]): Unit = + retryTransientErrors { + using(env.fs.createNoCompression(env.outputUrl)) { outputStream => + val output = new HailSocketAPIOutputStream(outputStream) + output.writeBool(true) + output.writeBytes(result) + } + } + + override def error(env: Request)(t: Throwable): Unit = + retryTransientErrors { + val (shortMessage, expandedMessage, errorId) = + t match { + case t: HailWorkerException => + log.error( + "A worker failed. The exception was written for Python but we will also throw an exception to fail this driver job.", + t, + ) + (t.shortMessage, t.expandedMessage, t.errorId) + case _ => + log.error( + "An exception occurred in the driver. The exception was written for Python but we will re-throw to fail this driver job.", + t, + ) + handleForPython(t) + } + + using(env.fs.createNoCompression(env.outputUrl)) { outputStream => + val output = new HailSocketAPIOutputStream(outputStream) + output.writeBool(false) + output.writeString(shortMessage) + output.writeString(expandedMessage) + output.writeInt(errorId) + } + + throw t + } + + override def scoped[A](env: Request)(f: ExecuteContext => A): (A, Timings) = + ExecutionTimer.time { timer => + ExecuteContext.scoped( + env.rpcConfig.tmp_dir, + env.rpcConfig.remote_tmpdir, + env.backend, + env.fs, + timer, + null, + env.hcl, + env.flags, + IrMetadata(None), + mutable.Map(env.references.toSeq: _*), + NoCaching, + NoCaching, + NoCaching, + NoCaching, + )(f) + } + } +} diff --git a/hail/src/main/scala/is/hail/backend/py4j/Py4JBackendExtensions.scala b/hail/src/main/scala/is/hail/backend/py4j/Py4JBackendExtensions.scala deleted file mode 100644 index f2d972e570a7..000000000000 --- a/hail/src/main/scala/is/hail/backend/py4j/Py4JBackendExtensions.scala +++ /dev/null @@ -1,239 +0,0 @@ -package is.hail.backend.py4j - -import is.hail.HailFeatureFlags -import is.hail.backend.{Backend, ExecuteContext, NonOwningTempFileManager, TempFileManager} -import is.hail.expr.{JSONAnnotationImpex, SparkAnnotationImpex} -import is.hail.expr.ir.{ - BaseIR, BlockMatrixIR, EncodedLiteral, GetFieldByIdx, IRParser, Interpret, MatrixIR, - MatrixNativeReader, MatrixRead, NativeReaderOptions, TableLiteral, TableValue, -} -import is.hail.expr.ir.IRParser.parseType -import is.hail.expr.ir.functions.IRFunctionRegistry -import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver} -import is.hail.linalg.RowMatrix -import is.hail.types.physical.PStruct -import is.hail.types.virtual.{TArray, TInterval} -import is.hail.utils.{fatal, log, toRichIterable, HailException, Interval} -import is.hail.variant.ReferenceGenome - -import scala.collection.mutable 
-import scala.jdk.CollectionConverters.{asScalaBufferConverter, seqAsJavaListConverter} - -import java.util - -import org.apache.spark.sql.DataFrame -import org.json4s -import org.json4s.jackson.JsonMethods -import sourcecode.Enclosing - -trait Py4JBackendExtensions { - def backend: Backend - def references: mutable.Map[String, ReferenceGenome] - def flags: HailFeatureFlags - def longLifeTempFileManager: TempFileManager - - def pyGetFlag(name: String): String = - flags.get(name) - - def pySetFlag(name: String, value: String): Unit = - flags.set(name, value) - - def pyAvailableFlags: java.util.ArrayList[String] = - flags.available - - private[this] var irID: Int = 0 - - private[this] def nextIRID(): Int = { - irID += 1 - irID - } - - private[this] def addJavaIR(ctx: ExecuteContext, ir: BaseIR): Int = { - val id = nextIRID() - ctx.IrCache += (id -> ir) - id - } - - def pyRemoveJavaIR(id: Int): Unit = - backend.withExecuteContext(_.IrCache.remove(id)) - - def pyAddSequence(name: String, fastaFile: String, indexFile: String): Unit = - backend.withExecuteContext { ctx => - references(name).addSequence(IndexedFastaSequenceFile(ctx.fs, fastaFile, indexFile)) - } - - def pyRemoveSequence(name: String): Unit = - references(name).removeSequence() - - def pyExportBlockMatrix( - pathIn: String, - pathOut: String, - delimiter: String, - header: String, - addIndex: Boolean, - exportType: String, - partitionSize: java.lang.Integer, - entries: String, - ): Unit = - backend.withExecuteContext { ctx => - val rm = RowMatrix.readBlockMatrix(ctx.fs, pathIn, partitionSize) - entries match { - case "full" => - rm.export(ctx, pathOut, delimiter, Option(header), addIndex, exportType) - case "lower" => - rm.exportLowerTriangle(ctx, pathOut, delimiter, Option(header), addIndex, exportType) - case "strict_lower" => - rm.exportStrictLowerTriangle( - ctx, - pathOut, - delimiter, - Option(header), - addIndex, - exportType, - ) - case "upper" => - rm.exportUpperTriangle(ctx, pathOut, delimiter, Option(header), addIndex, exportType) - case "strict_upper" => - rm.exportStrictUpperTriangle( - ctx, - pathOut, - delimiter, - Option(header), - addIndex, - exportType, - ) - } - } - - def pyRegisterIR( - name: String, - typeParamStrs: java.util.ArrayList[String], - argNameStrs: java.util.ArrayList[String], - argTypeStrs: java.util.ArrayList[String], - returnType: String, - bodyStr: String, - ): Unit = - backend.withExecuteContext { ctx => - IRFunctionRegistry.registerIR( - ctx, - name, - typeParamStrs.asScala.toArray, - argNameStrs.asScala.toArray, - argTypeStrs.asScala.toArray, - returnType, - bodyStr, - ) - } - - def pyExecuteLiteral(irStr: String): Int = - backend.withExecuteContext { ctx => - val ir = IRParser.parse_value_ir(ctx, irStr) - backend.execute(ctx, ir) match { - case Left(_) => throw new HailException("Can't create literal") - case Right((pt, addr)) => - val field = GetFieldByIdx(EncodedLiteral.fromPTypeAndAddress(pt, addr, ctx), 0) - addJavaIR(ctx, field) - } - }._1 - - def pyFromDF(df: DataFrame, jKey: java.util.List[String]): (Int, String) = { - val key = jKey.asScala.toArray.toFastSeq - val signature = - SparkAnnotationImpex.importType(df.schema).setRequired(true).asInstanceOf[PStruct] - withExecuteContext(selfContainedExecution = false) { ctx => - val tir = TableLiteral( - TableValue( - ctx, - signature.virtualType, - key, - df.rdd, - Some(signature), - ), - ctx.theHailClassLoader, - ) - val id = addJavaIR(ctx, tir) - (id, JsonMethods.compact(tir.typ.toJSON)) - } - } - - def pyToDF(s: String): DataFrame = - 
backend.withExecuteContext { ctx => - val tir = IRParser.parse_table_ir(ctx, s) - Interpret(tir, ctx).toDF() - }._1 - - def pyReadMultipleMatrixTables(jsonQuery: String): util.List[MatrixIR] = - backend.withExecuteContext { ctx => - log.info("pyReadMultipleMatrixTables: got query") - val kvs = JsonMethods.parse(jsonQuery) match { - case json4s.JObject(values) => values.toMap - } - - val paths = kvs("paths").asInstanceOf[json4s.JArray].arr.toArray.map { - case json4s.JString(s) => s - } - - val intervalPointType = parseType(kvs("intervalPointType").asInstanceOf[json4s.JString].s) - val intervalObjects = - JSONAnnotationImpex.importAnnotation(kvs("intervals"), TArray(TInterval(intervalPointType))) - .asInstanceOf[IndexedSeq[Interval]] - - val opts = NativeReaderOptions(intervalObjects, intervalPointType) - val matrixReaders: IndexedSeq[MatrixIR] = paths.map { p => - log.info(s"creating MatrixRead node for $p") - val mnr = MatrixNativeReader(ctx.fs, p, Some(opts)) - MatrixRead(mnr.fullMatrixTypeWithoutUIDs, false, false, mnr): MatrixIR - } - log.info("pyReadMultipleMatrixTables: returning N matrix tables") - matrixReaders.asJava - }._1 - - def pyAddReference(jsonConfig: String): Unit = - addReference(ReferenceGenome.fromJSON(jsonConfig)) - - def pyRemoveReference(name: String): Unit = - removeReference(name) - - def pyAddLiftover(name: String, chainFile: String, destRGName: String): Unit = - backend.withExecuteContext { ctx => - references(name).addLiftover(references(destRGName), LiftOver(ctx.fs, chainFile)) - } - - def pyRemoveLiftover(name: String, destRGName: String): Unit = - references(name).removeLiftover(destRGName) - - private[this] def addReference(rg: ReferenceGenome): Unit = { - references.get(rg.name) match { - case Some(rg2) => - if (rg != rg2) { - fatal( - s"Cannot add reference genome '${rg.name}', a different reference with that name already exists. Choose a reference name NOT in the following list:\n " + - s"@1", - references.keys.truncatable("\n "), - ) - } - case None => - references += (rg.name -> rg) - } - } - - private[this] def removeReference(name: String): Unit = - references -= name - - def parse_blockmatrix_ir(s: String): BlockMatrixIR = - withExecuteContext(selfContainedExecution = false) { ctx => - IRParser.parse_blockmatrix_ir(ctx, s) - } - - def withExecuteContext[T]( - selfContainedExecution: Boolean = true - )( - f: ExecuteContext => T - )(implicit E: Enclosing - ): T = - backend.withExecuteContext { ctx => - val tempFileManager = longLifeTempFileManager - if (selfContainedExecution && tempFileManager != null) f(ctx) - else ctx.local(tempFileManager = NonOwningTempFileManager(tempFileManager))(f) - }._1 -}
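
Note for readers following the refactor: both new entry points above implement the same generic request/response loop from HttpLikeBackendRpc and differ only in their transport, Py4JBackendApi routes on an HTTP request path and answers over the socket, while ServiceBackendAPI routes on an integer action and writes its answer to an output URL. The sketch below is illustrative only; Rpc, HttpReq and BatchReq are simplified stand-ins (not Hail code) for HttpLikeBackendRpc and its Routing/Write/Context instances, and the routes and payloads are placeholders.

// Minimal, self-contained sketch of the pattern (assumed names, not Hail's API).
object RpcPatternSketch {

  sealed trait Route
  case object Execute extends Route
  case object TypeOf extends Route

  // One generic request/response loop, abstracted over the transport's request type A.
  trait Rpc[A] {
    def route(a: A): Route                 // decide what was asked for (URL path, action int, ...)
    def payload(a: A): String              // read the request body
    def result(a: A)(r: Array[Byte]): Unit // deliver a successful answer (HTTP response, output file, ...)
    def error(a: A)(t: Throwable): Unit    // deliver a failure

    final def runRpc(a: A): Unit =
      try {
        val answer = route(a) match {
          case Execute => s"executed: ${payload(a)}".getBytes("UTF-8")
          case TypeOf  => s"type of: ${payload(a)}".getBytes("UTF-8")
        }
        result(a)(answer)
      } catch { case t: Throwable => error(a)(t) }
  }

  // Transport 1: an in-process HTTP server, routing on the request path.
  final case class HttpReq(path: String, body: String)
  object HttpRpc extends Rpc[HttpReq] {
    def route(a: HttpReq): Route = a.path match {
      case "/execute" => Execute
      case _          => TypeOf
    }
    def payload(a: HttpReq): String = a.body
    def result(a: HttpReq)(r: Array[Byte]): Unit = println(s"HTTP 200: ${new String(r, "UTF-8")}")
    def error(a: HttpReq)(t: Throwable): Unit = println(s"HTTP 500: ${t.getMessage}")
  }

  // Transport 2: a batch driver job, routing on an integer action and writing to an output URL.
  final case class BatchReq(action: Int, body: String, outputUrl: String)
  object BatchRpc extends Rpc[BatchReq] {
    def route(a: BatchReq): Route = a.action match {
      case 5 => Execute
      case _ => TypeOf
    }
    def payload(a: BatchReq): String = a.body
    def result(a: BatchReq)(r: Array[Byte]): Unit =
      println(s"wrote ${r.length} bytes to ${a.outputUrl}")
    def error(a: BatchReq)(t: Throwable): Unit =
      println(s"wrote failure '${t.getMessage}' to ${a.outputUrl}")
  }

  def main(args: Array[String]): Unit = {
    HttpRpc.runRpc(HttpReq("/execute", "(I32 1)"))
    BatchRpc.runRpc(BatchReq(action = 5, body = "(I32 1)", outputUrl = "gs://bucket/result"))
  }
}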