Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AN-380 Make call cache hashing strategy configurable per filesystem and backend #7683

Draft
wants to merge 21 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package cromwell.backend.standard.callcaching

import java.util.concurrent.TimeoutException

import akka.actor.{Actor, ActorLogging, ActorRef, Timers}
import akka.event.LoggingAdapter
import com.typesafe.config.Config
import cromwell.backend.standard.StandardCachingActorHelper
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.IoHashCommandWithContext
import cromwell.backend.standard.callcaching.StandardFileHashingActor._
Expand All @@ -12,8 +12,11 @@ import cromwell.core.JobKey
import cromwell.core.callcaching._
import cromwell.core.io._
import cromwell.core.logging.JobLogging
import cromwell.core.path.Path
import net.ceedubs.ficus.Ficus._
import wom.values.WomFile

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -48,6 +51,10 @@ case class FileHashContext(hashKey: HashKey, file: String)
class DefaultStandardFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("drs", FileHashStrategy.Crc32c)
)
}

object StandardFileHashingActor {
Expand Down Expand Up @@ -80,10 +87,40 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
override lazy val serviceRegistryActor: ActorRef = standardParams.serviceRegistryActor
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

// Child classes can override to set per-filesystem defaults
val defaultHashingStrategies: Map[String, FileHashStrategy] = Map.empty

// Hashing strategy to use if none is configured.
val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy.Md5

// Combines defaultHashingStrategies with user-provided configuration
lazy val hashingStrategies: Map[String, FileHashStrategy] = {

val configuredHashingStrategies = for {
fsConfigs <- configurationDescriptor.backendConfig.as[Option[Config]]("filesystems").toList
fsKey <- fsConfigs.entrySet.asScala.map(_.getKey)
fileHashStrategyName <- fsConfigs.as[Option[String]](s"fileSystems.${fsKey}.caching.hash-strategy")
fileHashStrategy <- FileHashStrategy(fileHashStrategyName)
_ = log.info(s"Call caching hash strategy for ${fsKey} files will be ${fileHashStrategy}")
} yield (fsKey, fileHashStrategy)

val strats = defaultHashingStrategies ++ configuredHashingStrategies
val stratsReport = strats.keys.toList.sorted.map(k => s"$k -> ${strats.get(k)}").mkString(", ")
log.info(
s"Call caching configured with per-filesystem file hashing strategies: $stratsReport. " +
s"Others will use $fallbackHashingStrategy."
)
strats
}

protected def ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

// Used by ConfigBackend for synchronous hashing of local files
def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = None

def hashStrategyForPath(p: Path): FileHashStrategy =
hashingStrategies.getOrElse(p.filesystemTypeKey, fallbackHashingStrategy)

def fileHashingReceive: Receive = {
// Hash Request
case fileRequest: SingleFileHashRequest =>
Expand Down Expand Up @@ -115,8 +152,9 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
def asyncHashing(fileRequest: SingleFileHashRequest, replyTo: ActorRef): Unit = {
val fileAsString = fileRequest.file.value
val ioHashCommandTry = for {
gcsPath <- getPath(fileAsString)
command <- ioCommandBuilder.hashCommand(gcsPath)
path <- getPath(fileAsString)
hashStrategy = hashStrategyForPath(path)
command <- ioCommandBuilder.hashCommand(path, hashStrategy)
} yield command
lazy val fileHashContext = FileHashContext(fileRequest.hashKey, fileRequest.file.value)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cromwell.core.callcaching

// File hashing strategies used by IoHashCommand, primarily when obtaining file hashes
// for call caching purposes.
sealed trait FileHashStrategy

object FileHashStrategy {
case object Crc32c extends FileHashStrategy
case object Md5 extends FileHashStrategy
case object Md5ThenIdentity extends FileHashStrategy
case object ETag extends FileHashStrategy

// TODO validate fs type here?
def apply(s: String): Option[FileHashStrategy] = s.toLowerCase() match {
case "md5" => Some(Md5)
case "crc32c" => Some(Crc32c)
case "md5+identity" => Some(Md5ThenIdentity)
case "etag" => Some(ETag)
case _ => None
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/cromwell/core/io/AsyncIo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cromwell.core.io

import akka.actor.ActorRef
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoPromiseProxyActor.IoCommandWithPromise
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path
Expand Down Expand Up @@ -47,8 +48,8 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
def sizeAsync(path: Path): Future[Long] =
asyncCommand(ioCommandBuilder.sizeCommand(path))

def hashAsync(path: Path): Future[String] =
asyncCommand(ioCommandBuilder.hashCommand(path))
def hashAsync(path: Path, hashStrategy: FileHashStrategy): Future[String] =
asyncCommand(ioCommandBuilder.hashCommand(path, hashStrategy))

def deleteAsync(path: Path, swallowIoExceptions: Boolean = false): Future[Unit] =
asyncCommand(ioCommandBuilder.deleteCommand(path, swallowIoExceptions))
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.core.io

import better.files.File.OpenOptions
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path

Expand Down Expand Up @@ -42,8 +43,9 @@ object DefaultIoCommand {
s"DefaultIoDeleteCommand file '$file' swallowIOExceptions '$swallowIOExceptions'"
}

case class DefaultIoHashCommand(override val file: Path) extends IoHashCommand(file) {
override def commandDescription: String = s"DefaultIoHashCommand file '$file'"
case class DefaultIoHashCommand(override val file: Path, override val hashStrategy: FileHashStrategy)
extends IoHashCommand(file, hashStrategy) {
override def commandDescription: String = s"DefaultIoHashCommand file '$file' hashStrategy '$hashStrategy'"
}

case class DefaultIoTouchCommand(override val file: Path) extends IoTouchCommand(file) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package cromwell.core.io

import java.time.OffsetDateTime
import java.util.UUID

import better.files.File.OpenOptions
import com.google.api.client.util.ExponentialBackOff
import common.util.Backoff
import common.util.StringUtil.EnhancedToStringable
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff
Expand Down Expand Up @@ -161,8 +161,9 @@ abstract class IoDeleteCommand(val file: Path, val swallowIOExceptions: Boolean)
/**
* Get Hash value for file
*/
abstract class IoHashCommand(val file: Path) extends SingleFileIoCommand[String] {
override def toString = s"get hash of ${file.pathAsString}"
abstract class IoHashCommand(val file: Path, val hashStrategy: FileHashStrategy)
extends SingleFileIoCommand[String] {
override def toString = s"get $hashStrategy hash of ${file.pathAsString}"
override lazy val name = "hash"
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.core.io

import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.BetterFileMethods.OpenOptions
Expand All @@ -18,7 +19,7 @@ abstract class PartialIoCommandBuilder {
def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty
def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty
def copyCommand: PartialFunction[(Path, Path), Try[IoCopyCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[IoHashCommand]] = PartialFunction.empty
def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty
def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty
def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty
Expand Down Expand Up @@ -85,8 +86,8 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
def copyCommand(src: Path, dest: Path): Try[IoCopyCommand] =
buildOrDefault(_.copyCommand, (src, dest), DefaultIoCopyCommand(src, dest))

def hashCommand(file: Path): Try[IoHashCommand] =
buildOrDefault(_.hashCommand, file, DefaultIoHashCommand(file))
def hashCommand(file: Path, hashStrategy: FileHashStrategy): Try[IoHashCommand] =
buildOrDefault(_.hashCommand, (file, hashStrategy), DefaultIoHashCommand(file, hashStrategy))

def touchCommand(file: Path): Try[IoTouchCommand] =
buildOrDefault(_.touchCommand, file, DefaultIoTouchCommand(file))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ case object DefaultPathBuilder extends PathBuilder {
}

case class DefaultPath private[path] (nioPath: NioPath) extends Path {

val filesystemTypeKey = "local"

override protected def newPath(nioPath: NioPath): DefaultPath = DefaultPath(nioPath)

override def pathAsString: String = nioPath.toString
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/cromwell/core/path/PathBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ trait PreResolvePathBuilder extends PathBuilder {
*/
trait Path extends PathObjectMethods with NioPathMethods with BetterFileMethods with EvenBetterPathMethods {

/**
* A string key corresponding to the filesystem this Path is associated with, ex. gcs, drs, local etc.
* This should be the string used to identify the filesystem in Cromwell's config file.
*/
val filesystemTypeKey: String
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most questionable part of this diff IMO.


/**
* A reference to the underlying nioPath, used to create new java.nio.Path's that will then be sent to newPath
* for wrapping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class NioFlow(parallelism: Int,
}

private def hash(hash: IoHashCommand): IO[String] =
NioHashing.hash(hash.file)
NioHashing.hash(hash.file, hash.hashStrategy)

private def touch(touch: IoTouchCommand) = IO {
touch.file.touch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.engine.io.nio
import cats.effect.IO
import cloud.nio.spi.{FileHash, HashType}
import common.util.StringUtil.EnhancedString
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.path.Path
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
Expand All @@ -15,7 +16,8 @@ import scala.util.Try

object NioHashing {

def hash(file: Path): IO[String] =
// TODO update logic to respect hashStrategy
def hash(file: Path, hashStrategy: FileHashStrategy): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves if we can.
getStoredHash(file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class IoActorProxyGcsBatchSpec

val copyCommand = GcsBatchCopyCommand.forPaths(src, dst).get
val sizeCommand = GcsBatchSizeCommand.forPath(src).get
val hashCommand = GcsBatchCrc32Command.forPath(src).get
val hashCommand = GcsBatchHashCommand.forPath(src).get
// Should return true
val isDirectoryCommand = GcsBatchIsDirectoryCommand.forPath(directory).get
// Should return false
Expand All @@ -112,7 +112,7 @@ class IoActorProxyGcsBatchSpec
fileSize shouldBe 5
}

received1 collect { case IoSuccess(_: GcsBatchCrc32Command, hash: String) =>
received1 collect { case IoSuccess(_: GcsBatchHashCommand, hash: String) =>
hash shouldBe "mnG7TA=="
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import common.assertion.CromwellTimeoutSpec
import cromwell.core.TestKitSuite
import cromwell.engine.io.IoCommandContext
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.gcs.batch.GcsBatchCrc32Command
import cromwell.filesystems.gcs.batch.GcsBatchHashCommand
import org.scalatest.PrivateMethodTester
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -85,7 +85,7 @@ class GcsBatchFlowSpec
projectId = "GcsBatchFlowSpec-project"
)
val gcsBatchCommandContext =
GcsBatchCommandContext(GcsBatchCrc32Command.forPath(mockGcsPath).get, TestProbe().ref, 5)
GcsBatchCommandContext(GcsBatchHashCommand.forPath(mockGcsPath).get, TestProbe().ref, 5)
val recoverCommandPrivateMethod =
PrivateMethod[PartialFunction[Throwable, Future[GcsBatchResponse[_]]]](Symbol("recoverCommand"))
val partialFuncAcceptingThrowable = gcsBatchFlow invokePrivate recoverCommandPrivateMethod(gcsBatchCommandContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ object BlobPath {
case class BlobPath private[blob] (pathString: String, endpoint: EndpointURL, container: BlobContainerName)(
private val fsm: BlobFileSystemManager
) extends Path {

val filesystemTypeKey = "blob"

override def nioPath: NioPath = findNioPath(pathString)

override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container, fsm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import java.io.IOException

case class DrsPath(drsPath: CloudNioPath, requesterPaysProjectIdOption: Option[String]) extends Path {

val filesystemTypeKey = "drs"

override def nioPath: NioPath = drsPath

override protected def newPath(nioPath: NioPath): Path =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cloud.nio.spi.CloudNioPath
import cromwell.core.path.{NioPath, Path}

case class FtpPath(ftpPath: CloudNioPath) extends Path {
val filesystemTypeKey = "ftp"
override def nioPath = ftpPath
override protected def newPath(nioPath: NioPath) = FtpPath(nioPath.asInstanceOf[CloudNioPath])
override def pathAsString = nioPath.uriAsString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ case class GcsPath private[gcs] (nioPath: NioPath,
cloudStorage: com.google.cloud.storage.Storage,
projectId: String
) extends Path {

val filesystemTypeKey = "gcs"

lazy val objectBlobId: Try[BlobId] = Try {
val bucketName = cloudStoragePath.bucket
val objectName = cloudStoragePath.toRealPath().toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.filesystems.gcs.batch

import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io._
import cromwell.core.path.Path
import cromwell.filesystems.gcs.GcsPath
Expand All @@ -19,8 +20,9 @@ private case object PartialGcsBatchCommandBuilder extends PartialIoCommandBuilde
case (gcsSrc: GcsPath, gcsDest: GcsPath) => GcsBatchCopyCommand.forPaths(gcsSrc, gcsDest)
}

override def hashCommand: PartialFunction[Path, Try[GcsBatchCrc32Command]] = { case gcsPath: GcsPath =>
GcsBatchCrc32Command.forPath(gcsPath)
override def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[GcsBatchHashCommand]] = {
case (gcsPath: GcsPath, s) =>
GcsBatchHashCommand.forPath(gcsPath, s)
}

override def touchCommand: PartialFunction[Path, Try[GcsBatchTouchCommand]] = { case gcsPath: GcsPath =>
Expand Down
Loading
Loading