Skip to content

Commit

Permalink
Don't crash when geoTiff.write() does not return a file. #329 (comment)
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Nov 7, 2024
1 parent 6a9bc4b commit 5a710f4
Showing 1 changed file with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,19 @@ package object geotiff {
executorAttemptDirectory
}

private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String): Path = {
private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String, fileExists: Boolean): Path = {
// Move output file to standard location. (On S3, a move is more a copy and delete):
val relativeFilePath = parentDirectory.relativize(Path.of(absoluteFilePath)).toString
if (!relativeFilePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
// Remove the executorAttemptDirectory part from the path:
val destinationPath = parentDirectory.resolve(relativeFilePath.substring(relativeFilePath.indexOf("/") + 1))
CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath))
if (!CreoS3Utils.isS3(parentDirectory.toString)) {
Files.createDirectories(destinationPath.getParent)
if (fileExists) {
CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath))
if (!CreoS3Utils.isS3(parentDirectory.toString)) {
Files.createDirectories(destinationPath.getParent)
}
CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString)
}
CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString)
destinationPath
}

Expand Down Expand Up @@ -225,13 +227,13 @@ package object geotiff {
.map { case (bandTags, _) => bandTags }
fo.setBandTags(newBandTags)

val correctedPath = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs,
val (correctedPath, fileExists) = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs,
tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo,
)
(correctedPath, timestamp, croppedExtent, bandIndices)
(correctedPath, fileExists, timestamp, croppedExtent, bandIndices)
}.collect().map {
case (absoluteFilePath, timestamp, croppedExtent, bandIndices) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath)
case (absoluteFilePath, fileExists, timestamp, croppedExtent, bandIndices) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath, fileExists)
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}.toList.asJava

Expand Down Expand Up @@ -309,10 +311,10 @@ package object geotiff {
(stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
Collections.singletonList(bandIndex))
}.collect().map {
case (absoluteFilePath, bandIndices) =>
case ((absoluteFilePath, fileExists), bandIndices) =>
if (path.endsWith("out")) {
val beforeOut = path.substring(0, path.length - "out".length)
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath)
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath, fileExists)
(destinationPath.toString, bandIndices)
} else {
(absoluteFilePath, bandIndices)
Expand Down Expand Up @@ -487,7 +489,7 @@ package object geotiff {
val metadata = new STACItem()
metadata.asset(fixedPath)
metadata.write(stacItemPath)
val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews)
val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews)._1
return Collections.singletonList(finalPath)
}finally {
preprocessedRdd.unpersist()
Expand Down Expand Up @@ -644,7 +646,12 @@ package object geotiff {
.toList
}

private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]], gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS, tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType, detectedBandCount: Double, segmentCount: Int, formatOptions:GTiffOptions = new GTiffOptions,overviews: List[GeoTiffMultibandTile] = Nil) = {
private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]],
gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS,
tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType,
detectedBandCount: Double, segmentCount: Int,
formatOptions: GTiffOptions = new GTiffOptions, overviews: List[GeoTiffMultibandTile] = Nil
): (String, Boolean) = {
logger.info(s"Writing geotiff to $path with type ${cellType.toString()} and bands $detectedBandCount")
val tiffTile: GeoTiffMultibandTile = toTiff(tiffs, gridBounds, tileLayout, compression, cellType, detectedBandCount, segmentCount)
val options = if(formatOptions.colorMap.isDefined){
Expand Down Expand Up @@ -770,8 +777,8 @@ package object geotiff {

(stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent)
}.collect().map {
case (absoluteFilePath, croppedExtent) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath)
case ((absoluteFilePath, fileExists), croppedExtent) =>
val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath, fileExists)
(destinationPath.toString, croppedExtent)
}.toList.asJava

Expand All @@ -784,7 +791,7 @@ package object geotiff {
layout: LayoutDefinition, crs: CRS, geometry: Geometry,
croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]],
compression: Compression, formatOptions: Option[GTiffOptions] = None
) = {
):(String, Boolean) = {
val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch()

val re = raster.rasterExtent
Expand Down Expand Up @@ -904,31 +911,41 @@ package object geotiff {
val filename = s"${filenamePrefix.getOrElse("openEO")}_${DateTimeFormatter.ISO_DATE.format(time)}_$name.tif"
val filePath = Paths.get(path).resolve(filename).toString
val timestamp = time format DateTimeFormatter.ISO_ZONED_DATE_TIME
(stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression),
(stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression)._1,
timestamp, geometry.extent)
}
.collect()
.toList.asJava
}

def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): String = {
def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): (String, Boolean) = {
val tempFile = getTempFile(null, ".tif")
geoTiff.write(tempFile.toString, optimizedOrder = true)
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))

val fileExists = Files.exists(tempFile)
if (fileExists) {
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))
} else {
logger.warn("writeGeoTiff() File was not created: " + path)
}
if (CreoS3Utils.isS3(path)) {
// Converting to Path and back could change the s3:// prefix to s3:/
// The following line corrects this:
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
CreoS3Utils.uploadToS3(tempFile, correctS3Path)
if (fileExists) {
CreoS3Utils.uploadToS3(tempFile, correctS3Path)
}
(correctS3Path, fileExists)
} else {
// Retry should not be needed at this point, but it is almost free to keep it.
CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path)
path
if (fileExists) {
CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path)
}
(path, fileExists)
}
}



case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] {
override def length: Int = tiles.size

Expand Down

0 comments on commit 5a710f4

Please sign in to comment.