diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 5d8b2a8f..eeeee640 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -120,17 +120,19 @@ package object geotiff { executorAttemptDirectory } - private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String): Path = { + private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String, fileExists: Boolean): Path = { // Move output file to standard location. (On S3, a move is more a copy and delete): val relativeFilePath = parentDirectory.relativize(Path.of(absoluteFilePath)).toString if (!relativeFilePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() // Remove the executorAttemptDirectory part from the path: val destinationPath = parentDirectory.resolve(relativeFilePath.substring(relativeFilePath.indexOf("/") + 1)) - CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath)) - if (!CreoS3Utils.isS3(parentDirectory.toString)) { - Files.createDirectories(destinationPath.getParent) + if (fileExists) { + CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath)) + if (!CreoS3Utils.isS3(parentDirectory.toString)) { + Files.createDirectories(destinationPath.getParent) + } + CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString) } - CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString) destinationPath } @@ -225,13 +227,13 @@ package object geotiff { .map { case (bandTags, _) => bandTags } fo.setBandTags(newBandTags) - val correctedPath = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, + val (correctedPath, fileExists) = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo, ) - (correctedPath, timestamp, croppedExtent, bandIndices) + (correctedPath, fileExists, timestamp, croppedExtent, bandIndices) }.collect().map { - case (absoluteFilePath, timestamp, croppedExtent, bandIndices) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath) + case (absoluteFilePath, fileExists, timestamp, croppedExtent, bandIndices) => + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath, fileExists) (destinationPath.toString, timestamp, croppedExtent, bandIndices) }.toList.asJava @@ -309,10 +311,10 @@ package object geotiff { (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), Collections.singletonList(bandIndex)) }.collect().map { - case (absoluteFilePath, bandIndices) => + case ((absoluteFilePath, fileExists), bandIndices) => if (path.endsWith("out")) { val beforeOut = path.substring(0, path.length - "out".length) - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath, fileExists) (destinationPath.toString, bandIndices) } else { (absoluteFilePath, bandIndices) @@ -487,7 +489,7 @@ package object geotiff { val metadata = new STACItem() metadata.asset(fixedPath) metadata.write(stacItemPath) - val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews) + val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews)._1 return Collections.singletonList(finalPath) }finally { preprocessedRdd.unpersist() @@ -644,7 +646,12 @@ package object geotiff { .toList } - private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]], gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS, tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType, detectedBandCount: Double, segmentCount: Int, formatOptions:GTiffOptions = new GTiffOptions,overviews: List[GeoTiffMultibandTile] = Nil) = { + private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]], + gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS, + tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType, + detectedBandCount: Double, segmentCount: Int, + formatOptions: GTiffOptions = new GTiffOptions, overviews: List[GeoTiffMultibandTile] = Nil + ): (String, Boolean) = { logger.info(s"Writing geotiff to $path with type ${cellType.toString()} and bands $detectedBandCount") val tiffTile: GeoTiffMultibandTile = toTiff(tiffs, gridBounds, tileLayout, compression, cellType, detectedBandCount, segmentCount) val options = if(formatOptions.colorMap.isDefined){ @@ -770,8 +777,8 @@ package object geotiff { (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) }.collect().map { - case (absoluteFilePath, croppedExtent) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath) + case ((absoluteFilePath, fileExists), croppedExtent) => + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath, fileExists) (destinationPath.toString, croppedExtent) }.toList.asJava @@ -784,7 +791,7 @@ package object geotiff { layout: LayoutDefinition, crs: CRS, geometry: Geometry, croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], compression: Compression, formatOptions: Option[GTiffOptions] = None - ) = { + ):(String, Boolean) = { val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch() val re = raster.rasterExtent @@ -904,31 +911,41 @@ package object geotiff { val filename = s"${filenamePrefix.getOrElse("openEO")}_${DateTimeFormatter.ISO_DATE.format(time)}_$name.tif" val filePath = Paths.get(path).resolve(filename).toString val timestamp = time format DateTimeFormatter.ISO_ZONED_DATE_TIME - (stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression), + (stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression)._1, timestamp, geometry.extent) } .collect() .toList.asJava } - def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): String = { + def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): (String, Boolean) = { val tempFile = getTempFile(null, ".tif") geoTiff.write(tempFile.toString, optimizedOrder = true) - gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml)) - + val fileExists = Files.exists(tempFile) + if (fileExists) { + gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml)) + } else { + logger.warn("writeGeoTiff() File was not created: " + path) + } if (CreoS3Utils.isS3(path)) { // Converting to Path and back could change the s3:// prefix to s3:/ // The following line corrects this: val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") - CreoS3Utils.uploadToS3(tempFile, correctS3Path) + if (fileExists) { + CreoS3Utils.uploadToS3(tempFile, correctS3Path) + } + (correctS3Path, fileExists) } else { // Retry should not be needed at this point, but it is almost free to keep it. - CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path) - path + if (fileExists) { + CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path) + } + (path, fileExists) } } + case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] { override def length: Int = tiles.size