Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AN-380 Make call cache hashing strategy configurable per filesystem and backend #7683

Open
wants to merge 33 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e712005
Specify filesystemTypeKey on each Path subclass
jgainerdewar Jan 24, 2025
449567c
Read per-filesystem AsyncFileHashingStrategy from config
jgainerdewar Jan 24, 2025
4a36763
Set default hashing strategies for each backend
jgainerdewar Jan 24, 2025
98beda3
Move AsyncFileHashingStrategy to core
jgainerdewar Jan 24, 2025
7ea5618
Plumb hashStrategy through io commands, use for batch commands
jgainerdewar Jan 24, 2025
2f55cd4
Rename GcsBatchCrc32cCommand -> GcsBatchHashCommand
jgainerdewar Jan 24, 2025
6854497
Rename S3BatchTagCommand -> S3BatchHashCommand
jgainerdewar Jan 24, 2025
61eae83
Rename AsyncFileHashingStrategy -> FileHashStrategy
jgainerdewar Jan 24, 2025
be3a8a3
Update non-DRS NioHashing hash logic
jgainerdewar Jan 27, 2025
0c32295
Progress on DRS
jgainerdewar Jan 28, 2025
baf8151
Switch FileHashStrategy to list approach, make DRS conform
jgainerdewar Jan 28, 2025
6b435d5
Also check in this file
jgainerdewar Jan 28, 2025
c10ae98
Eliminate special GcsCrc32c hash type
jgainerdewar Jan 30, 2025
065ff05
Lazily evaluate hashes
jgainerdewar Jan 30, 2025
df9dccc
Test fixes
jgainerdewar Jan 30, 2025
b82f2ec
Remove defunct tests
jgainerdewar Jan 30, 2025
0a101df
Better handling for hex vs b64 crc32c representations
jgainerdewar Jan 31, 2025
add5a8c
Rename test file
jgainerdewar Jan 31, 2025
de33a85
Comments
jgainerdewar Jan 31, 2025
875f9b5
FileHashStrategy tests
jgainerdewar Jan 31, 2025
16137f2
Imports
jgainerdewar Jan 31, 2025
1966d28
Tests
jgainerdewar Feb 3, 2025
d0ad499
Scalafmt
jgainerdewar Feb 3, 2025
e30b7c2
Remove a few TODOs
jgainerdewar Feb 3, 2025
05876b3
Allow users to configure hash strategy as single string
jgainerdewar Feb 3, 2025
3a9e180
Explanatory comment for unprotected get
jgainerdewar Feb 4, 2025
22cbc1f
Cleanup
jgainerdewar Feb 4, 2025
83c381d
Consistent blob id strings
jgainerdewar Feb 4, 2025
1458942
Ignore failing test
jgainerdewar Feb 4, 2025
c1c9c38
Cleanup
jgainerdewar Feb 4, 2025
ed662f5
Merge branch 'develop' into jd_AN-380_hash
jgainerdewar Feb 4, 2025
b3f04fd
Docs
jgainerdewar Feb 5, 2025
c03f985
Merge branch 'jd_AN-380_hash' of github.com:broadinstitute/cromwell i…
jgainerdewar Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ Cloud Life Sciences (aka `v2Beta`, deprecated) and Google Batch (aka `batch`, re
* Default GPU on GCP Batch is now Nvidia T4
* Updated runtime attributes documentation to clarify that the `nvidiaDriverVersion` key is ignored on GCP Batch.

#### Call Caching Hash Strategy
Users can now configure which algorithm is used to hash files for call caching purposes. See Configuring page in
ReadTheDocs for details. Default behavior is unchanged.

## 87 Release Notes

### GCP Batch
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package cromwell.backend.standard.callcaching

import java.util.concurrent.TimeoutException

import akka.actor.{Actor, ActorLogging, ActorRef, Timers}
import akka.event.LoggingAdapter
import com.typesafe.config.Config
import cromwell.backend.standard.StandardCachingActorHelper
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.IoHashCommandWithContext
import cromwell.backend.standard.callcaching.StandardFileHashingActor._
Expand All @@ -12,8 +12,11 @@ import cromwell.core.JobKey
import cromwell.core.callcaching._
import cromwell.core.io._
import cromwell.core.logging.JobLogging
import cromwell.core.path.Path
import net.ceedubs.ficus.Ficus._
import wom.values.WomFile

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -48,6 +51,10 @@ case class FileHashContext(hashKey: HashKey, file: String)
class DefaultStandardFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("drs", FileHashStrategy.Drs)
)
}

object StandardFileHashingActor {
Expand Down Expand Up @@ -80,10 +87,36 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
override lazy val serviceRegistryActor: ActorRef = standardParams.serviceRegistryActor
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

// Child classes can override to set per-filesystem defaults
val defaultHashingStrategies: Map[String, FileHashStrategy] = Map.empty

// Hashing strategy to use if none is specified.
val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy.Md5

// Combines defaultHashingStrategies with user-provided configuration
lazy val hashingStrategies: Map[String, FileHashStrategy] = {
val configuredHashingStrategies = for {
fsConfigs <- configurationDescriptor.backendConfig.as[Option[Config]]("filesystems").toList
fsKey <- fsConfigs.root.keySet().asScala
configKey = s"${fsKey}.caching.hashing-strategy"
// TODO this allows users to override with an empty list to prevent caching, is that desirable or a footgun?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a global call-caching.enabled flag that seems like a better choice.

I don't it makes sense to let users toggle call caching on a per-filesystem basis because one backend can use multiple filesystems in the same task (e.g. GCS + DRS).

I would make empty list an exception if call-caching.enabled is true.

fileHashStrategyFromList = Try(fsConfigs.as[List[String]](configKey)).toOption
fileHashStrategyFromString = Try(fsConfigs.as[String](configKey)).toOption.map(List(_))
fileHashStrategy <- fileHashStrategyFromList.orElse(fileHashStrategyFromString)
} yield (fsKey, FileHashStrategy.of(fileHashStrategy))

defaultHashingStrategies ++ configuredHashingStrategies

}

protected def ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

// Used by ConfigBackend for synchronous hashing of local files
def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = None

def hashStrategyForPath(p: Path): FileHashStrategy =
hashingStrategies.getOrElse(p.filesystemTypeKey, fallbackHashingStrategy)

def fileHashingReceive: Receive = {
// Hash Request
case fileRequest: SingleFileHashRequest =>
Expand Down Expand Up @@ -115,8 +148,9 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
def asyncHashing(fileRequest: SingleFileHashRequest, replyTo: ActorRef): Unit = {
val fileAsString = fileRequest.file.value
val ioHashCommandTry = for {
gcsPath <- getPath(fileAsString)
command <- ioCommandBuilder.hashCommand(gcsPath)
path <- getPath(fileAsString)
hashStrategy = hashStrategyForPath(path)
command <- ioCommandBuilder.hashCommand(path, hashStrategy)
} yield command
lazy val fileHashContext = FileHashContext(fileRequest.hashKey, fileRequest.file.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.Props
import akka.testkit._
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.{IoHashCommandWithContext, _}
import cromwell.core.actor.RobustClientHelper.RequestTimeout
import cromwell.core.callcaching.HashKey
import cromwell.core.callcaching.{FileHashStrategy, HashKey}
import cromwell.core.io.DefaultIoCommand.DefaultIoHashCommand
import cromwell.core.io.IoSuccess
import cromwell.core.path.DefaultPathBuilder
Expand All @@ -28,7 +28,7 @@ class RootWorkflowHashCacheActorSpec extends TestKitSuite with ImplicitSender wi
)

val ioHashCommandWithContext =
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get),
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get, FileHashStrategy.Md5),
FileHashContext(HashKey(checkForHitOrMiss = false, List.empty), fakeFileName)
)
rootWorkflowFileHashCacheActor ! ioHashCommandWithContext
Expand Down Expand Up @@ -56,7 +56,7 @@ class RootWorkflowHashCacheActorSpec extends TestKitSuite with ImplicitSender wi
)

val ioHashCommandWithContext =
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get),
IoHashCommandWithContext(DefaultIoHashCommand(DefaultPathBuilder.build("").get, FileHashStrategy.Md5),
FileHashContext(HashKey(checkForHitOrMiss = false, List.empty), fakeFileName)
)
rootWorkflowFileHashCacheActor ! ioHashCommandWithContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package cromwell.backend.standard.callcaching

import akka.actor.{ActorRef, Props}
import akka.testkit._
import com.typesafe.config.ConfigFactory
import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest
import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendJobDescriptor}
import cromwell.core.TestKitSuite
import cromwell.core.callcaching.HashingFailedMessage
import cromwell.core.callcaching.{FileHashStrategy, HashingFailedMessage, HashType, SuccessfulHashResultMessage}
import cromwell.core.io.{IoCommand, IoCommandBuilder, IoHashCommand, IoSuccess, PartialIoCommandBuilder}
import cromwell.core.path.{DefaultPathBuilder, Path}
import org.scalatest.flatspec.AnyFlatSpecLike
Expand All @@ -16,7 +17,7 @@ import wom.values.WomSingleFile
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

class StandardFileHashingActorSpec
extends TestKitSuite
Expand Down Expand Up @@ -98,7 +99,7 @@ class StandardFileHashingActorSpec
}
}

it should "handle string hash responses" in {
it should "handle non-string hash responses" in {
val parentProbe = TestProbe("testParentHashString")
val params = StandardFileHashingActorSpec.ioActorParams(ActorRef.noSender)
val props = Props(new StandardFileHashingActor(params) {
Expand All @@ -124,17 +125,89 @@ class StandardFileHashingActorSpec
}
}

it should "handle string hash responses" in {
val parentProbe = TestProbe("testParentHashString")
val params = StandardFileHashingActorSpec.ioActorParams(ActorRef.noSender)
val props = Props(new StandardFileHashingActor(params) {
override lazy val defaultIoTimeout: FiniteDuration = 1.second.dilated

override def getPath(str: String): Try[Path] = Try(DefaultPathBuilder.get(str))
})
val standardFileHashingActorRef = parentProbe.childActorOf(props, "testStandardFileHashingActorHashString")

val fileHashContext = mock[FileHashContext]
fileHashContext.file returns "/expected/failure/path"
val command = mock[IoCommand[String]]
val message: (FileHashContext, IoSuccess[String]) = (fileHashContext, IoSuccess(command, "a_nice_hash"))

standardFileHashingActorRef ! message

parentProbe.expectMsgPF(10.seconds.dilated) {
case succeeded: SuccessfulHashResultMessage =>
succeeded.hashes.map(_.hashValue.value).headOption shouldBe Some("a_nice_hash")
case unexpected => fail(s"received unexpected message $unexpected")
}
}

it should "use the right hashing strategies" in {
val parentProbe = TestProbe("testParentHashStrategies")
val ioActorProbe = TestProbe("ioActorProbe")
val backendConfig = ConfigFactory.parseString(
"""filesystems.gcs.caching.hashing-strategy = ["md5", "identity"]
|filesystems.s3.caching.hashing-strategy = "etag"
|filesystems.http.some-other-config = "foobar"
|filesystems.ftp.caching.hashing-strategy = []""".stripMargin
)
val config = BackendConfigurationDescriptor(backendConfig, ConfigFactory.empty)

val props =
Props(new StandardFileHashingActor(StandardFileHashingActorSpec.ioActorParams(ioActorProbe.ref, config)) {
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
"gcs" -> FileHashStrategy.Crc32c,
"drs" -> FileHashStrategy.Drs
)
override val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy(List(HashType.Sha256))

override def getPath(str: String): Try[Path] = {
val p = mock[Path]
p.filesystemTypeKey returns str
Success(p)
}
})
val standardFileHashingActorRef = parentProbe.childActorOf(props, "testStandardFileHashingActorHashStrategy")

def checkHashStrategy(filesystemKey: String, expectedStrategy: FileHashStrategy): Unit = {
val request = SingleFileHashRequest(null, null, WomSingleFile(filesystemKey), None)
standardFileHashingActorRef ! request
ioActorProbe.expectMsgPF(10.seconds.dilated) {
case (_: FileHashContext, cmd: IoHashCommand) if cmd.hashStrategy == expectedStrategy =>
case unexpected => fail(s"received unexpected ${filesystemKey} message $unexpected")
}
}

checkHashStrategy("gcs", FileHashStrategy(List(HashType.Md5, HashType.Identity)))
checkHashStrategy("s3", FileHashStrategy.ETag)
checkHashStrategy("ftp", FileHashStrategy(List()))
checkHashStrategy("drs", FileHashStrategy.Drs)
checkHashStrategy("http", FileHashStrategy(List(HashType.Sha256)))
checkHashStrategy("blob", FileHashStrategy(List(HashType.Sha256)))
}

}

object StandardFileHashingActorSpec {
private def testing: Nothing = throw new UnsupportedOperationException("should not be run during tests")
private val emptyBackendConfig = BackendConfigurationDescriptor(ConfigFactory.empty, ConfigFactory.empty)

def defaultParams(): StandardFileHashingActorParams = defaultParams(testing, testing, testing, testing, testing)
def defaultParams(config: BackendConfigurationDescriptor = emptyBackendConfig): StandardFileHashingActorParams =
defaultParams(testing, emptyBackendConfig, testing, testing, testing)

def ioActorParams(ioActor: ActorRef): StandardFileHashingActorParams =
def ioActorParams(ioActor: ActorRef,
config: BackendConfigurationDescriptor = emptyBackendConfig
): StandardFileHashingActorParams =
defaultParams(
withJobDescriptor = testing,
withConfigurationDescriptor = testing,
withConfigurationDescriptor = config,
withIoActor = ioActor,
withServiceRegistryActor = testing,
withBackendInitializationDataOption = testing
Expand All @@ -161,4 +234,13 @@ object StandardFileHashingActorSpec {
override def fileHashCachingActor: Option[ActorRef] = None
}

class StrategyTestFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
"gcs" -> FileHashStrategy.Crc32c,
"drs" -> FileHashStrategy.Drs
)
override val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy(List(HashType.Sha256))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ class DrsCloudNioFileProvider(drsPathResolver: DrsPathResolver, drsReadInterpret
val fileAttributesIO = for {
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
sizeOption = drsResolverResponse.size
hashOption = getPreferredHash(drsResolverResponse.hashes)
hashOptions = drsResolverResponse.hashes.getOrElse(Map.empty)
timeCreatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeCreated, drsResolverResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeUpdated, drsResolverResponse.timeUpdated)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOption, timeCreatedOption, timeUpdatedOption)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOptions, timeCreatedOption, timeUpdatedOption)

Option(fileAttributesIO.unsafeRunSync())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package cloud.nio.impl.drs
import java.nio.file.attribute.FileTime
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import cats.effect.IO
import cloud.nio.spi.HashType.HashType
import cloud.nio.spi.{CloudNioRegularFileAttributes, FileHash, HashType}
import cloud.nio.spi.CloudNioRegularFileAttributes
import org.apache.commons.lang3.exception.ExceptionUtils

class DrsCloudNioRegularFileAttributes(drsPath: String,
sizeOption: Option[Long],
hashOption: Option[FileHash],
val fileHashes: Map[String, String],
timeCreatedOption: Option[FileTime],
timeUpdatedOption: Option[FileTime]
) extends CloudNioRegularFileAttributes {
Expand All @@ -18,30 +17,12 @@ class DrsCloudNioRegularFileAttributes(drsPath: String,

override def size(): Long = sizeOption.getOrElse(0)

override def fileHash: Option[FileHash] = hashOption

override def creationTime(): FileTime = timeCreatedOption.getOrElse(lastModifiedTime())

override def lastModifiedTime(): FileTime = timeUpdatedOption.getOrElse(FileTime.fromMillis(0))
}

object DrsCloudNioRegularFileAttributes {
private val priorityHashList: Seq[(String, HashType)] = Seq(
("crc32c", HashType.Crc32c),
("md5", HashType.Md5),
("sha256", HashType.Sha256),
("etag", HashType.S3Etag)
)

def getPreferredHash(hashesOption: Option[Map[String, String]]): Option[FileHash] =
hashesOption match {
case Some(hashes: Map[String, String]) if hashes.nonEmpty =>
priorityHashList collectFirst {
case (key, hashType) if hashes.contains(key) => FileHash(hashType, hashes(key))
}
// if no preferred hash was found, go ahead and return none because we don't support anything that the DRS object is offering
case _ => None
}

private def convertToOffsetDateTime(timeInString: String): IO[OffsetDateTime] =
// Here timeInString is assumed to be a ISO-8601 DateTime with timezone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cloud.nio.impl.drs
import cats.data.NonEmptyList
import cats.effect.IO
import cloud.nio.impl.drs.DrsCloudNioFileProvider.DrsReadInterpreter
import cloud.nio.spi.{FileHash, HashType}
import com.typesafe.config.ConfigFactory
import common.assertion.CromwellTimeoutSpec
import org.apache.http.HttpVersion
Expand Down Expand Up @@ -145,7 +144,7 @@ class DrsCloudNioFileProviderSpec extends AnyFlatSpecLike with CromwellTimeoutSp
drsFileAttributes.creationTime().toMillis should be(123L)
drsFileAttributes.lastModifiedTime().toMillis should be(456L)
drsFileAttributes.size() should be(789L)
drsFileAttributes.fileHash should be(Option(FileHash(HashType.Md5, "gg0217869")))
drsFileAttributes.fileHashes should be(Map("md5" -> "gg0217869"))
}

it should "throw exceptions for unsupported methods" in {
Expand Down
Loading
Loading