Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per executor output v02 #337

Merged
merged 20 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3e67e96
Save output from executors to unique folders. Move the successful res…
EmileSonneveld Oct 17, 2024
fd6235a
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 17, 2024
003736c
Fix output folder for tests.
EmileSonneveld Oct 18, 2024
f916512
Add waitTillPathAvailable when accessing file from driver that was wr…
EmileSonneveld Oct 29, 2024
bf32624
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 29, 2024
7a00557
Use move trick for saveStitchedTileGrid too. https://github.com/Open-…
EmileSonneveld Oct 29, 2024
e81cc7b
Disable sentinelhub test for the moment.
EmileSonneveld Oct 29, 2024
65046dd
Disable sentinelhub test for the moment.
EmileSonneveld Oct 29, 2024
22327e1
React to MR: Cleanup code. Use @TempDir. Prune imports. https://githu…
EmileSonneveld Oct 31, 2024
d847b26
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 31, 2024
dccc838
Fix for when using filepath_per_band. Re-enable shub tests.
EmileSonneveld Oct 31, 2024
41ab3dd
Use more @TempDir. Cleanup.
EmileSonneveld Oct 31, 2024
f199970
Support using S3 API directly for tiff output. https://github.com/Ope…
EmileSonneveld Nov 4, 2024
2dc7b30
Avoid SentinelHubException in CI
EmileSonneveld Nov 4, 2024
63a39b5
More cleanup for MR. Fix removing empty executorAttemptDirectories.
EmileSonneveld Nov 5, 2024
6a9bc4b
Allow override target files again to be sure.
EmileSonneveld Nov 6, 2024
5a710f4
Don't crash when geoTiff.write() does not return a file. https://gith…
EmileSonneveld Nov 7, 2024
5e31712
Fix for TileGridTest.testWriteRDDTileGrid.
EmileSonneveld Nov 7, 2024
a68be18
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Nov 7, 2024
7a6a8af
Enable test before merge
EmileSonneveld Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import geotrellis.spark.util.SparkUtils
import geotrellis.vector.{Extent, MultiPolygon, ProjectedExtent}
import org.apache.spark.SparkContext
import org.junit.Test
import org.junit.jupiter.api.Disabled
import org.openeo.geotrelliscommon.{DataCubeParameters, ScopedMetadataTracker}
import org.openeo.geotrellissentinelhub.{PyramidFactory, SampleType}

import java.util.Collections
import scala.collection.JavaConverters._

@Disabled("Avoid 'You have exceeded your rate limit'")
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
class PyramidFactoryTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ object PyramidFactoryTest {
)
}

@Disabled("Avoid 'You have exceeded your rate limit'")
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
class PyramidFactoryTest {
import PyramidFactoryTest._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import geotrellis.spark.pyramid.Pyramid
import geotrellis.store.s3._
import geotrellis.util._
import geotrellis.vector.{ProjectedExtent, _}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -135,7 +135,7 @@ package object geotiff {
val bandSegmentCount = totalCols * totalRows
val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION"))

preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) =>
val res = preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) =>
var bandIndex = -1
//Warning: for deflate compression, the segmentcount and index is not really used, making it stateless.
//Not sure how this works out for other types of compression!!!
Expand Down Expand Up @@ -174,7 +174,12 @@ package object geotiff {
val bandIndices = sequence.map(_._3).toSet.toList.asJava

val segmentCount = bandSegmentCount * tiffBands
val thePath = Paths.get(path).resolve(filename).toString

// Each executor writes to a unique folder to avoid conflicts:
val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
val base = Paths.get(path + "/" + uniqueFolderName)
Files.createDirectories(base)
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
val thePath = base.resolve(filename).toString

// filter band tags that match bandIndices
val fo = formatOptions.deepClone()
Expand All @@ -187,8 +192,23 @@ package object geotiff {
tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo,
)
(correctedPath, timestamp, croppedExtent, bandIndices)
}.collect().toList.asJava

}.collect().map({
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
case (absolutePath, timestamp, croppedExtent, bandIndices) =>
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
// Move output file to standard location. (On S3, a move is more a copy and delete):
val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString
val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1))
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
waitTillPathAvailable(Path.of(absolutePath))
Files.move(Path.of(absolutePath), destinationPath)
(destinationPath.toString, timestamp, croppedExtent, bandIndices)
}).toList.asJava

// Clean up failed tasks:
Files.list(Path.of(path)).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
FileUtils.deleteDirectory(p.toFile)
}
}
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
res
}


Expand Down Expand Up @@ -233,10 +253,13 @@ package object geotiff {
((name, bandIndex), (key, t))
}
}
rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) =>
val res = rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) =>
val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
val fixedPath =
if (path.endsWith("out")) {
path.substring(0, path.length - 3) + name
val base = path.substring(0, path.length - 3) + uniqueFolderName + "/"
Files.createDirectories(Path.of(base))
base + name
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
}
else {
path
Expand All @@ -249,7 +272,28 @@ package object geotiff {

(stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
Collections.singletonList(bandIndex))
}.collect().toList.sortBy(_._1).asJava
}.collect().map({
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
case (absolutePath, y) =>
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
if (path.endsWith("out")) {
// Move output file to standard location. (On S3, a move is more a copy and delete):
val beforeOut = path.substring(0, path.length - "out".length)
val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString
val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid string manipulation and call relativePath.getParent() (see above).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does something different thatn getParent.
I added a comment:
// Remove the executorAttemptDirectory part from the path

waitTillPathAvailable(Path.of(absolutePath))
Files.move(Path.of(absolutePath), Path.of(destinationPath))
(destinationPath, y)
} else {
(absolutePath, y)
}
}).toList.sortBy(_._1).asJava
// Clean up failed tasks:
val beforeOut = path.substring(0, path.length - "out".length)
Files.list(Path.of(beforeOut)).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
FileUtils.deleteDirectory(p.toFile)
}
}
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
res
} else {
val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala
tmp.map(t => (t, (0 until bandCount).toList.asJava)).asJava
Expand Down Expand Up @@ -679,7 +723,7 @@ package object geotiff {

val layout = rdd.metadata.layout
val crs = rdd.metadata.crs
rdd.flatMap {
val res = rdd.flatMap {
case (key, tile) => features.filter { case (_, extent) =>
val tileBounds = layout.mapTransform(extent)

Expand All @@ -688,12 +732,31 @@ package object geotiff {
((name, extent), (key, tile))
}
}.groupByKey()
.map { case ((name, extent), tiles) =>
val filePath = newFilePath(path, name)
.map { case ((tileId, extent), tiles) =>
// Each executor writes to a unique folder to avoid conflicts:
val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
val base = Paths.get(Path.of(path).getParent + "/" + uniqueFolderName)
Files.createDirectories(base)
val filePath = base + "/" + newFilePath(Path.of(path).getFileName.toString, tileId)

(stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent)
}.collect()
.toList.asJava
}.collect().map({
case (absolutePath, croppedExtent) =>
// Move output file to standard location. (On S3, a move is more a copy and delete):
val relativePath = Path.of(path).getParent.relativize(Path.of(absolutePath)).toString
val destinationPath = Path.of(path).getParent.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
waitTillPathAvailable(Path.of(absolutePath))
Files.move(Path.of(absolutePath), destinationPath)
(destinationPath.toString, croppedExtent)
}).toList.asJava

// Clean up failed tasks:
Files.list(Path.of(path).getParent).forEach { p =>
if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
FileUtils.deleteDirectory(p.toFile)
}
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved
}
res
}

private def stitchAndWriteToTiff(tiles: Iterable[(SpatialKey, MultibandTile)], filePath: String,
Expand Down Expand Up @@ -827,29 +890,34 @@ package object geotiff {
}

def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String, gtiffOptions: Option[GTiffOptions]): String = {
val tempFile = getTempFile(null, ".tif")
geoTiff.write(tempFile.toString, optimizedOrder = true)
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))

if (path.startsWith("s3:/")) {
val tempFile = Files.createTempFile(null, null)
geoTiff.write(tempFile.toString, optimizedOrder = true)
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))
uploadToS3(tempFile, path.replaceFirst("s3:/(?!/)", "s3://"))
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
uploadToS3(tempFile, correctS3Path)
} else {
val tempFile = getTempFile(null, ".tif")
// TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
geoTiff.write(tempFile.toString, optimizedOrder = true)
gtiffOptions.foreach(options => embedGdalMetadata(tempFile, options.tagsAsGdalMetadataXml))

// TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths.
// Retry should not be needed at this point, but it is almost free to keep it.
moveOverwriteWithRetries(tempFile, Path.of(path))
path
}
}

// Call fsync on the parent path to assure the fusemount is up-to-date.
// The equivalent of Python's os.fsync
try {
FileChannel.open(Path.of(path)).force(true)
} catch {
case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor
private def waitTillPathAvailable(path: Path): Unit = {
var retry = 0
val maxTries = 20
while (!path.toFile.exists()) {
if (retry < maxTries) {
retry += 1
val seconds = 5
logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
Thread.sleep(seconds * 1000)
} else {
logger.warn(f"Path is not available after $maxTries tries: $path")
// Throw error instead?
return
}

path
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.openeo.geotrellis.geotiff

import better.files.File.apply
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved

import java.time.LocalTime.MIDNIGHT
import java.time.ZoneOffset.UTC
import java.time.{LocalDate, ZonedDateTime}
Expand All @@ -16,8 +18,10 @@ import org.openeo.geotrellis.png.PngTest
import org.openeo.geotrellis.tile_grid.TileGrid
import org.openeo.geotrellis.{LayerFixtures, geotiff}

import java.nio.file.{Files, Paths}
import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME
import scala.collection.JavaConverters._
import scala.reflect.io.Directory

object TileGridTest {
private var sc: SparkContext = _
Expand Down Expand Up @@ -48,6 +52,10 @@ class TileGridTest {

@Test
def testSaveStitchWithTileGrids(): Unit = {
val outDir = Paths.get("tmp/testSaveStitchWithTileGrids/")
new Directory(outDir.toFile).deepList().foreach(_.delete())
Files.createDirectories(outDir)
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved

val date = ZonedDateTime.of(LocalDate.of(2020, 4, 5), MIDNIGHT, UTC)
val bbox = ProjectedExtent(Extent(1.95, 50.95, 2.05, 51.05), LatLng)

Expand All @@ -57,17 +65,27 @@ class TileGridTest {
.toSpatial()
.persist(DISK_ONLY)

val tiles = geotiff.saveStitchedTileGrid(spatialLayer, "/tmp/testSaveStitched.tiff", "10km", DeflateCompression(6))
val expectedPaths = Set("/tmp/testSaveStitched-31UDS_3_4.tiff", "/tmp/testSaveStitched-31UDS_2_4.tiff", "/tmp/testSaveStitched-31UDS_3_5.tiff", "/tmp/testSaveStitched-31UDS_2_5.tiff")
val tiles = geotiff.saveStitchedTileGrid(spatialLayer, outDir + "/testSaveStitched.tiff", "10km", DeflateCompression(6))
val expectedPaths = Set(
outDir + "/testSaveStitched-31UDS_3_4.tiff",
outDir + "/testSaveStitched-31UDS_2_4.tiff",
outDir + "/testSaveStitched-31UDS_3_5.tiff",
outDir + "/testSaveStitched-31UDS_2_5.tiff",
)

// TODO: check if extents (in the layer CRS) are 10000m wide/high (in UTM)
Assert.assertEquals(expectedPaths, tiles.asScala.map { case (path, _) => path }.toSet)

val extent = bbox.reproject(spatialLayer.metadata.crs)
val cropBounds = mapAsJavaMap(Map("xmin" -> extent.xmin, "xmax" -> extent.xmax, "ymin" -> extent.ymin, "ymax" -> extent.ymax))

val croppedTiles = geotiff.saveStitchedTileGrid(spatialLayer, "/tmp/testSaveStitched_cropped.tiff", "10km", cropBounds, DeflateCompression(6))
val expectedCroppedPaths = Set("/tmp/testSaveStitched_cropped-31UDS_3_4.tiff", "/tmp/testSaveStitched_cropped-31UDS_2_4.tiff", "/tmp/testSaveStitched_cropped-31UDS_3_5.tiff", "/tmp/testSaveStitched_cropped-31UDS_2_5.tiff")
val croppedTiles = geotiff.saveStitchedTileGrid(spatialLayer, outDir + "/testSaveStitched_cropped.tiff", "10km", cropBounds, DeflateCompression(6))
val expectedCroppedPaths = Set(
outDir + "/testSaveStitched_cropped-31UDS_3_4.tiff",
outDir + "/testSaveStitched_cropped-31UDS_2_4.tiff",
outDir + "/testSaveStitched_cropped-31UDS_3_5.tiff",
outDir + "/testSaveStitched_cropped-31UDS_2_5.tiff",
)

// TODO: also check extents
Assert.assertEquals(expectedCroppedPaths, croppedTiles.asScala.map { case (path, _) => path }.toSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons}
import org.openeo.sparklisteners.GetInfoSparkListener
import org.slf4j.{Logger, LoggerFactory}

import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Path, Paths}
import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime}
import java.util
import java.util.zip.Deflater._
Expand Down Expand Up @@ -77,6 +77,7 @@ class WriteRDDToGeotiffTest {

val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val filename = "out.tif"
Files.deleteIfExists(Path.of(filename))

saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},1,filename,formatOptions = allOverviewOptions)

Expand Down Expand Up @@ -151,6 +152,10 @@ class WriteRDDToGeotiffTest {

@Test
def testWriteRDD_apply_neighborhood(): Unit ={
val outDir = Paths.get("tmp/testWriteRDD_apply_neighborhood/")
new Directory(outDir.toFile).deepList().foreach(_.delete())
Files.createDirectories(outDir)
EmileSonneveld marked this conversation as resolved.
Show resolved Hide resolved

val layoutCols = 8
val layoutRows = 4

Expand All @@ -159,15 +164,16 @@ class WriteRDDToGeotiffTest {

val tileLayerRDD = LayerFixtures.buildSingleBandSpatioTemporalDataCube(util.Arrays.asList(imageTile),Seq("2017-03-01T00:00:00Z"))

val filename = "openEO_2017-03-01Z.tif"
val filename = outDir + "/openEO_2017-03-01Z.tif"
val p = new OpenEOProcesses()
val buffered: MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retileGeneric(tileLayerRDD,224,224,16,16),224,224,16,16)

val cropBounds = Extent(-115, -65, 5.0, 56)
saveRDDTemporal(buffered,"./",cropBounds = Some(cropBounds))
saveRDDTemporal(buffered, outDir.toString, cropBounds = Some(cropBounds))

val croppedRaster: Raster[MultibandTile] = tileLayerRDD.toSpatial().stitch().crop(cropBounds)
val referenceFile = "croppedRaster.tif"
val referenceFile = outDir + "/croppedRaster.tif"
Files.deleteIfExists(Path.of(referenceFile))
GeoTiff(croppedRaster,LatLng).write(referenceFile)

val result = GeoTiff.readMultiband(filename).raster
Expand All @@ -190,6 +196,7 @@ class WriteRDDToGeotiffTest {

val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile,secondBand,thirdBand),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val filename = "outRGB.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename)
val result = GeoTiff.readMultiband(filename).raster.tile
assertArrayEquals(imageTile.toArray(),result.band(0).toArray())
Expand All @@ -216,8 +223,10 @@ class WriteRDDToGeotiffTest {

val croppedRaster: Raster[MultibandTile] = tileLayerRDD.stitch().crop(cropBounds)
val referenceFile = "croppedRaster.tif"
Files.deleteIfExists(Path.of(referenceFile))
GeoTiff(croppedRaster,LatLng).write(referenceFile)
val filename = "outRGBCropped3.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(layoutCols*layoutRows)},3,filename,cropBounds = Some(cropBounds))
val result = GeoTiff.readMultiband(filename).raster
val reference = GeoTiff.readMultiband(referenceFile).raster
Expand Down Expand Up @@ -248,6 +257,7 @@ class WriteRDDToGeotiffTest {
GeoTiff(croppedRaster,LatLng).write(referenceFile)

val filename = "outCropped.tif"
Files.deleteIfExists(Path.of(filename))
saveRDD(tileLayerRDD.withContext{_.repartition(tileLayerRDD.count().toInt)},3,filename,cropBounds = Some(cropBounds))
val resultRaster = GeoTiff.readMultiband(filename).raster

Expand All @@ -269,6 +279,7 @@ class WriteRDDToGeotiffTest {
val tileLayerRDD = TileLayerRDDBuilders.createMultibandTileLayerRDD(WriteRDDToGeotiffTest.sc,MultibandTile(imageTile),TileLayout(layoutCols,layoutRows,256,256),LatLng)
val empty = tileLayerRDD.withContext{_.filter(_ => false)}
val filename = "outEmpty.tif"
Files.deleteIfExists(Path.of(filename))
val cropBounds = Extent(-115, -65, 5.0, 56)
saveRDD(empty,-1,filename,cropBounds = Some(cropBounds))

Expand Down Expand Up @@ -311,7 +322,7 @@ class WriteRDDToGeotiffTest {
val (imageTile: ByteArrayTile, filtered: MultibandTileLayerRDD[SpatialKey]) = LayerFixtures.createLayerWithGaps(layoutCols, layoutRows)

val outDir = Paths.get("tmp/testWriteMultibandRDDWithGapsSeparateAssetPerBand/")
new Directory(outDir.toFile).deepFiles.foreach(_.delete())
new Directory(outDir.toFile).deepList().foreach(_.delete())
Files.createDirectories(outDir)

val filename = outDir + "/out"
Expand Down
Loading