Skip to content

Commit

Permalink
Move tiff file to result, to avoid arriving empty on fusemount. #329
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Oct 16, 2024
1 parent 4d44e53 commit 9e70298
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import spire.math.Integral
import spire.syntax.cfor.cfor

import java.nio.channels.FileChannel
import java.nio.file.{FileAlreadyExistsException, Path, Paths}
import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths}
import java.time.Duration
import java.time.format.DateTimeFormatter
import java.util.{ArrayList, Collections, Map, List => JList}
import scala.collection.JavaConverters._
import scala.reflect._
import scala.util.control.Breaks.break

package object geotiff {

Expand Down Expand Up @@ -827,7 +828,7 @@ package object geotiff {
.toList.asJava
}

private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
import java.nio.file.Files
if (path.startsWith("s3:/")) {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
Expand All @@ -842,17 +843,7 @@ package object geotiff {
// TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
geoTiff.write(tempFile.toString, optimizedOrder = true)

// Geotrellis writes the file piecewise and sometimes files are only partially written.
// Maybe a move operation is easier for the fusemount:
try {
Files.move(tempFile, Path.of(path))
} catch {
case e: FileAlreadyExistsException =>
logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage)
// The existing file could be a partial result of a previous failing Spark task.
Files.deleteIfExists(Path.of(path))
Files.move(tempFile, Path.of(path))
}
moveOverwriteWithRetries(tempFile, Path.of(path))

// Call fsync on the parent path to assure the fusemount is up-to-date.
// The equivalent of Python's os.fsync
Expand All @@ -863,6 +854,27 @@ package object geotiff {

}

def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
var try_count = 1
while (true) {
try {
if (newPath.toFile.exists()) {
// It might be a partial result of a previous failing task.
logger.info(f"Will replace $newPath. (try $try_count)")
}
Files.deleteIfExists(newPath)
Files.move(oldPath, newPath)
break
} catch {
case e: FileAlreadyExistsException =>
// Here if another executor wrote the file between the delete and the move statement.
try_count += 1
if (try_count > 5) {
throw e
}
}
}
}

def uploadToS3(localFile: Path, s3Path: String) = {
val s3Uri = new AmazonS3URI(s3Path)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.openeo.geotrellis

import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType}
import org.junit.Assert._
import org.junit.Test
import org.openeo.geotrellis.geotiff._

import java.nio.file.{Files, Path}

class PackageTest {
@Test
Expand All @@ -12,4 +16,23 @@ class PackageTest {
assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42)))
assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42)))
}

@Test
def testFileMove(): Unit = {
val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif")
val refTiff = GeoTiff.readMultiband(refFile.getPath)
val p = Path.of(f"tmp/testFileMove/")
Files.createDirectories(p)

(1 to 20).foreach { i =>
val dst = Path.of(p + f"/$i.tif")
// Limit the amount of parallel jobs to avoid getting over the max retries
(1 to 4).par.foreach { _ =>
writeGeoTiff(refTiff, dst.toString)
assertTrue(Files.exists(dst))
}
val refTiff2 = GeoTiff.readMultiband(dst.toString)
assertEquals(refTiff2.cellSize, refTiff.cellSize)
}
}
}

0 comments on commit 9e70298

Please sign in to comment.