From 3e67e96e5294d161e0ff58f41a8c7fdfbb8b4b44 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Fri, 18 Oct 2024 00:47:12 +0200 Subject: [PATCH 01/16] Save output from executors to unique folders. Move the successful results to the output folder from driver, and clean up afterwards. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 81 ++++++++++++------- .../geotiff/WriteRDDToGeotiffTest.scala | 17 +++- 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 98ac2c12..6100048a 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -15,7 +15,7 @@ import geotrellis.spark.pyramid.Pyramid import geotrellis.store.s3._ import geotrellis.util._ import geotrellis.vector.{ProjectedExtent, _} -import org.apache.commons.io.FilenameUtils +import org.apache.commons.io.{FileUtils, FilenameUtils} import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD @@ -134,7 +134,7 @@ package object geotiff { val bandSegmentCount = totalCols * totalRows val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION")) - preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) => + val res = preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) => var bandIndex = -1 //Warning: for deflate compression, the segmentcount and index is not really used, making it stateless. //Not sure how this works out for other types of compression!!! 
@@ -173,7 +173,12 @@ package object geotiff { val bandIndices = sequence.map(_._3).toSet.toList.asJava val segmentCount = bandSegmentCount * tiffBands - val thePath = Paths.get(path).resolve(filename).toString + + // Each executor writes to a unique folder to avoid conflicts: + val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) + val base = Paths.get(path + "/" + uniqueFolderName) + Files.createDirectories(base) + val thePath = base.resolve(filename).toString // filter band tags that match bandIndices val fo = formatOptions.deepClone() @@ -186,8 +191,22 @@ package object geotiff { tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo, ) (correctedPath, timestamp, croppedExtent, bandIndices) - }.collect().toList.asJava - + }.collect().map({ + case (absolutePath, timestamp, croppedExtent, bandIndices) => + // Move output file to standard location: + val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString + val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1)) + Files.move(Path.of(absolutePath), destinationPath) + (destinationPath.toString, timestamp, croppedExtent, bandIndices) + }).toList.asJava + + // Clean up failed tasks: + Files.list(Path.of(path)).forEach { p => + if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { + FileUtils.deleteDirectory(p.toFile) + } + } + res } @@ -232,10 +251,13 @@ package object geotiff { ((name, bandIndex), (key, t)) } } - rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) => + val res = rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) => + val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) val fixedPath = if (path.endsWith("out")) { - path.substring(0, path.length - 3) + name + val base = path.substring(0, path.length - 3) + uniqueFolderName + "/" + Files.createDirectories(Path.of(base)) + base + 
name } else { path @@ -248,7 +270,27 @@ package object geotiff { (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), Collections.singletonList(bandIndex)) - }.collect().toList.sortBy(_._1).asJava + }.collect().map({ + case (absolutePath, y) => + if (path.endsWith("out")) { + // Move output file to standard location: + val beforeOut = path.substring(0, path.length - "out".length) + val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString + val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1) + Files.move(Path.of(absolutePath), Path.of(destinationPath)) + (destinationPath, y) + } else { + (absolutePath, y) + } + }).toList.sortBy(_._1).asJava + // Clean up failed tasks: + val beforeOut = path.substring(0, path.length - "out".length) + Files.list(Path.of(beforeOut)).forEach { p => + if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { + FileUtils.deleteDirectory(p.toFile) + } + } + res } else { val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala tmp.map(t => (t, (0 until bandCount).toList.asJava)).asJava @@ -826,33 +868,16 @@ package object geotiff { } def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { - import java.nio.file.Files + val tempFile = getTempFile(null, ".tif") + geoTiff.write(tempFile.toString, optimizedOrder = true) + if (path.startsWith("s3:/")) { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") - - - val tempFile = Files.createTempFile(null, null) - geoTiff.write(tempFile.toString, optimizedOrder = true) uploadToS3(tempFile, correctS3Path) - } else { - val tempFile = getTempFile(null, ".tif") - // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) - geoTiff.write(tempFile.toString, optimizedOrder = true) - - // TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths. 
moveOverwriteWithRetries(tempFile, Path.of(path)) - - // Call fsync on the parent path to assure the fusemount is up-to-date. - // The equivalent of Python's os.fsync - try { - FileChannel.open(Path.of(path)).force(true) - } catch { - case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor - } path } - } def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index 01f2d244..bfccf740 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -19,7 +19,7 @@ import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons} import org.openeo.sparklisteners.GetInfoSparkListener import org.slf4j.{Logger, LoggerFactory} -import java.nio.file.{Files, Paths} +import java.nio.file.{Files, Path, Paths} import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime} import java.util import java.util.zip.Deflater._ @@ -77,6 +77,7 @@ class WriteRDDToGeotiffTest { val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng) val filename = "out.tif" + Files.deleteIfExists(Path.of(filename)) saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},1,filename,formatOptions = allOverviewOptions) @@ -151,6 +152,10 @@ class WriteRDDToGeotiffTest { @Test def testWriteRDD_apply_neighborhood(): Unit ={ + val outDir = Paths.get("tmp/testWriteRDD_apply_neighborhood/") + new Directory(outDir.toFile).deepList().foreach(_.delete()) + Files.createDirectories(outDir) + val layoutCols = 8 val layoutRows = 4 @@ -164,10 +169,11 @@ class WriteRDDToGeotiffTest { val buffered: 
MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retileGeneric(tileLayerRDD,224,224,16,16),224,224,16,16) val cropBounds = Extent(-115, -65, 5.0, 56) - saveRDDTemporal(buffered,"./",cropBounds = Some(cropBounds)) + saveRDDTemporal(buffered, outDir.toString, cropBounds = Some(cropBounds)) val croppedRaster: Raster[MultibandTile] = tileLayerRDD.toSpatial().stitch().crop(cropBounds) val referenceFile = "croppedRaster.tif" + Files.deleteIfExists(Path.of(referenceFile)) GeoTiff(croppedRaster,LatLng).write(referenceFile) val result = GeoTiff.readMultiband(filename).raster @@ -190,6 +196,7 @@ class WriteRDDToGeotiffTest { val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile,secondBand,thirdBand),TileLayout(layoutCols,layoutRows,256,256),LatLng) val filename = "outRGB.tif" + Files.deleteIfExists(Path.of(filename)) saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename) val result = GeoTiff.readMultiband(filename).raster.tile assertArrayEquals(imageTile.toArray(),result.band(0).toArray()) @@ -216,8 +223,10 @@ class WriteRDDToGeotiffTest { val croppedRaster: Raster[MultibandTile] = tileLayerRDD.stitch().crop(cropBounds) val referenceFile = "croppedRaster.tif" + Files.deleteIfExists(Path.of(referenceFile)) GeoTiff(croppedRaster,LatLng).write(referenceFile) val filename = "outRGBCropped3.tif" + Files.deleteIfExists(Path.of(filename)) saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename,cropBounds = Some(cropBounds)) val result = GeoTiff.readMultiband(filename).raster val reference = GeoTiff.readMultiband(referenceFile).raster @@ -248,6 +257,7 @@ class WriteRDDToGeotiffTest { GeoTiff(croppedRaster,LatLng).write(referenceFile) val filename = "outCropped.tif" + Files.deleteIfExists(Path.of(filename)) saveRDD(tileLayerRDD.withContext{_.repartition(tileLayerRDD.count().toInt)},3,filename,cropBounds = Some(cropBounds)) val resultRaster = 
GeoTiff.readMultiband(filename).raster @@ -269,6 +279,7 @@ class WriteRDDToGeotiffTest { val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng) val empty = tileLayerRDD.withContext{_.filter(_ => false)} val filename = "outEmpty.tif" + Files.deleteIfExists(Path.of(filename)) val cropBounds = Extent(-115, -65, 5.0, 56) saveRDD(empty,-1,filename,cropBounds = Some(cropBounds)) @@ -311,7 +322,7 @@ class WriteRDDToGeotiffTest { val (imageTile: ByteArrayTile, filtered: MultibandTileLayerRDD[SpatialKey]) = LayerFixtures.createLayerWithGaps(layoutCols, layoutRows) val outDir = Paths.get("tmp/testWriteMultibandRDDWithGapsSeparateAssetPerBand/") - new Directory(outDir.toFile).deepFiles.foreach(_.delete()) + new Directory(outDir.toFile).deepList().foreach(_.delete()) Files.createDirectories(outDir) val filename = outDir + "/out" From 003736c7b8e8034eba1f9103a3040e8b859d54b7 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Fri, 18 Oct 2024 10:07:22 +0200 Subject: [PATCH 02/16] Fix output folder for tests. 
--- .../main/scala/org/openeo/geotrellis/geotiff/package.scala | 4 ++-- .../org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala | 4 ++-- .../org/openeo/geotrellis/layers/FileLayerProviderTest.scala | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 6100048a..575c968a 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -193,7 +193,7 @@ package object geotiff { (correctedPath, timestamp, croppedExtent, bandIndices) }.collect().map({ case (absolutePath, timestamp, croppedExtent, bandIndices) => - // Move output file to standard location: + // Move output file to standard location. (On S3, a move is more a copy and delete): val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1)) Files.move(Path.of(absolutePath), destinationPath) @@ -273,7 +273,7 @@ package object geotiff { }.collect().map({ case (absolutePath, y) => if (path.endsWith("out")) { - // Move output file to standard location: + // Move output file to standard location. 
(On S3, a move is more a copy and delete): val beforeOut = path.substring(0, path.length - "out".length) val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index bfccf740..947c2b7a 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -164,7 +164,7 @@ class WriteRDDToGeotiffTest { val tileLayerRDD = LayerFixtures.buildSingleBandSpatioTemporalDataCube(util.Arrays.asList(imageTile),Seq("2017-03-01T00:00:00Z")) - val filename = "openEO_2017-03-01Z.tif" + val filename = outDir + "/openEO_2017-03-01Z.tif" val p = new OpenEOProcesses() val buffered: MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retileGeneric(tileLayerRDD,224,224,16,16),224,224,16,16) @@ -172,7 +172,7 @@ class WriteRDDToGeotiffTest { saveRDDTemporal(buffered, outDir.toString, cropBounds = Some(cropBounds)) val croppedRaster: Raster[MultibandTile] = tileLayerRDD.toSpatial().stitch().crop(cropBounds) - val referenceFile = "croppedRaster.tif" + val referenceFile = outDir + "/croppedRaster.tif" Files.deleteIfExists(Path.of(referenceFile)) GeoTiff(croppedRaster,LatLng).write(referenceFile) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index d9396a70..1ec2198f 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala @@ -1080,6 +1080,7 @@ class 
FileLayerProviderTest extends RasterMatchers{ @Test def testPixelValueOffsetNeededCorner(): Unit = { + Files.createDirectories(Paths.get("tmp/")) // This selection will go over a corner that has nodata pixels val layer = testPixelValueOffsetNeeded( "/org/openeo/geotrellis/testPixelValueOffsetNeededCorner.json", @@ -1329,6 +1330,7 @@ class FileLayerProviderTest extends RasterMatchers{ @Test def testMultibandCOGViaSTAC(): Unit = { + Files.createDirectories(Paths.get("tmp/")) val factory = LayerFixtures.STACCOGCollection() val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1351,6 +1353,7 @@ class FileLayerProviderTest extends RasterMatchers{ @Test def testMultibandCOGViaSTACResample(): Unit = { + Files.createDirectories(Paths.get("tmp/")) val factory = LayerFixtures.STACCOGCollection(resolution = CellSize(10.0,10.0)) val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1370,6 +1373,7 @@ class FileLayerProviderTest extends RasterMatchers{ @Test def testMultibandCOGViaSTACResampleReadOneBand(): Unit = { + Files.createDirectories(Paths.get("tmp/")) val factory = LayerFixtures.STACCOGCollection(resolution = CellSize(10.0,10.0),util.Arrays.asList("precipitation-flux")) val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) From f916512e659af48819bf56753b91b84e19a5f4e5 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 29 Oct 2024 10:13:02 +0100 Subject: [PATCH 03/16] Add waitTillPathAvailable when accessing file from driver that was written in executor. 
https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 575c968a..a50b727f 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -196,6 +196,7 @@ package object geotiff { // Move output file to standard location. (On S3, a move is more a copy and delete): val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1)) + waitTillPathAvailable(Path.of(absolutePath)) Files.move(Path.of(absolutePath), destinationPath) (destinationPath.toString, timestamp, croppedExtent, bandIndices) }).toList.asJava @@ -277,6 +278,7 @@ package object geotiff { val beforeOut = path.substring(0, path.length - "out".length) val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1) + waitTillPathAvailable(Path.of(absolutePath)) Files.move(Path.of(absolutePath), Path.of(destinationPath)) (destinationPath, y) } else { @@ -875,11 +877,29 @@ package object geotiff { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") uploadToS3(tempFile, correctS3Path) } else { + // Retry should not be needed at this point, but it is almost free to keep it. moveOverwriteWithRetries(tempFile, Path.of(path)) path } } + private def waitTillPathAvailable(path: Path): Unit = { + var retry = 0 + val maxTries = 20 + while (!path.toFile.exists()) { + if (retry < maxTries) { + retry += 1 + val seconds = 5 + logger.info(f"Waiting for path to be available. 
Try $retry/$maxTries (sleep:$seconds seconds): $path") + Thread.sleep(seconds * 1000) + } else { + logger.warn(f"Path is not available after $maxTries tries: $path") + // Throw error instead? + return + } + } + } + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { var try_count = 1 breakable { From 7a00557094eafa08c6f7d1bbdcfb8f6f04701063 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 29 Oct 2024 11:26:59 +0100 Subject: [PATCH 04/16] Use move trick for saveStitchedTileGrid too. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 29 +++++++++++++++---- .../geotrellis/geotiff/TileGridTest.scala | 26 ++++++++++++++--- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index bd899934..b5812425 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -723,7 +723,7 @@ package object geotiff { val layout = rdd.metadata.layout val crs = rdd.metadata.crs - rdd.flatMap { + val res = rdd.flatMap { case (key, tile) => features.filter { case (_, extent) => val tileBounds = layout.mapTransform(extent) @@ -732,12 +732,31 @@ package object geotiff { ((name, extent), (key, tile)) } }.groupByKey() - .map { case ((name, extent), tiles) => - val filePath = newFilePath(path, name) + .map { case ((tileId, extent), tiles) => + // Each executor writes to a unique folder to avoid conflicts: + val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) + val base = Paths.get(Path.of(path).getParent + "/" + uniqueFolderName) + Files.createDirectories(base) + val filePath = base + "/" + newFilePath(Path.of(path).getFileName.toString, tileId) (stitchAndWriteToTiff(tiles, 
filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) - }.collect() - .toList.asJava + }.collect().map({ + case (absolutePath, croppedExtent) => + // Move output file to standard location. (On S3, a move is more a copy and delete): + val relativePath = Path.of(path).getParent.relativize(Path.of(absolutePath)).toString + val destinationPath = Path.of(path).getParent.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) + waitTillPathAvailable(Path.of(absolutePath)) + Files.move(Path.of(absolutePath), destinationPath) + (destinationPath.toString, croppedExtent) + }).toList.asJava + + // Clean up failed tasks: + Files.list(Path.of(path).getParent).forEach { p => + if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { + FileUtils.deleteDirectory(p.toFile) + } + } + res } private def stitchAndWriteToTiff(tiles: Iterable[(SpatialKey, MultibandTile)], filePath: String, diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala index a32b0259..d1d3945b 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala @@ -1,5 +1,7 @@ package org.openeo.geotrellis.geotiff +import better.files.File.apply + import java.time.LocalTime.MIDNIGHT import java.time.ZoneOffset.UTC import java.time.{LocalDate, ZonedDateTime} @@ -16,8 +18,10 @@ import org.openeo.geotrellis.png.PngTest import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrellis.{LayerFixtures, geotiff} +import java.nio.file.{Files, Paths} import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME import scala.collection.JavaConverters._ +import scala.reflect.io.Directory object TileGridTest { private var sc: SparkContext = _ @@ -48,6 +52,10 @@ class TileGridTest { @Test def testSaveStitchWithTileGrids(): Unit = 
{ + val outDir = Paths.get("tmp/testSaveStitchWithTileGrids/") + new Directory(outDir.toFile).deepList().foreach(_.delete()) + Files.createDirectories(outDir) + val date = ZonedDateTime.of(LocalDate.of(2020, 4, 5), MIDNIGHT, UTC) val bbox = ProjectedExtent(Extent(1.95, 50.95, 2.05, 51.05), LatLng) @@ -57,8 +65,13 @@ class TileGridTest { .toSpatial() .persist(DISK_ONLY) - val tiles = geotiff.saveStitchedTileGrid(spatialLayer, "/tmp/testSaveStitched.tiff", "10km", DeflateCompression(6)) - val expectedPaths = Set("/tmp/testSaveStitched-31UDS_3_4.tiff", "/tmp/testSaveStitched-31UDS_2_4.tiff", "/tmp/testSaveStitched-31UDS_3_5.tiff", "/tmp/testSaveStitched-31UDS_2_5.tiff") + val tiles = geotiff.saveStitchedTileGrid(spatialLayer, outDir + "/testSaveStitched.tiff", "10km", DeflateCompression(6)) + val expectedPaths = Set( + outDir + "/testSaveStitched-31UDS_3_4.tiff", + outDir + "/testSaveStitched-31UDS_2_4.tiff", + outDir + "/testSaveStitched-31UDS_3_5.tiff", + outDir + "/testSaveStitched-31UDS_2_5.tiff", + ) // TODO: check if extents (in the layer CRS) are 10000m wide/high (in UTM) Assert.assertEquals(expectedPaths, tiles.asScala.map { case (path, _) => path }.toSet) @@ -66,8 +79,13 @@ class TileGridTest { val extent = bbox.reproject(spatialLayer.metadata.crs) val cropBounds = mapAsJavaMap(Map("xmin" -> extent.xmin, "xmax" -> extent.xmax, "ymin" -> extent.ymin, "ymax" -> extent.ymax)) - val croppedTiles = geotiff.saveStitchedTileGrid(spatialLayer, "/tmp/testSaveStitched_cropped.tiff", "10km", cropBounds, DeflateCompression(6)) - val expectedCroppedPaths = Set("/tmp/testSaveStitched_cropped-31UDS_3_4.tiff", "/tmp/testSaveStitched_cropped-31UDS_2_4.tiff", "/tmp/testSaveStitched_cropped-31UDS_3_5.tiff", "/tmp/testSaveStitched_cropped-31UDS_2_5.tiff") + val croppedTiles = geotiff.saveStitchedTileGrid(spatialLayer, outDir + "/testSaveStitched_cropped.tiff", "10km", cropBounds, DeflateCompression(6)) + val expectedCroppedPaths = Set( + outDir + 
"/testSaveStitched_cropped-31UDS_3_4.tiff", + outDir + "/testSaveStitched_cropped-31UDS_2_4.tiff", + outDir + "/testSaveStitched_cropped-31UDS_3_5.tiff", + outDir + "/testSaveStitched_cropped-31UDS_2_5.tiff", + ) // TODO: also check extents Assert.assertEquals(expectedCroppedPaths, croppedTiles.asScala.map { case (path, _) => path }.toSet) From e81cc7b6d40ef2efcdb8b364a09fbafb3350cb0f Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 29 Oct 2024 11:27:27 +0100 Subject: [PATCH 05/16] Disable sentinelhub test for the moment. --- .../test/scala/org/openeo/extensions/PyramidFactoryTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala index 85dc6b06..a87339d8 100644 --- a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala +++ b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala @@ -8,12 +8,14 @@ import geotrellis.spark.util.SparkUtils import geotrellis.vector.{Extent, MultiPolygon, ProjectedExtent} import org.apache.spark.SparkContext import org.junit.Test +import org.junit.jupiter.api.Disabled import org.openeo.geotrelliscommon.{DataCubeParameters, ScopedMetadataTracker} import org.openeo.geotrellissentinelhub.{PyramidFactory, SampleType} import java.util.Collections import scala.collection.JavaConverters._ +@Disabled("Avoid 'You have exceeded your rate limit'") class PyramidFactoryTest { @Test From 65046dd3a1751c15a032ae29a1a5c9238f1301fa Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 29 Oct 2024 12:59:07 +0100 Subject: [PATCH 06/16] Disable sentinelhub test for the moment. 
--- .../org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala index 48917eec..1e202430 100644 --- a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala +++ b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala @@ -134,6 +134,7 @@ object PyramidFactoryTest { ) } +@Disabled("Avoid 'You have exceeded your rate limit'") class PyramidFactoryTest { import PyramidFactoryTest._ From 22327e175983c5a5fdca013092bd301399eb478c Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 31 Oct 2024 16:03:08 +0100 Subject: [PATCH 07/16] React to MR: Cleanup code. Use @TempDir. Prune imports. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/geotiff/package.scala | 117 ++++++++++-------- .../geotrellis/geotiff/TileGridTest.scala | 23 ++-- .../geotiff/WriteRDDToGeotiffTest.scala | 15 +-- .../layers/FileLayerProviderTest.scala | 57 ++++----- 4 files changed, 103 insertions(+), 109 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index b5812425..d08a85e2 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -104,6 +104,42 @@ package object geotiff { ret.map(t => (t._1, t._2, t._3)).asJava } + private val executorAttemptDirectoryPrefix = "executorAttemptDirectory" + + private def createExecutorAttemptDirectory(parentDirectory: String): Path = { + createExecutorAttemptDirectory(Path.of(parentDirectory)) + } + + private def 
createExecutorAttemptDirectory(parentDirectory: Path): Path = { + // Multiple executors with the same task can run at the same time. + // Writing their output to the same path would create a racing condition. + // Let's provide a unique directory for each executor: + val rand = new java.security.SecureRandom().nextLong() + val uniqueFolderName = executorAttemptDirectoryPrefix + java.lang.Long.toUnsignedString(rand) + val executorAttemptDirectory = Paths.get(parentDirectory + "/" + uniqueFolderName) + Files.createDirectories(executorAttemptDirectory) + executorAttemptDirectory + } + + private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absolutePath: String): Path = { + // Move output file to standard location. (On S3, a move is more a copy and delete): + val relativePath = parentDirectory.relativize(Path.of(absolutePath)).toString + if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() + // Remove the executorAttemptDirectory part from the path: + val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) + waitTillPathAvailable(Path.of(absolutePath)) + Files.move(Path.of(absolutePath), destinationPath) + destinationPath + } + + private def cleanUpExecutorAttemptDirectory(parentDirectory: Path): Unit = { + Files.list(parentDirectory).forEach { p => + if (Files.isDirectory(p) && p.getFileName.toString.startsWith(executorAttemptDirectoryPrefix)) { + FileUtils.deleteDirectory(p.toFile) + } + } + } + /** * Save temporal rdd, on the executors * @@ -176,10 +212,8 @@ package object geotiff { val segmentCount = bandSegmentCount * tiffBands // Each executor writes to a unique folder to avoid conflicts: - val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) - val base = Paths.get(path + "/" + uniqueFolderName) - Files.createDirectories(base) - val thePath = base.resolve(filename).toString + val executorAttemptDirectory = 
createExecutorAttemptDirectory(path) + val thePath = executorAttemptDirectory.resolve(filename).toString // filter band tags that match bandIndices val fo = formatOptions.deepClone() @@ -192,22 +226,14 @@ package object geotiff { tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo, ) (correctedPath, timestamp, croppedExtent, bandIndices) - }.collect().map({ + }.collect().map { case (absolutePath, timestamp, croppedExtent, bandIndices) => - // Move output file to standard location. (On S3, a move is more a copy and delete): - val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString - val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1)) - waitTillPathAvailable(Path.of(absolutePath)) - Files.move(Path.of(absolutePath), destinationPath) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absolutePath) (destinationPath.toString, timestamp, croppedExtent, bandIndices) - }).toList.asJava + }.toList.asJava + + cleanUpExecutorAttemptDirectory(Path.of(path)) - // Clean up failed tasks: - Files.list(Path.of(path)).forEach { p => - if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { - FileUtils.deleteDirectory(p.toFile) - } - } res } @@ -254,12 +280,10 @@ package object geotiff { } } val res = rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) => - val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) val fixedPath = if (path.endsWith("out")) { - val base = path.substring(0, path.length - 3) + uniqueFolderName + "/" - Files.createDirectories(Path.of(base)) - base + name + val executorAttemptDirectory = createExecutorAttemptDirectory(path.substring(0, path.length - 3)) + executorAttemptDirectory + "/" + name } else { path @@ -272,27 +296,22 @@ package object geotiff { (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), 
Collections.singletonList(bandIndex)) - }.collect().map({ - case (absolutePath, y) => + }.collect().map { + case (absolutePath, bandIndices) => if (path.endsWith("out")) { - // Move output file to standard location. (On S3, a move is more a copy and delete): val beforeOut = path.substring(0, path.length - "out".length) - val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString - val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1) - waitTillPathAvailable(Path.of(absolutePath)) - Files.move(Path.of(absolutePath), Path.of(destinationPath)) - (destinationPath, y) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absolutePath) + (destinationPath.toString, bandIndices) } else { - (absolutePath, y) + (absolutePath, bandIndices) } - }).toList.sortBy(_._1).asJava - // Clean up failed tasks: - val beforeOut = path.substring(0, path.length - "out".length) - Files.list(Path.of(beforeOut)).forEach { p => - if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { - FileUtils.deleteDirectory(p.toFile) - } + }.toList.sortBy(_._1).asJava + + if (path.endsWith("out")) { + val beforeOut = path.substring(0, path.length - "out".length) + cleanUpExecutorAttemptDirectory(Path.of(beforeOut)) } + res } else { val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala @@ -734,28 +753,18 @@ package object geotiff { }.groupByKey() .map { case ((tileId, extent), tiles) => // Each executor writes to a unique folder to avoid conflicts: - val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong()) - val base = Paths.get(Path.of(path).getParent + "/" + uniqueFolderName) - Files.createDirectories(base) - val filePath = base + "/" + newFilePath(Path.of(path).getFileName.toString, tileId) + val executorAttemptDirectory = createExecutorAttemptDirectory(Path.of(path).getParent) + val filePath = executorAttemptDirectory + "/" 
+ newFilePath(Path.of(path).getFileName.toString, tileId) (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) - }.collect().map({ + }.collect().map { case (absolutePath, croppedExtent) => - // Move output file to standard location. (On S3, a move is more a copy and delete): - val relativePath = Path.of(path).getParent.relativize(Path.of(absolutePath)).toString - val destinationPath = Path.of(path).getParent.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) - waitTillPathAvailable(Path.of(absolutePath)) - Files.move(Path.of(absolutePath), destinationPath) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absolutePath) (destinationPath.toString, croppedExtent) - }).toList.asJava + }.toList.asJava + + cleanUpExecutorAttemptDirectory(Path.of(path).getParent) - // Clean up failed tasks: - Files.list(Path.of(path).getParent).forEach { p => - if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) { - FileUtils.deleteDirectory(p.toFile) - } - } res } diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala index d1d3945b..d7cdd3cc 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/TileGridTest.scala @@ -1,10 +1,5 @@ package org.openeo.geotrellis.geotiff -import better.files.File.apply - -import java.time.LocalTime.MIDNIGHT -import java.time.ZoneOffset.UTC -import java.time.{LocalDate, ZonedDateTime} import geotrellis.proj4.{CRS, LatLng} import geotrellis.raster.io.geotiff.compression.DeflateCompression import geotrellis.spark._ @@ -12,21 +7,25 @@ import geotrellis.spark.util.SparkUtils import geotrellis.vector.{Extent, ProjectedExtent} import org.apache.spark.SparkContext import 
org.apache.spark.storage.StorageLevel.DISK_ONLY -import org.junit._ +import org.junit.jupiter.api.io.TempDir +import org.junit.jupiter.api.{BeforeAll, Test} +import org.junit.{AfterClass, Assert} import org.openeo.geotrellis.LayerFixtures.rgbLayerProvider import org.openeo.geotrellis.png.PngTest import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrellis.{LayerFixtures, geotiff} -import java.nio.file.{Files, Paths} +import java.nio.file.Path +import java.time.LocalTime.MIDNIGHT +import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME +import java.time.{LocalDate, ZonedDateTime} import scala.collection.JavaConverters._ -import scala.reflect.io.Directory object TileGridTest { private var sc: SparkContext = _ - @BeforeClass + @BeforeAll def setupSpark(): Unit = { // originally geotrellis.spark.util.SparkUtils.createLocalSparkContext val conf = SparkUtils.createSparkConf @@ -51,11 +50,7 @@ class TileGridTest { import TileGridTest._ @Test - def testSaveStitchWithTileGrids(): Unit = { - val outDir = Paths.get("tmp/testSaveStitchWithTileGrids/") - new Directory(outDir.toFile).deepList().foreach(_.delete()) - Files.createDirectories(outDir) - + def testSaveStitchWithTileGrids(@TempDir outDir: Path): Unit = { val date = ZonedDateTime.of(LocalDate.of(2020, 4, 5), MIDNIGHT, UTC) val bbox = ProjectedExtent(Extent(1.95, 50.95, 2.05, 51.05), LatLng) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index 947c2b7a..f7c1c5ac 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -13,14 +13,15 @@ import geotrellis.vector._ import geotrellis.vector.io.json.GeoJson import org.apache.spark.{SparkConf, SparkContext, SparkEnv} import 
org.junit.Assert._ -import org.junit._ +import org.junit.jupiter.api.io.TempDir +import org.junit.jupiter.api.{BeforeAll, Test} import org.junit.rules.TemporaryFolder +import org.junit.{AfterClass, Rule} import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons} -import org.openeo.sparklisteners.GetInfoSparkListener import org.slf4j.{Logger, LoggerFactory} import java.nio.file.{Files, Path, Paths} -import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime} +import java.time.{LocalTime, ZoneOffset, ZonedDateTime} import java.util import java.util.zip.Deflater._ import scala.annotation.meta.getter @@ -34,7 +35,7 @@ object WriteRDDToGeotiffTest{ var sc: SparkContext = _ - @BeforeClass + @BeforeAll def setupSpark() = { sc = { val conf = new SparkConf().setMaster("local[2]").setAppName(getClass.getSimpleName) @@ -151,11 +152,7 @@ class WriteRDDToGeotiffTest { } @Test - def testWriteRDD_apply_neighborhood(): Unit ={ - val outDir = Paths.get("tmp/testWriteRDD_apply_neighborhood/") - new Directory(outDir.toFile).deepList().foreach(_.delete()) - Files.createDirectories(outDir) - + def testWriteRDD_apply_neighborhood(@TempDir outDir: Path): Unit = { val layoutCols = 8 val layoutRows = 4 diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index 1ec2198f..575b14db 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala @@ -4,7 +4,6 @@ import cats.data.NonEmptyList import geotrellis.layer.{FloatingLayoutScheme, LayoutTileSource, SpaceTimeKey, SpatialKey, TileLayerMetadata} import geotrellis.proj4.{CRS, LatLng} import geotrellis.raster.gdal.{GDALIOException, GDALRasterSource} -import geotrellis.raster.geotiff.GeoTiffRasterSource import 
geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.resample.{Bilinear, CubicConvolution, ResampleMethod} import geotrellis.raster.summary.polygonal.Summary @@ -19,27 +18,27 @@ import geotrellis.vector._ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.junit.jupiter.api.Assertions.{assertEquals, assertNotSame, assertSame, assertTrue} -import org.junit.jupiter.api.{AfterAll, BeforeAll, Disabled, Test, Timeout} +import org.junit.jupiter.api.io.TempDir +import org.junit.jupiter.api._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.openeo.geotrellis.TestImplicits._ import org.openeo.geotrellis.file.PyramidFactory -import org.openeo.geotrellis.layers.FileLayerProvider.rasterSourceRDD import org.openeo.geotrellis.geotiff._ +import org.openeo.geotrellis.layers.FileLayerProvider.rasterSourceRDD import org.openeo.geotrellis.netcdf.{NetCDFOptions, NetCDFRDDWriter} -import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons} +import org.openeo.geotrellis.{LayerFixtures, ProjectedPolygons} import org.openeo.geotrelliscommon.DatacubeSupport._ import org.openeo.geotrelliscommon.{ConfigurableSpaceTimePartitioner, DataCubeParameters, DatacubeSupport, NoCloudFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner} import org.openeo.opensearch.OpenSearchResponses.{CreoFeatureCollection, FeatureCollection, Link} import org.openeo.opensearch.backends.CreodiasClient import org.openeo.opensearch.{OpenSearchClient, OpenSearchResponses} import org.openeo.sparklisteners.GetInfoSparkListener -import org.slf4j.LoggerFactory import ucar.nc2.NetcdfFile import ucar.nc2.util.CompareNetcdf2 import java.net.{URI, URL} -import java.nio.file.{Files, Paths} +import java.nio.file.{Files, Path, Paths} import java.time.ZoneOffset.UTC import java.time.{LocalDate, ZoneId, ZonedDateTime} import java.util @@ -47,7 +46,6 @@ import java.util.Formatter import 
java.util.concurrent.TimeUnit import scala.collection.immutable import scala.io.Source -import scala.reflect.io.Directory object FileLayerProviderTest { private var _sc: Option[SparkContext] = None @@ -1079,8 +1077,7 @@ class FileLayerProviderTest extends RasterMatchers{ } @Test - def testPixelValueOffsetNeededCorner(): Unit = { - Files.createDirectories(Paths.get("tmp/")) + def testPixelValueOffsetNeededCorner(@TempDir outDir: Path): Unit = { // This selection will go over a corner that has nodata pixels val layer = testPixelValueOffsetNeeded( "/org/openeo/geotrellis/testPixelValueOffsetNeededCorner.json", @@ -1088,14 +1085,14 @@ class FileLayerProviderTest extends RasterMatchers{ LocalDate.of(2023, 4, 5), ) val cubeSpatial = layer.toSpatial() - cubeSpatial.writeGeoTiff("tmp/testPixelValueOffsetNeededCorner.tiff") + cubeSpatial.writeGeoTiff(f"$outDir/testPixelValueOffsetNeededCorner.tiff") val arr = cubeSpatial.collect().array assertTrue(isNoData(arr(1)._2.toArrayTile().band(0).get(162, 250))) assertEquals(172, arr(0)._2.toArrayTile().band(0).get(5, 5), 1) } @Test - def testPixelValueOffsetNeededDark(): Unit = { + def testPixelValueOffsetNeededDark(@TempDir outDir: Path): Unit = { // This will cover an area where pixels go under 0 val layer = testPixelValueOffsetNeeded( "/org/openeo/geotrellis/testPixelValueOffsetNeededDark.json", @@ -1103,7 +1100,7 @@ class FileLayerProviderTest extends RasterMatchers{ LocalDate.of(2023, 1, 17), ) val cubeSpatial = layer.toSpatial() - cubeSpatial.writeGeoTiff("tmp/testPixelValueOffsetNeededDark.tiff") + cubeSpatial.writeGeoTiff(f"$outDir/testPixelValueOffsetNeededDark.tiff") val band = cubeSpatial.collect().array(0)._2.toArrayTile().band(0) assertEquals(888, band.get(0, 0), 1) @@ -1123,11 +1120,7 @@ class FileLayerProviderTest extends RasterMatchers{ @Test - def testMissingS2(): Unit = { - val outDir = Paths.get("tmp/FileLayerProviderTest/") - new Directory(outDir.toFile).deleteRecursively() - Files.createDirectories(outDir) - + 
def testMissingS2(@TempDir outDir: Path): Unit = { val from = ZonedDateTime.parse("2024-03-24T00:00:00Z") val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1277,8 +1270,7 @@ class FileLayerProviderTest extends RasterMatchers{ } @Test - def testSamplingLoadPerProduct():Unit = { - + def testSamplingLoadPerProduct(@TempDir outDir: Path):Unit = { val srs32631 = "EPSG:32631" val projected_polygons_native_crs = ProjectedPolygons.fromExtent(Extent(703109 - 100, 5600100, 709000, 5610000 - 100), srs32631) val dataCubeParameters = new DataCubeParameters() @@ -1290,16 +1282,16 @@ class FileLayerProviderTest extends RasterMatchers{ val cube = LayerFixtures.sentinel2Cube(LocalDate.of(2023, 4, 5), projected_polygons_native_crs, "/org/openeo/geotrellis/testPixelValueOffsetNeededCorner.json",dataCubeParameters) val opts = new GTiffOptions opts.setFilenamePrefix("load_per_product") - saveRDDTemporal(cube,"./", formatOptions = opts) + saveRDDTemporal(cube,outDir.toString, formatOptions = opts) dataCubeParameters.loadPerProduct = false val cube_ref = LayerFixtures.sentinel2Cube(LocalDate.of(2023, 4, 5), projected_polygons_native_crs, "/org/openeo/geotrellis/testPixelValueOffsetNeededCorner.json",dataCubeParameters) opts.setFilenamePrefix("load_regular") - saveRDDTemporal(cube_ref,"./", formatOptions = opts) + saveRDDTemporal(cube_ref, outDir.toString, formatOptions = opts) - val reference = GeoTiff.readMultiband("./load_regular_2023-04-05Z.tif").raster - val actual = GeoTiff.readMultiband("./load_per_product_2023-04-05Z.tif").raster + val reference = GeoTiff.readMultiband(f"$outDir/load_regular_2023-04-05Z.tif").raster + val actual = GeoTiff.readMultiband(f"$outDir/load_per_product_2023-04-05Z.tif").raster assertRastersEqual(actual,reference) @@ -1329,8 +1321,7 @@ class FileLayerProviderTest extends RasterMatchers{ } @Test - def testMultibandCOGViaSTAC(): Unit = { - Files.createDirectories(Paths.get("tmp/")) + def testMultibandCOGViaSTAC(@TempDir outDir: Path): Unit = { 
val factory = LayerFixtures.STACCOGCollection() val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1343,7 +1334,7 @@ class FileLayerProviderTest extends RasterMatchers{ bands.add("temperature-mean") bands.add("precipitation-flux") - val outLocation = "tmp/testMultibandCOGViaSTAC.nc" + val outLocation = f"$outDir/testMultibandCOGViaSTAC.nc" val referenceFile = "https://artifactory.vgt.vito.be/artifactory/testdata-public/openeo/geotrellis_extrensions/testMultibandCOGViaSTAC.nc" writeToNetCDFAndCompare(projected_polygons_native_crs, dataCubeParameters, bands, factory, outLocation, referenceFile) @@ -1352,8 +1343,7 @@ class FileLayerProviderTest extends RasterMatchers{ @Test - def testMultibandCOGViaSTACResample(): Unit = { - Files.createDirectories(Paths.get("tmp/")) + def testMultibandCOGViaSTACResample(@TempDir outDir: Path): Unit = { val factory = LayerFixtures.STACCOGCollection(resolution = CellSize(10.0,10.0)) val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1368,12 +1358,13 @@ class FileLayerProviderTest extends RasterMatchers{ bands.add("temperature-mean") bands.add("precipitation-flux") - writeToNetCDFAndCompare(projected_polygons_native_crs, dataCubeParameters, bands, factory, "tmp/testMultibandCOGViaSTACResampledCubic.nc", "https://artifactory.vgt.vito.be/artifactory/testdata-public/openeo/geotrellis_extrensions/testMultibandCOGViaSTACResampledCubic.nc") + val referenceFile = "https://artifactory.vgt.vito.be/artifactory/testdata-public/openeo/geotrellis_extrensions/testMultibandCOGViaSTACResampledCubic.nc" + writeToNetCDFAndCompare(projected_polygons_native_crs, dataCubeParameters, bands, factory, + f"$outDir/testMultibandCOGViaSTACResampledCubic.nc", referenceFile) } @Test - def testMultibandCOGViaSTACResampleReadOneBand(): Unit = { - Files.createDirectories(Paths.get("tmp/")) + def testMultibandCOGViaSTACResampleReadOneBand(@TempDir outDir: Path): Unit = { val factory = LayerFixtures.STACCOGCollection(resolution = 
CellSize(10.0,10.0),util.Arrays.asList("precipitation-flux")) val extent = Extent(-162.2501, 70.1839, -161.2879, 70.3401) @@ -1385,7 +1376,9 @@ class FileLayerProviderTest extends RasterMatchers{ val bands: util.ArrayList[String] = new util.ArrayList[String]() bands.add("precipitation-flux") - writeToNetCDFAndCompare(projected_polygons_native_crs, dataCubeParameters, bands, factory, "tmp/testSinglebandCOGViaSTACResampled.nc", "https://artifactory.vgt.vito.be/artifactory/testdata-public/openeo/geotrellis_extrensions/testSinglebandCOGViaSTACResampled.nc") + val referenceFile = "https://artifactory.vgt.vito.be/artifactory/testdata-public/openeo/geotrellis_extrensions/testSinglebandCOGViaSTACResampled.nc" + writeToNetCDFAndCompare(projected_polygons_native_crs, dataCubeParameters, bands, factory, + f"$outDir/testSinglebandCOGViaSTACResampled.nc", referenceFile) } private def datacubeParams(polygonsAOI: ProjectedPolygons, resampleMethod: ResampleMethod) = { From dccc8383fd6b3e05a2e9e8ebe361fa46fcaad835 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 31 Oct 2024 17:25:03 +0100 Subject: [PATCH 08/16] Fix for when using filepath_per_band. Re-enable shub tests. 
--- .../test/scala/org/openeo/extensions/PyramidFactoryTest.scala | 1 - .../org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala | 1 - .../src/main/scala/org/openeo/geotrellis/geotiff/package.scala | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala index a87339d8..4768d5ee 100644 --- a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala +++ b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala @@ -15,7 +15,6 @@ import org.openeo.geotrellissentinelhub.{PyramidFactory, SampleType} import java.util.Collections import scala.collection.JavaConverters._ -@Disabled("Avoid 'You have exceeded your rate limit'") class PyramidFactoryTest { @Test diff --git a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala index 1e202430..48917eec 100644 --- a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala +++ b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/PyramidFactoryTest.scala @@ -134,7 +134,6 @@ object PyramidFactoryTest { ) } -@Disabled("Avoid 'You have exceeded your rate limit'") class PyramidFactoryTest { import PyramidFactoryTest._ diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 12455919..75a52560 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -129,6 +129,7 @@ package object geotiff { // Remove the executorAttemptDirectory part from the path: val destinationPath = 
parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) waitTillPathAvailable(Path.of(absolutePath)) + Files.createDirectories(destinationPath.getParent) Files.move(Path.of(absolutePath), destinationPath) destinationPath } From 41ab3ddd0fbe95dd4429c4d4f7edf6317f6ab251 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 31 Oct 2024 19:55:25 +0100 Subject: [PATCH 09/16] Use more @TempDir. Cleanup. --- .../extensions/PyramidFactoryTest.scala | 1 - .../geotiff/WriteRDDToGeotiffTest.scala | 31 ++++++++----------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala index 4768d5ee..85dc6b06 100644 --- a/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala +++ b/geotrellis-extensions/src/test/scala/org/openeo/extensions/PyramidFactoryTest.scala @@ -8,7 +8,6 @@ import geotrellis.spark.util.SparkUtils import geotrellis.vector.{Extent, MultiPolygon, ProjectedExtent} import org.apache.spark.SparkContext import org.junit.Test -import org.junit.jupiter.api.Disabled import org.openeo.geotrelliscommon.{DataCubeParameters, ScopedMetadataTracker} import org.openeo.geotrellissentinelhub.{PyramidFactory, SampleType} diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index 587b21b4..c1cdfdf7 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -1,5 +1,6 @@ package org.openeo.geotrellis.geotiff +import better.files.File.apply import geotrellis.layer.{CRSWorldExtent, SpaceTimeKey, SpatialKey, ZoomedLayoutScheme} import geotrellis.proj4.LatLng import 
geotrellis.raster.io.geotiff.GeoTiff @@ -69,7 +70,7 @@ class WriteRDDToGeotiffTest { @Test - def testWriteRDD(): Unit ={ + def testWriteRDD(@TempDir tempDir: Path): Unit ={ val layoutCols = 8 val layoutRows = 4 @@ -77,8 +78,7 @@ class WriteRDDToGeotiffTest { val imageTile = ByteArrayTile(intImage,layoutCols*256, layoutRows*256) val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng) - val filename = "out.tif" - Files.deleteIfExists(Path.of(filename)) + val filename = (tempDir / "out.tif").toString() saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},1,filename,formatOptions = allOverviewOptions) @@ -181,7 +181,7 @@ class WriteRDDToGeotiffTest { } @Test - def testWriteMultibandRDD(): Unit ={ + def testWriteMultibandRDD(@TempDir tempDir: Path): Unit ={ val layoutCols = 8 val layoutRows = 4 @@ -192,8 +192,7 @@ class WriteRDDToGeotiffTest { val thirdBand = imageTile.map{x => if(x >= 5 ) 50 else 200 } val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile,secondBand,thirdBand),TileLayout(layoutCols,layoutRows,256,256),LatLng) - val filename = "outRGB.tif" - Files.deleteIfExists(Path.of(filename)) + val filename = (tempDir / "outRGB.tif").toString() saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename) val result = GeoTiff.readMultiband(filename).raster.tile assertArrayEquals(imageTile.toArray(),result.band(0).toArray()) @@ -203,7 +202,7 @@ class WriteRDDToGeotiffTest { @Test - def testWriteCroppedRDD(): Unit ={ + def testWriteCroppedRDD(@TempDir tempDir: Path): Unit ={ val layoutCols = 8 val layoutRows = 4 @@ -219,11 +218,9 @@ class WriteRDDToGeotiffTest { val cropBounds = Extent(-115, -65, 5.0, 56) val croppedRaster: Raster[MultibandTile] = tileLayerRDD.stitch().crop(cropBounds) - val referenceFile = "croppedRaster.tif" - 
Files.deleteIfExists(Path.of(referenceFile)) + val referenceFile = (tempDir / "croppedRaster.tif").toString() GeoTiff(croppedRaster,LatLng).write(referenceFile) - val filename = "outRGBCropped3.tif" - Files.deleteIfExists(Path.of(filename)) + val filename = (tempDir / "outRGBCropped3.tif").toString() saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename,cropBounds = Some(cropBounds)) val result = GeoTiff.readMultiband(filename).raster val reference = GeoTiff.readMultiband(referenceFile).raster @@ -234,7 +231,7 @@ class WriteRDDToGeotiffTest { } @Test - def testWriteRDDGlobalLayout(): Unit ={ + def testWriteRDDGlobalLayout(@TempDir tempDir: Path): Unit ={ val layoutCols = 8 val layoutRows = 8 @@ -250,11 +247,10 @@ class WriteRDDToGeotiffTest { val cropBounds = Extent(0, -90, 180, 90) val croppedRaster: Raster[MultibandTile] = tileLayerRDD.stitch().crop(cropBounds) - val referenceFile = "croppedRasterGlobalLayout.tif" + val referenceFile = (tempDir / "croppedRasterGlobalLayout.tif").toString() GeoTiff(croppedRaster,LatLng).write(referenceFile) - val filename = "outCropped.tif" - Files.deleteIfExists(Path.of(filename)) + val filename = (tempDir / "outCropped.tif").toString() saveRDD(tileLayerRDD.withContext{_.repartition(tileLayerRDD.count().toInt)},3,filename,cropBounds = Some(cropBounds)) val resultRaster = GeoTiff.readMultiband(filename).raster @@ -266,7 +262,7 @@ class WriteRDDToGeotiffTest { } @Test - def testWriteEmptyRdd(): Unit ={ + def testWriteEmptyRdd(@TempDir tempDir: Path): Unit ={ val layoutCols = 8 val layoutRows = 4 @@ -275,8 +271,7 @@ class WriteRDDToGeotiffTest { val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng) val empty = tileLayerRDD.withContext{_.filter(_ => false)} - val filename = "outEmpty.tif" - Files.deleteIfExists(Path.of(filename)) + val filename = (tempDir / "outEmpty.tif").toString() val 
cropBounds = Extent(-115, -65, 5.0, 56) saveRDD(empty,-1,filename,cropBounds = Some(cropBounds)) From f199970970e90644988f2b9c1cf62781100b5d9e Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Mon, 4 Nov 2024 19:29:43 +0100 Subject: [PATCH 10/16] Support using S3 API directly for tiff output. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- .../openeo/geotrellis/creo/CreoS3Utils.scala | 179 +++++++++++++++++- .../openeo/geotrellis/geotiff/package.scala | 88 ++------- .../org/openeo/geotrellis/png/package.scala | 3 +- .../org/openeo/geotrellis/stac/STACItem.scala | 2 +- 4 files changed, 198 insertions(+), 74 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala index 56083219..4b1e0744 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala @@ -1,19 +1,29 @@ package org.openeo.geotrellis.creo +import geotrellis.store.s3.AmazonS3URI +import org.apache.commons.io.FileUtils import org.openeo.geotrelliss3.S3Utils +import org.slf4j.LoggerFactory import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration import software.amazon.awssdk.core.retry.RetryPolicy import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition} +import software.amazon.awssdk.core.sync.RequestBody import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.model._ import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client, S3Configuration} import java.net.URI +import 
java.nio.file.{FileAlreadyExistsException, Files, Path} import java.time.Duration +import scala.collection.JavaConverters._ +import scala.collection.immutable.Iterable +import scala.util.control.Breaks.{break, breakable} object CreoS3Utils { + private val logger = LoggerFactory.getLogger(getClass) private val cloudFerroRegion: Region = Region.of("RegionOne") @@ -66,8 +76,175 @@ object CreoS3Utils { overrideConfig } - def deleteCreoSubFolder(bucket_name: String, subfolder: String) = { + //noinspection ScalaWeakerAccess + def deleteCreoSubFolder(bucket_name: String, subfolder: String): Unit = { val s3Client = getCreoS3Client() S3Utils.deleteSubFolder(s3Client, bucket_name, subfolder) } + + def isS3(path: String): Boolean = { + path.toLowerCase.startsWith("s3:/") + } + + private def toAmazonS3URI(path: String): AmazonS3URI = { + val correctS3Path = path.replaceFirst("(?i)s3:/(?!/)", "s3://") + new AmazonS3URI(correctS3Path) + } + + // In the following functions an asset path could be a local path or an S3 path. + + /** + * S3 does not have folders, so we interpret the path as a prefix. 
+   */
+  def assetDeleteFolders(paths: Iterable[String]): Unit = {
+    for (path <- paths) {
+      if (isS3(path)) {
+        val s3Uri = toAmazonS3URI(path)
+        deleteCreoSubFolder(s3Uri.getBucket, s3Uri.getKey)
+      } else {
+        val p = Path.of(path)
+        if (Files.exists(p)) { // a local path that does not exist is silently skipped
+          if (Files.isDirectory(p)) {
+            FileUtils.deleteDirectory(p.toFile)
+          } else {
+            throw new IllegalArgumentException(f"Can only delete directory here: $path")
+          }
+        }
+      }
+    }
+  }
+
+  def assetDelete(path: String): Unit = { // deletes one S3 object or one local file; local directories are rejected
+    if (isS3(path)) {
+      val s3Uri = toAmazonS3URI(path)
+      val keys = Seq(s3Uri.getKey) // the object key inside the bucket, not the whole s3:// URI
+      val deleteObjectsRequest = DeleteObjectsRequest.builder
+        .bucket(s3Uri.getBucket)
+        .delete(Delete.builder.objects(keys.map(key => ObjectIdentifier.builder.key(key).build).asJavaCollection).build)
+        .build
+      getCreoS3Client().deleteObjects(deleteObjectsRequest)
+    } else {
+      val p = Path.of(path)
+      if (Files.isDirectory(p)) {
+        throw new IllegalArgumentException(f"Cannot delete directory like this: $path")
+      } else {
+        Files.deleteIfExists(p)
+      }
+    }
+  }
+
+  def asseetPathListDirectChildren(path: String): Set[String] = { // S3: every key under the prefix; local: direct entries of the directory
+    if (isS3(path)) {
+      val s3Uri = toAmazonS3URI(path)
+      val listObjectsRequest = ListObjectsRequest.builder
+        .bucket(s3Uri.getBucket)
+        .prefix(s3Uri.getKey)
+        .build
+      val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest)
+      listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet
+    } else {
+      val children = Files.list(Path.of(path)) // Files.list returns a Stream that has to be closed after use
+      try children.iterator().asScala.map(_.toString).toSet finally children.close() // materialise the entries before closing
+    }
+  }
+
+  def assetExists(path: String): Boolean = {
+    if (isS3(path)) {
+      try {
+        // https://stackoverflow.com/a/56038360/1448736
+        val s3Uri = toAmazonS3URI(path)
+        val objectRequest = HeadObjectRequest.builder
+          .bucket(s3Uri.getBucket)
+          .key(s3Uri.getKey)
+          .build
+        getCreoS3Client().headObject(objectRequest)
+        true
+      } catch {
+        case _: NoSuchKeyException => false
+      }
+    } else {
+      Files.exists(Path.of(path))
+    }
+  }
+
+  def copyAsset(pathOrigin: String, pathDestination: String):
Unit = { + if (isS3(pathOrigin) && isS3(pathDestination)) { + val s3UriOrigin = toAmazonS3URI(pathOrigin) + val s3UriDestination = toAmazonS3URI(pathDestination) + val copyRequest = CopyObjectRequest.builder + .sourceBucket(s3UriOrigin.getBucket) + .sourceKey(s3UriOrigin.getKey) + .destinationBucket(s3UriDestination.getBucket) + .destinationKey(s3UriDestination.getKey) + .build + getCreoS3Client().copyObject(copyRequest) + } else if (!isS3(pathOrigin) && !isS3(pathDestination)) { + Files.copy(Path.of(pathOrigin), Path.of(pathDestination)) + } else if (!isS3(pathOrigin) && isS3(pathDestination)) { + uploadToS3(Path.of(pathOrigin), pathDestination) + } else if (isS3(pathOrigin) && !isS3(pathDestination)) { + // TODO: Download + throw new IllegalArgumentException(f"S3->local not supported here yet ($pathOrigin, $pathDestination)") + } else { + throw new IllegalArgumentException(f"Should be impossible to get here ($pathOrigin, $pathDestination)") + } + } + + def moveAsset(pathOrigin: String, pathDestination: String): Unit = { + // This could be optimized using move when on file system. + copyAsset(pathOrigin, pathDestination) + assetDelete(pathOrigin) + } + + def waitTillPathAvailable(path: Path): Unit = { + var retry = 0 + val maxTries = 20 + while (!assetExists(path.toString)) { + if (retry < maxTries) { + retry += 1 + val seconds = 5 + logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path") + Thread.sleep(seconds * 1000) + } else { + logger.warn(f"Path is not available after $maxTries tries: $path") + // Throw error instead? + return + } + } + } + + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { + var try_count = 1 + breakable { + while (true) { + try { + if (assetExists(newPath.toString)) { + // It might be a partial result of a previous failing task. + logger.info(f"Will replace $newPath. 
(try $try_count)") + } + Files.deleteIfExists(newPath) + Files.move(oldPath, newPath) + break + } catch { + case e: FileAlreadyExistsException => + // Here if another executor wrote the file between the delete and the move statement. + try_count += 1 + if (try_count > 5) { + throw e + } + } + } + } + } + + def uploadToS3(localFile: Path, s3Path: String) = { + val s3Uri = toAmazonS3URI(s3Path) + val objectRequest = PutObjectRequest.builder + .bucket(s3Uri.getBucket) + .key(s3Uri.getKey) + .build + + getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile)) + s3Path + } } diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 75a52560..09c71f56 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -27,20 +27,16 @@ import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.fixedTimeOffset import org.openeo.geotrellis.stac.STACItem import org.openeo.geotrellis.tile_grid.TileGrid import org.slf4j.LoggerFactory -import software.amazon.awssdk.core.sync.RequestBody -import software.amazon.awssdk.services.s3.model.PutObjectRequest import spire.math.Integral import spire.syntax.cfor.cfor import java.io.IOException -import java.nio.channels.FileChannel -import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths} +import java.nio.file.{Files, Path, Paths} import java.time.Duration import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} import scala.collection.JavaConverters._ import scala.reflect._ -import scala.util.control.Breaks.{break, breakable} package object geotiff { @@ -118,7 +114,9 @@ package object geotiff { val rand = new java.security.SecureRandom().nextLong() val uniqueFolderName = executorAttemptDirectoryPrefix + 
java.lang.Long.toUnsignedString(rand) val executorAttemptDirectory = Paths.get(parentDirectory + "/" + uniqueFolderName) - Files.createDirectories(executorAttemptDirectory) + if (!CreoS3Utils.isS3(parentDirectory.toString)) { + Files.createDirectories(executorAttemptDirectory) + } executorAttemptDirectory } @@ -128,18 +126,17 @@ package object geotiff { if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() // Remove the executorAttemptDirectory part from the path: val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) - waitTillPathAvailable(Path.of(absolutePath)) - Files.createDirectories(destinationPath.getParent) - Files.move(Path.of(absolutePath), destinationPath) + CreoS3Utils.waitTillPathAvailable(Path.of(absolutePath)) + if (!CreoS3Utils.isS3(parentDirectory.toString)) { + Files.createDirectories(destinationPath.getParent) + } + CreoS3Utils.moveAsset(absolutePath, destinationPath.toString) // TODO: Use move instead of copy destinationPath } - private def cleanUpExecutorAttemptDirectory(parentDirectory: Path): Unit = { - Files.list(parentDirectory).forEach { p => - if (Files.isDirectory(p) && p.getFileName.toString.startsWith(executorAttemptDirectoryPrefix)) { - FileUtils.deleteDirectory(p.toFile) - } - } + private def cleanUpExecutorAttemptDirectory(parentDirectory: String): Unit = { + val list = CreoS3Utils.asseetPathListDirectChildren(parentDirectory).filter(_.contains(executorAttemptDirectoryPrefix)) + CreoS3Utils.assetDeleteFolders(list) } /** @@ -238,7 +235,7 @@ package object geotiff { (destinationPath.toString, timestamp, croppedExtent, bandIndices) }.toList.asJava - cleanUpExecutorAttemptDirectory(Path.of(path)) + cleanUpExecutorAttemptDirectory(path) res } @@ -324,7 +321,7 @@ package object geotiff { if (path.endsWith("out")) { val beforeOut = path.substring(0, path.length - "out".length) - cleanUpExecutorAttemptDirectory(Path.of(beforeOut)) + 
cleanUpExecutorAttemptDirectory(beforeOut) } res @@ -778,7 +775,7 @@ package object geotiff { (destinationPath.toString, croppedExtent) }.toList.asJava - cleanUpExecutorAttemptDirectory(Path.of(path).getParent) + cleanUpExecutorAttemptDirectory(Path.of(path).getParent.toString) res } @@ -921,65 +918,14 @@ package object geotiff { if (path.startsWith("s3:/")) { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") - uploadToS3(tempFile, correctS3Path) + CreoS3Utils.uploadToS3(tempFile, correctS3Path) } else { // Retry should not be needed at this point, but it is almost free to keep it. - moveOverwriteWithRetries(tempFile, Path.of(path)) + CreoS3Utils.moveOverwriteWithRetries(tempFile, Path.of(path)) path } } - private def waitTillPathAvailable(path: Path): Unit = { - var retry = 0 - val maxTries = 20 - while (!path.toFile.exists()) { - if (retry < maxTries) { - retry += 1 - val seconds = 5 - logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path") - Thread.sleep(seconds * 1000) - } else { - logger.warn(f"Path is not available after $maxTries tries: $path") - // Throw error instead? - return - } - } - } - - def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { - var try_count = 1 - breakable { - while (true) { - try { - if (newPath.toFile.exists()) { - // It might be a partial result of a previous failing task. - logger.info(f"Will replace $newPath. (try $try_count)") - } - Files.deleteIfExists(newPath) - Files.move(oldPath, newPath) - break - } catch { - case e: FileAlreadyExistsException => - // Here if another executor wrote the file between the delete and the move statement. 
- try_count += 1 - if (try_count > 5) { - throw e - } - } - } - } - } - - def uploadToS3(localFile: Path, s3Path: String) = { - val s3Uri = new AmazonS3URI(s3Path) - val objectRequest = PutObjectRequest.builder - .bucket(s3Uri.getBucket) - .key(s3Uri.getKey) - .build - - CreoS3Utils.getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile)) - s3Path - } case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] { override def length: Int = tiles.size diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala index 1da4f2b4..acd5fcfe 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/png/package.scala @@ -6,7 +6,8 @@ import geotrellis.raster.render.RGBA import geotrellis.raster.{MultibandTile, UByteCellType} import geotrellis.spark._ import geotrellis.vector.{Extent, ProjectedExtent} -import org.openeo.geotrellis.geotiff.{SRDD, uploadToS3} +import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3 +import org.openeo.geotrellis.geotiff.SRDD import java.io.File import java.nio.file.{Files, Paths} diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala index 9abfeaf6..5b39bcfa 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/stac/STACItem.scala @@ -1,6 +1,6 @@ package org.openeo.geotrellis.stac -import org.openeo.geotrellis.geotiff.uploadToS3 +import org.openeo.geotrellis.creo.CreoS3Utils.uploadToS3 import org.openeo.geotrellis.getTempFile import org.slf4j.LoggerFactory From 2dc7b301a17223ca157e262dd124bd02d37752c9 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Mon, 4 Nov 
2024 20:53:22 +0100 Subject: [PATCH 11/16] Avoid SentinelHubException in CI --- .../geotrellissentinelhub/BatchProcessingServiceTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala index e45d00d1..57bee773 100644 --- a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala +++ b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala @@ -13,6 +13,7 @@ import java.time.LocalTime import scala.annotation.meta.getter import scala.collection.JavaConverters._ +@Ignore("Avoid SentinelHubException in CI") class BatchProcessingServiceTest { private val endpoint = "https://services.sentinel-hub.com" // TODO: this depends on the dataset private val authorizer = new MemoizedAuthApiAccessTokenAuthorizer(Utils.clientId, Utils.clientSecret) From 63a39b5f38d7ce60421643b529102d1acc81a9ad Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 5 Nov 2024 10:38:02 +0100 Subject: [PATCH 12/16] More cleanup for MR. Fix removing empty executorAttemptDirectories. 
--- .../openeo/geotrellis/creo/CreoS3Utils.scala | 3 +- .../openeo/geotrellis/geotiff/package.scala | 36 ++++++++++--------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala index 4b1e0744..ee0d1c9c 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala @@ -143,8 +143,7 @@ object CreoS3Utils { val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest) listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet } else { - val list = Files.list(Path.of(path)) - List(list).map(_.toString).toSet + Files.list(Path.of(path)).toArray.map(_.toString).toSet } } diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 09c71f56..acd872a4 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -120,17 +120,17 @@ package object geotiff { executorAttemptDirectory } - private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absolutePath: String): Path = { + private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String): Path = { // Move output file to standard location. 
(On S3, a move is more a copy and delete): - val relativePath = parentDirectory.relativize(Path.of(absolutePath)).toString - if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() + val relativeFilePath = parentDirectory.relativize(Path.of(absoluteFilePath)).toString + if (!relativeFilePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() // Remove the executorAttemptDirectory part from the path: - val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1)) - CreoS3Utils.waitTillPathAvailable(Path.of(absolutePath)) + val destinationPath = parentDirectory.resolve(relativeFilePath.substring(relativeFilePath.indexOf("/") + 1)) + CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath)) if (!CreoS3Utils.isS3(parentDirectory.toString)) { Files.createDirectories(destinationPath.getParent) } - CreoS3Utils.moveAsset(absolutePath, destinationPath.toString) // TODO: Use move instead of copy + CreoS3Utils.moveAsset(absoluteFilePath, destinationPath.toString) destinationPath } @@ -214,9 +214,9 @@ package object geotiff { // Each executor writes to a unique folder to avoid conflicts: val executorAttemptDirectory = createExecutorAttemptDirectory(path) - val absolutePath = executorAttemptDirectory.resolve(filename) - absolutePath.toFile.getParentFile.mkdirs() - val thePath = absolutePath.toString + val absoluteFilePath = executorAttemptDirectory.resolve(filename) + absoluteFilePath.toFile.getParentFile.mkdirs() + val thePath = absoluteFilePath.toString // filter band tags that match bandIndices val fo = formatOptions.deepClone() @@ -230,8 +230,8 @@ package object geotiff { ) (correctedPath, timestamp, croppedExtent, bandIndices) }.collect().map { - case (absolutePath, timestamp, croppedExtent, bandIndices) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absolutePath) + case (absoluteFilePath, timestamp, croppedExtent, bandIndices) => + val destinationPath = 
moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath) (destinationPath.toString, timestamp, croppedExtent, bandIndices) }.toList.asJava @@ -309,13 +309,13 @@ package object geotiff { (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), Collections.singletonList(bandIndex)) }.collect().map { - case (absolutePath, bandIndices) => + case (absoluteFilePath, bandIndices) => if (path.endsWith("out")) { val beforeOut = path.substring(0, path.length - "out".length) - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absolutePath) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath) (destinationPath.toString, bandIndices) } else { - (absolutePath, bandIndices) + (absoluteFilePath, bandIndices) } }.toList.sortBy(_._1).asJava @@ -770,8 +770,8 @@ package object geotiff { (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) }.collect().map { - case (absolutePath, croppedExtent) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absolutePath) + case (absoluteFilePath, croppedExtent) => + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath) (destinationPath.toString, croppedExtent) }.toList.asJava @@ -916,7 +916,9 @@ package object geotiff { geoTiff.write(tempFile.toString, optimizedOrder = true) gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml)) - if (path.startsWith("s3:/")) { + if (CreoS3Utils.isS3(path)) { + // Converting to Path and back could change the s3:// prefix to s3:/ + // The following line corrects this: val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") CreoS3Utils.uploadToS3(tempFile, correctS3Path) } else { From 6a9bc4bc4f893cfe5aedfc5738f0c8b6baf95067 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 6 Nov 2024 22:35:01 +0100 Subject: [PATCH 13/16] 
Allow override target files again to be sure. --- .../scala/org/openeo/geotrellis/creo/CreoS3Utils.scala | 10 +++++----- .../scala/org/openeo/geotrellis/geotiff/package.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala index ee0d1c9c..ca8f16b4 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/creo/CreoS3Utils.scala @@ -212,20 +212,20 @@ object CreoS3Utils { } } - def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { + def moveOverwriteWithRetries(oldPath: String, newPath: String): Unit = { var try_count = 1 breakable { while (true) { try { - if (assetExists(newPath.toString)) { + if (assetExists(newPath)) { // It might be a partial result of a previous failing task. logger.info(f"Will replace $newPath. (try $try_count)") + assetDelete(newPath) } - Files.deleteIfExists(newPath) - Files.move(oldPath, newPath) + moveAsset(oldPath, newPath) break } catch { - case e: FileAlreadyExistsException => + case e: Exception => // Here if another executor wrote the file between the delete and the move statement. 
try_count += 1 if (try_count > 5) { diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index acd872a4..5d8b2a8f 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -130,7 +130,7 @@ package object geotiff { if (!CreoS3Utils.isS3(parentDirectory.toString)) { Files.createDirectories(destinationPath.getParent) } - CreoS3Utils.moveAsset(absoluteFilePath, destinationPath.toString) + CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString) destinationPath } @@ -923,7 +923,7 @@ package object geotiff { CreoS3Utils.uploadToS3(tempFile, correctS3Path) } else { // Retry should not be needed at this point, but it is almost free to keep it. - CreoS3Utils.moveOverwriteWithRetries(tempFile, Path.of(path)) + CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path) path } } From 5a710f451b870a32f063ef7716b87283d5975fd0 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 7 Nov 2024 09:30:58 +0100 Subject: [PATCH 14/16] Don't crash when geoTiff.write() does not return a file. 
https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329#issuecomment-2459513796 --- .../openeo/geotrellis/geotiff/package.scala | 63 ++++++++++++------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 5d8b2a8f..eeeee640 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -120,17 +120,19 @@ package object geotiff { executorAttemptDirectory } - private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String): Path = { + private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absoluteFilePath: String, fileExists: Boolean): Path = { // Move output file to standard location. (On S3, a move is more a copy and delete): val relativeFilePath = parentDirectory.relativize(Path.of(absoluteFilePath)).toString if (!relativeFilePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception() // Remove the executorAttemptDirectory part from the path: val destinationPath = parentDirectory.resolve(relativeFilePath.substring(relativeFilePath.indexOf("/") + 1)) - CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath)) - if (!CreoS3Utils.isS3(parentDirectory.toString)) { - Files.createDirectories(destinationPath.getParent) + if (fileExists) { + CreoS3Utils.waitTillPathAvailable(Path.of(absoluteFilePath)) + if (!CreoS3Utils.isS3(parentDirectory.toString)) { + Files.createDirectories(destinationPath.getParent) + } + CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString) } - CreoS3Utils.moveOverwriteWithRetries(absoluteFilePath, destinationPath.toString) destinationPath } @@ -225,13 +227,13 @@ package object geotiff { .map { case (bandTags, _) => bandTags } fo.setBandTags(newBandTags) - val correctedPath = 
writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, + val (correctedPath, fileExists) = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo, ) - (correctedPath, timestamp, croppedExtent, bandIndices) + (correctedPath, fileExists, timestamp, croppedExtent, bandIndices) }.collect().map { - case (absoluteFilePath, timestamp, croppedExtent, bandIndices) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath) + case (absoluteFilePath, fileExists, timestamp, croppedExtent, bandIndices) => + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absoluteFilePath, fileExists) (destinationPath.toString, timestamp, croppedExtent, bandIndices) }.toList.asJava @@ -309,10 +311,10 @@ package object geotiff { (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), Collections.singletonList(bandIndex)) }.collect().map { - case (absoluteFilePath, bandIndices) => + case ((absoluteFilePath, fileExists), bandIndices) => if (path.endsWith("out")) { val beforeOut = path.substring(0, path.length - "out".length) - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath) + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absoluteFilePath, fileExists) (destinationPath.toString, bandIndices) } else { (absoluteFilePath, bandIndices) @@ -487,7 +489,7 @@ package object geotiff { val metadata = new STACItem() metadata.asset(fixedPath) metadata.write(stacItemPath) - val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews) + val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, 
preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews)._1 return Collections.singletonList(finalPath) }finally { preprocessedRdd.unpersist() @@ -644,7 +646,12 @@ package object geotiff { .toList } - private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]], gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS, tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType, detectedBandCount: Double, segmentCount: Int, formatOptions:GTiffOptions = new GTiffOptions,overviews: List[GeoTiffMultibandTile] = Nil) = { + private def writeTiff(path: String, tiffs: collection.Map[Int, Array[Byte]], + gridBounds: GridBounds[Int], croppedExtent: Extent, crs: CRS, + tileLayout: TileLayout, compression: DeflateCompression, cellType: CellType, + detectedBandCount: Double, segmentCount: Int, + formatOptions: GTiffOptions = new GTiffOptions, overviews: List[GeoTiffMultibandTile] = Nil + ): (String, Boolean) = { logger.info(s"Writing geotiff to $path with type ${cellType.toString()} and bands $detectedBandCount") val tiffTile: GeoTiffMultibandTile = toTiff(tiffs, gridBounds, tileLayout, compression, cellType, detectedBandCount, segmentCount) val options = if(formatOptions.colorMap.isDefined){ @@ -770,8 +777,8 @@ package object geotiff { (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent) }.collect().map { - case (absoluteFilePath, croppedExtent) => - val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath) + case ((absoluteFilePath, fileExists), croppedExtent) => + val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absoluteFilePath, fileExists) (destinationPath.toString, croppedExtent) }.toList.asJava @@ -784,7 +791,7 @@ package object geotiff { layout: LayoutDefinition, crs: CRS, geometry: Geometry, croppedExtent: 
Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], compression: Compression, formatOptions: Option[GTiffOptions] = None - ) = { + ):(String, Boolean) = { val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch() val re = raster.rasterExtent @@ -904,31 +911,41 @@ package object geotiff { val filename = s"${filenamePrefix.getOrElse("openEO")}_${DateTimeFormatter.ISO_DATE.format(time)}_$name.tif" val filePath = Paths.get(path).resolve(filename).toString val timestamp = time format DateTimeFormatter.ISO_ZONED_DATE_TIME - (stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression), + (stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression)._1, timestamp, geometry.extent) } .collect() .toList.asJava } - def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): String = { + def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): (String, Boolean) = { val tempFile = getTempFile(null, ".tif") geoTiff.write(tempFile.toString, optimizedOrder = true) - gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml)) - + val fileExists = Files.exists(tempFile) + if (fileExists) { + gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml)) + } else { + logger.warn("writeGeoTiff() File was not created: " + path) + } if (CreoS3Utils.isS3(path)) { // Converting to Path and back could change the s3:// prefix to s3:/ // The following line corrects this: val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") - CreoS3Utils.uploadToS3(tempFile, correctS3Path) + if (fileExists) { + CreoS3Utils.uploadToS3(tempFile, correctS3Path) + } + (correctS3Path, fileExists) } else { // Retry should not be needed at this point, but it is almost free to keep it. 
- CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path) - path + if (fileExists) { + CreoS3Utils.moveOverwriteWithRetries(tempFile.toString, path) + } + (path, fileExists) } } + case class ContextSeq[K, V, M](tiles: Iterable[(K, V)], metadata: LayoutDefinition) extends Seq[(K, V)] with Metadata[LayoutDefinition] { override def length: Int = tiles.size From 5e31712fc1403883da1689d763f288c54bd08bf8 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 7 Nov 2024 11:18:42 +0100 Subject: [PATCH 15/16] Fix for TileGridTest.testWriteRDDTileGrid. --- .../src/main/scala/org/openeo/geotrellis/geotiff/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index eeeee640..7b81b7fc 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -641,7 +641,7 @@ package object geotiff { val segmentCount = bandSegmentCount * detectedBandCount val newPath = newFilePath(path, name) - writeTiff(newPath, tiffs, gridBounds, extent.intersection(croppedExtent).get, preprocessedRdd.metadata.crs, tileLayout, compression, cellType, detectedBandCount, segmentCount) + writeTiff(newPath, tiffs, gridBounds, extent.intersection(croppedExtent).get, preprocessedRdd.metadata.crs, tileLayout, compression, cellType, detectedBandCount, segmentCount)._1 }.collect() .toList } From 7a6a8afcdc793921e579d84b16a7d86edd913438 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Thu, 7 Nov 2024 15:00:48 +0100 Subject: [PATCH 16/16] Enable test before merge --- .../geotrellissentinelhub/BatchProcessingServiceTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala 
b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala index 57bee773..e45d00d1 100644 --- a/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala +++ b/geotrellis-sentinelhub/src/test/scala/org/openeo/geotrellissentinelhub/BatchProcessingServiceTest.scala @@ -13,7 +13,6 @@ import java.time.LocalTime import scala.annotation.meta.getter import scala.collection.JavaConverters._ -@Ignore("Avoid SentinelHubException in CI") class BatchProcessingServiceTest { private val endpoint = "https://services.sentinel-hub.com" // TODO: this depends on the dataset private val authorizer = new MemoizedAuthApiAccessTokenAuthorizer(Utils.clientId, Utils.clientSecret)