[query] Remove BlockMatrix persist from Backend interface
ehigham committed Jan 21, 2025
1 parent 482d085 commit 8fe9826
Showing 13 changed files with 108 additions and 94 deletions.
12 changes: 1 addition & 11 deletions hail/src/main/scala/is/hail/backend/Backend.scala
@@ -12,11 +12,10 @@ import is.hail.io.{BufferSpec, TypedCodecSpec}
import is.hail.io.fs._
import is.hail.io.plink.LoadPlink
import is.hail.io.vcf.LoadVCF
import is.hail.linalg.BlockMatrix
import is.hail.types._
import is.hail.types.encoded.EType
import is.hail.types.physical.PTuple
import is.hail.types.virtual.{BlockMatrixType, TFloat64}
import is.hail.types.virtual.TFloat64
import is.hail.utils._
import is.hail.variant.ReferenceGenome

@@ -77,15 +76,6 @@ abstract class Backend extends Closeable {

def broadcast[T: ClassTag](value: T): BroadcastValue[T]

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit

def unpersist(backendContext: BackendContext, id: String): Unit

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType

def parallelizeAndComputeWithIndex(
backendContext: BackendContext,
fs: FS,
6 changes: 6 additions & 0 deletions hail/src/main/scala/is/hail/backend/ExecuteContext.scala
@@ -6,6 +6,7 @@ import is.hail.asm4s.HailClassLoader
import is.hail.backend.local.LocalTaskContext
import is.hail.expr.ir.lowering.IrMetadata
import is.hail.io.fs.FS
import is.hail.linalg.BlockMatrix
import is.hail.utils._
import is.hail.variant.ReferenceGenome

@@ -71,6 +72,7 @@ object ExecuteContext {
flags: HailFeatureFlags,
backendContext: BackendContext,
irMetadata: IrMetadata,
blockMatrixCache: mutable.Map[String, BlockMatrix],
)(
f: ExecuteContext => T
): T = {
@@ -89,6 +91,7 @@
flags,
backendContext,
irMetadata,
blockMatrixCache,
))(f(_))
}
}
@@ -118,6 +121,7 @@
val flags: HailFeatureFlags,
val backendContext: BackendContext,
val irMetadata: IrMetadata,
val BlockMatrixCache: mutable.Map[String, BlockMatrix],
) extends Closeable {

val rngNonce: Long =
@@ -184,6 +188,7 @@
flags: HailFeatureFlags = this.flags,
backendContext: BackendContext = this.backendContext,
irMetadata: IrMetadata = this.irMetadata,
blockMatrixCache: mutable.Map[String, BlockMatrix] = this.BlockMatrixCache,
)(
f: ExecuteContext => A
): A =
@@ -200,5 +205,6 @@
flags,
backendContext,
irMetadata,
blockMatrixCache,
))(f)
}
33 changes: 33 additions & 0 deletions hail/src/main/scala/is/hail/backend/caching/BlockMatrixCache.scala
@@ -0,0 +1,33 @@
package is.hail.backend.caching

import is.hail.linalg.BlockMatrix

import scala.collection.mutable

class BlockMatrixCache extends mutable.AbstractMap[String, BlockMatrix] with AutoCloseable {

private[this] val blockmatrices: mutable.Map[String, BlockMatrix] =
mutable.LinkedHashMap.empty

override def +=(kv: (String, BlockMatrix)): BlockMatrixCache.this.type = {
blockmatrices += kv; this
}

override def -=(key: String): BlockMatrixCache.this.type = {
get(key).foreach { bm => bm.unpersist(); blockmatrices -= key }; this
}

override def get(key: String): Option[BlockMatrix] =
blockmatrices.get(key)

override def iterator: Iterator[(String, BlockMatrix)] =
blockmatrices.iterator

override def clear(): Unit = {
blockmatrices.values.foreach(_.unpersist())
blockmatrices.clear()
}

override def close(): Unit =
clear()
}
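
For orientation, a minimal sketch of how the Spark backend is expected to drive this cache. It compiles against the Hail classpath but is not executable as written: the BlockMatrix placeholder (???) and the "MEMORY_AND_DISK" storage level are assumptions, since producing a real BlockMatrix needs a live Spark-backed session.

import is.hail.backend.caching.BlockMatrixCache
import is.hail.linalg.BlockMatrix

object BlockMatrixCacheSketch {
  def main(args: Array[String]): Unit = {
    val cache = new BlockMatrixCache()

    // Placeholder: in practice this comes from evaluating a BlockMatrixIR.
    val bm: BlockMatrix = ???

    // Persist and register under an id (mirrors BlockMatrixPersistWriter below).
    cache += "bm-0" -> bm.persist("MEMORY_AND_DISK")

    // The reader side looks the persisted value up by id.
    assert(cache.contains("bm-0"))

    // Removing an entry unpersists it before dropping it from the map.
    cache -= "bm-0"

    // close() unpersists anything still registered.
    cache.close()
  }
}
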
13 changes: 2 additions & 11 deletions hail/src/main/scala/is/hail/backend/local/LocalBackend.scala
@@ -10,11 +10,10 @@ import is.hail.expr.ir._
import is.hail.expr.ir.analyses.SemanticHash
import is.hail.expr.ir.lowering._
import is.hail.io.fs._
import is.hail.linalg.BlockMatrix
import is.hail.types._
import is.hail.types.physical.PTuple
import is.hail.types.physical.stypes.PTypeReferenceSingleCodeType
import is.hail.types.virtual.{BlockMatrixType, TVoid}
import is.hail.types.virtual.TVoid
import is.hail.utils._
import is.hail.variant.ReferenceGenome

@@ -113,6 +112,7 @@ class LocalBackend(
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
new IrMetadata(),
ImmutableMap.empty,
)(f)
}

@@ -215,15 +215,6 @@
): TableReader =
LowerDistributedSort.distributedSort(ctx, stage, sortFields, rt, nPartitions)

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = ???

def unpersist(backendContext: BackendContext, id: String): Unit = ???

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix = ???

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType = ???

def tableToTableStage(ctx: ExecuteContext, inputIR: TableIR, analyses: LoweringAnalyses)
: TableStage =
LowerTableIR.applyTable(inputIR, DArrayLowering.All, ctx, analyses)
11 changes: 1 addition & 10 deletions hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
@@ -14,7 +14,6 @@ import is.hail.expr.ir.functions.IRFunctionRegistry
import is.hail.expr.ir.lowering._
import is.hail.io.fs._
import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver}
import is.hail.linalg.BlockMatrix
import is.hail.services.{BatchClient, JobGroupRequest, _}
import is.hail.services.JobGroupStates.{Cancelled, Failure, Running, Success}
import is.hail.types._
@@ -375,15 +374,6 @@
): TableReader =
LowerDistributedSort.distributedSort(ctx, inputStage, sortFields, rt, nPartitions)

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = ???

def unpersist(backendContext: BackendContext, id: String): Unit = ???

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix = ???

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType = ???

def tableToTableStage(ctx: ExecuteContext, inputIR: TableIR, analyses: LoweringAnalyses)
: TableStage =
LowerTableIR.applyTable(inputIR, DArrayLowering.All, ctx, analyses)
@@ -402,6 +392,7 @@
flags,
serviceBackendContext,
new IrMetadata(),
ImmutableMap.empty,
)(f)
}

21 changes: 6 additions & 15 deletions hail/src/main/scala/is/hail/backend/spark/SparkBackend.scala
@@ -4,14 +4,14 @@ import is.hail.{HailContext, HailFeatureFlags}
import is.hail.annotations._
import is.hail.asm4s._
import is.hail.backend._
import is.hail.backend.caching.BlockMatrixCache
import is.hail.backend.py4j.Py4JBackendExtensions
import is.hail.expr.Validate
import is.hail.expr.ir._
import is.hail.expr.ir.analyses.SemanticHash
import is.hail.expr.ir.lowering._
import is.hail.io.{BufferSpec, TypedCodecSpec}
import is.hail.io.fs._
import is.hail.linalg.BlockMatrix
import is.hail.rvd.RVD
import is.hail.types._
import is.hail.types.physical.{PStruct, PTuple}
@@ -351,20 +351,8 @@
override val longLifeTempFileManager: TempFileManager =
new OwningTempFileManager(fs)

val bmCache: SparkBlockMatrixCache = SparkBlockMatrixCache()

def persist(backendContext: BackendContext, id: String, value: BlockMatrix, storageLevel: String)
: Unit = bmCache.persistBlockMatrix(id, value, storageLevel)

def unpersist(backendContext: BackendContext, id: String): Unit = unpersist(id)

def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix =
bmCache.getPersistedBlockMatrix(id)

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType =
bmCache.getPersistedBlockMatrixType(id)

def unpersist(id: String): Unit = bmCache.unpersistBlockMatrix(id)
private[this] val bmCache: BlockMatrixCache =
new BlockMatrixCache()

def createExecuteContextForTests(
timer: ExecutionTimer,
@@ -387,6 +375,7 @@
ExecutionCache.forTesting
},
new IrMetadata(),
ImmutableMap.empty,
)

override def withExecuteContext[T](f: ExecuteContext => T)(implicit E: Enclosing): T =
@@ -406,6 +395,7 @@
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
new IrMetadata(),
bmCache,
)(f)
}

@@ -470,6 +460,7 @@
override def asSpark(op: String): SparkBackend = this

def close(): Unit = {
bmCache.close()
SparkBackend.stop()
longLifeTempFileManager.close()
}

This file was deleted.

13 changes: 5 additions & 8 deletions hail/src/main/scala/is/hail/expr/ir/BlockMatrixIR.scala
@@ -1,8 +1,7 @@
package is.hail.expr.ir

import is.hail.HailContext
import is.hail.annotations.NDArray
import is.hail.backend.{BackendContext, ExecuteContext}
import is.hail.backend.ExecuteContext
import is.hail.expr.Nat
import is.hail.expr.ir.lowering.{BMSContexts, BlockMatrixStage2, LowererUnsupportedOperation}
import is.hail.io.{StreamBufferSpec, TypedCodecSpec}
@@ -106,7 +105,7 @@ object BlockMatrixReader {
def fromJValue(ctx: ExecuteContext, jv: JValue): BlockMatrixReader =
(jv \ "name").extract[String] match {
case "BlockMatrixNativeReader" => BlockMatrixNativeReader.fromJValue(ctx.fs, jv)
case "BlockMatrixPersistReader" => BlockMatrixPersistReader.fromJValue(ctx.backendContext, jv)
case "BlockMatrixPersistReader" => BlockMatrixPersistReader.fromJValue(ctx, jv)
case _ => jv.extract[BlockMatrixReader]
}
}
@@ -274,22 +273,20 @@ case class BlockMatrixBinaryReader(path: String, shape: IndexedSeq[Long], blockS
case class BlockMatrixNativePersistParameters(id: String)

object BlockMatrixPersistReader {
def fromJValue(ctx: BackendContext, jv: JValue): BlockMatrixPersistReader = {
def fromJValue(ctx: ExecuteContext, jv: JValue): BlockMatrixPersistReader = {
implicit val formats: Formats = BlockMatrixReader.formats
val params = jv.extract[BlockMatrixNativePersistParameters]
BlockMatrixPersistReader(
params.id,
HailContext.backend.getPersistedBlockMatrixType(ctx, params.id),
BlockMatrixType.fromBlockMatrix(ctx.BlockMatrixCache(params.id)),
)
}
}

case class BlockMatrixPersistReader(id: String, typ: BlockMatrixType) extends BlockMatrixReader {
def pathsUsed: Seq[String] = FastSeq()
lazy val fullType: BlockMatrixType = typ

def apply(ctx: ExecuteContext): BlockMatrix =
HailContext.backend.getPersistedBlockMatrix(ctx.backendContext, id)
def apply(ctx: ExecuteContext): BlockMatrix = ctx.BlockMatrixCache(id)
}

case class BlockMatrixMap(child: BlockMatrixIR, eltName: Name, f: IR, needsDense: Boolean)
3 changes: 1 addition & 2 deletions hail/src/main/scala/is/hail/expr/ir/BlockMatrixWriter.scala
@@ -1,6 +1,5 @@
package is.hail.expr.ir

import is.hail.HailContext
import is.hail.annotations.Region
import is.hail.asm4s._
import is.hail.backend.ExecuteContext
@@ -190,7 +189,7 @@ case class BlockMatrixPersistWriter(id: String, storageLevel: String) extends Bl
def pathOpt: Option[String] = None

def apply(ctx: ExecuteContext, bm: BlockMatrix): Unit =
HailContext.backend.persist(ctx.backendContext, id, bm, storageLevel)
ctx.BlockMatrixCache += id -> bm.persist(storageLevel)

def loweredTyp: Type = TVoid
}
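
Taken with BlockMatrixPersistReader above, persistence now round-trips entirely through the ExecuteContext rather than the Backend singleton. A hedged sketch of that flow, assuming a Spark-backed ctx (so ctx.BlockMatrixCache is the real cache) and an already-computed bm; the "bm-0" id, the "MEMORY_AND_DISK" storage level, and the wrapper object are made up for illustration:

import is.hail.backend.ExecuteContext
import is.hail.expr.ir.{BlockMatrixPersistReader, BlockMatrixPersistWriter}
import is.hail.linalg.BlockMatrix
import is.hail.types.virtual.BlockMatrixType

object PersistRoundTripSketch {
  def roundTrip(ctx: ExecuteContext, bm: BlockMatrix): BlockMatrix = {
    // Write side: persist the matrix and register it in the per-context cache.
    BlockMatrixPersistWriter("bm-0", "MEMORY_AND_DISK").apply(ctx, bm)

    // Read side: both the virtual type and the value come from that same cache.
    val typ: BlockMatrixType = BlockMatrixType.fromBlockMatrix(ctx.BlockMatrixCache("bm-0"))
    BlockMatrixPersistReader("bm-0", typ).apply(ctx)
  }
}
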
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/utils/ErrorHandling.scala
@@ -18,7 +18,7 @@ class HailWorkerException(
trait ErrorHandling {
def fatal(msg: String): Nothing = throw new HailException(msg)

def fatal(msg: String, errorId: Int) = throw new HailException(msg, errorId)
def fatal(msg: String, errorId: Int): Nothing = throw new HailException(msg, errorId)

def fatal(msg: String, cause: Throwable): Nothing = throw new HailException(msg, None, cause)

25 changes: 25 additions & 0 deletions hail/src/main/scala/is/hail/utils/ImmutableMap.scala
@@ -0,0 +1,25 @@
package is.hail.utils

import scala.collection.mutable

case class ImmutableMap[K, V](m: Map[K, V]) extends mutable.AbstractMap[K, V] {
override def +=(kv: (K, V)): ImmutableMap.this.type =
throw new UnsupportedOperationException()

override def -=(key: K): ImmutableMap.this.type =
throw new UnsupportedOperationException()

override def get(key: K): Option[V] =
m.get(key)

override def iterator: Iterator[(K, V)] =
m.iterator

override def toMap[T, U](implicit ev: (K, V) <:< (T, U)): Map[T, U] =
m.toMap
}

object ImmutableMap {
def empty[K, V]: ImmutableMap[K, V] =
ImmutableMap(Map.empty)
}
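
As a self-contained sanity check of the intended contract (Int is a stand-in value type; only the Hail classpath is assumed): LocalBackend and ServiceBackend hand ImmutableMap.empty to their ExecuteContexts, so a BlockMatrix persist on those backends now fails fast instead of silently caching.

import is.hail.utils.ImmutableMap

import scala.util.Try

object ImmutableMapSketch {
  def main(args: Array[String]): Unit = {
    val noCache = ImmutableMap.empty[String, Int]

    // Reads behave like an always-empty map.
    assert(noCache.get("anything").isEmpty)
    assert(noCache.isEmpty)

    // Writes are rejected, so an unsupported persist surfaces immediately.
    assert(Try(noCache += ("id" -> 1)).isFailure) // UnsupportedOperationException
    assert(Try(noCache -= "id").isFailure)
  }
}
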
7 changes: 7 additions & 0 deletions hail/src/main/scala/is/hail/utils/richUtils/RichMap.scala
@@ -1,5 +1,9 @@
package is.hail.utils.richUtils

import is.hail.utils.ImmutableMap

import scala.collection.mutable

class RichMap[K, V](val m: Map[K, V]) extends AnyVal {
def force =
m.map(identity) // needed to make serializable: https://issues.scala-lang.org/browse/SI-7005
@@ -9,4 +13,7 @@

def isTrivial(implicit eq: K =:= V): Boolean =
m.forall { case (k, v) => k == v }

def immutableMutableMap: mutable.Map[K, V] =
ImmutableMap(m)
}