diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index b6cc19226..6d98706a9 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -33,12 +33,13 @@ import spire.math.Integral import spire.syntax.cfor.cfor import java.nio.channels.FileChannel -import java.nio.file.{FileAlreadyExistsException, Path, Paths} +import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths} import java.time.Duration import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} import scala.collection.JavaConverters._ import scala.reflect._ +import scala.util.control.Breaks.break package object geotiff { @@ -827,7 +828,7 @@ package object geotiff { .toList.asJava } - private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { + def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { import java.nio.file.Files if (path.startsWith("s3:/")) { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") @@ -842,17 +843,7 @@ package object geotiff { // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) geoTiff.write(tempFile.toString, optimizedOrder = true) - // Geotrellis writes the file piecewise and sometimes files are only partially written. - // Maybe a move operation is easier for the fusemount: - try { - Files.move(tempFile, Path.of(path)) - } catch { - case e: FileAlreadyExistsException => - logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage) - // The existing file could be a partial result of a previous failing Spark task. - Files.deleteIfExists(Path.of(path)) - Files.move(tempFile, Path.of(path)) - } + moveOverwriteWithRetries(tempFile, Path.of(path)) // Call fsync on the parent path to assure the fusemount is up-to-date. // The equivalent of Python's os.fsync @@ -863,6 +854,27 @@ package object geotiff { } + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { + var try_count = 1 + while (true) { + try { + if (newPath.toFile.exists()) { + // It might be a partial result of a previous failing task. + logger.info(f"Will replace $newPath. (try $try_count)") + } + Files.deleteIfExists(newPath) + Files.move(oldPath, newPath) + break + } catch { + case e: FileAlreadyExistsException => + // Here if another executor wrote the file between the delete and the move statement. + try_count += 1 + if (try_count > 5) { + throw e + } + } + } + } def uploadToS3(localFile: Path, s3Path: String) = { val s3Uri = new AmazonS3URI(s3Path) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala index 1eae19c94..316d9dbf7 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala @@ -1,8 +1,12 @@ package org.openeo.geotrellis +import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType} import org.junit.Assert._ import org.junit.Test +import org.openeo.geotrellis.geotiff._ + +import java.nio.file.{Files, Path} class PackageTest { @Test @@ -12,4 +16,23 @@ class PackageTest { assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42))) assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42))) } + + @Test + def testFileMove(): Unit = { + val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif") + val refTiff = GeoTiff.readMultiband(refFile.getPath) + val p = Path.of(f"tmp/testFileMove/") + Files.createDirectories(p) + + (1 to 20).foreach { i => + val dst = Path.of(p + f"/$i.tif") + // Limit the amount of parallel jobs to avoid getting over the max retries + (1 to 4).par.foreach { _ => + writeGeoTiff(refTiff, dst.toString) + assertTrue(Files.exists(dst)) + } + val refTiff2 = GeoTiff.readMultiband(dst.toString) + assertEquals(refTiff2.cellSize, refTiff.cellSize) + } + } }