Skip to content

Commit

Permalink
More cleanup for MR. Fix removing empty executorAttemptDirectories.
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Nov 5, 2024
1 parent 2dc7b30 commit 63a39b5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ object CreoS3Utils {
val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest)
listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet
} else {
val list = Files.list(Path.of(path))
List(list).map(_.toString).toSet
Files.list(Path.of(path)).toArray.map(_.toString).toSet
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ package object geotiff {
executorAttemptDirectory
}

private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absolutePath: String): Path = {
private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String): Path = {
// Move output file to standard location. (On S3, a move is more a copy and delete):
val relativePath = parentDirectory.relativize(Path.of(absolutePath)).toString
if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
val relativeFilePath = parentDirectory.relativize(Path.of(absoluteFilePath)).toString
if (!relativeFilePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
// Remove the executorAttemptDirectory part from the path:
val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
CreoS3Utils.waitTillPathAvailable(Path.of(absolutePath))
val destinationPath = parentDirectory.resolve(relativeFilePath.substring(relativeFilePath.indexOf("/") + 1))
CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath))
if (!CreoS3Utils.isS3(parentDirectory.toString)) {
Files.createDirectories(destinationPath.getParent)
}
CreoS3Utils.moveAsset(absolutePath, destinationPath.toString) // TODO: Use move instead of copy
CreoS3Utils.moveAsset(absoluteFilePath, destinationPath.toString)
destinationPath
}

Expand Down Expand Up @@ -214,9 +214,9 @@ package object geotiff {

// Each executor writes to a unique folder to avoid conflicts:
val executorAttemptDirectory = createExecutorAttemptDirectory(path)
val absolutePath = executorAttemptDirectory.resolve(filename)
absolutePath.toFile.getParentFile.mkdirs()
val thePath = absolutePath.toString
val absoluteFilePath = executorAttemptDirectory.resolve(filename)
absoluteFilePath.toFile.getParentFile.mkdirs()
val thePath = absoluteFilePath.toString

// filter band tags that match bandIndices
val fo = formatOptions.deepClone()
Expand All @@ -230,8 +230,8 @@ package object geotiff {
)
(correctedPath, timestamp, croppedExtent, bandIndices)
}.collect().map {
case (absolutePath, timestamp, croppedExtent, bandIndices) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absolutePath)
case (absoluteFilePath, timestamp, croppedExtent, bandIndices) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath)
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}.toList.asJava

Expand Down Expand Up @@ -309,13 +309,13 @@ package object geotiff {
(stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
Collections.singletonList(bandIndex))
}.collect().map {
case (absolutePath, bandIndices) =>
case (absoluteFilePath, bandIndices) =>
if (path.endsWith("out")) {
val beforeOut = path.substring(0, path.length - "out".length)
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absolutePath)
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath)
(destinationPath.toString, bandIndices)
} else {
(absolutePath, bandIndices)
(absoluteFilePath, bandIndices)
}
}.toList.sortBy(_._1).asJava

Expand Down Expand Up @@ -770,8 +770,8 @@ package object geotiff {

(stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent)
}.collect().map {
case (absolutePath, croppedExtent) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absolutePath)
case (absoluteFilePath, croppedExtent) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath)
(destinationPath.toString, croppedExtent)
}.toList.asJava

Expand Down Expand Up @@ -916,7 +916,9 @@ package object geotiff {
geoTiff.write(tempFile.toString, optimizedOrder = true)
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))

if (path.startsWith("s3:/")) {
if (CreoS3Utils.isS3(path)) {
// Converting to Path and back could change the s3:// prefix to s3:/
// The following line corrects this:
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
CreoS3Utils.uploadToS3(tempFile, correctS3Path)
} else {
Expand Down

0 comments on commit 63a39b5

Please sign in to comment.