improve FileLayerProvider resilience

* retryForever: no delay after final failure

eu-cdse/openeo-cdse-infra#196
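
A minimal usage sketch of the adjusted behaviour (retryForever and its delay/attempts parameters are the ones from the diff below; fetchTile is a hypothetical stand-in for any flaky read):

import java.time.Duration
import org.openeo.geotrelliscommon.retryForever

// Hypothetical flaky operation: fails twice, then succeeds.
var calls = 0
def fetchTile(): Int = {
  calls += 1
  if (calls < 3) throw new RuntimeException(s"transient read failure #$calls")
  42
}

// Sleeps the 10 s delay after the 1st and 2nd failures; if the 3rd attempt had
// also failed, the last exception would be rethrown immediately, without a
// final sleep.
val tile = retryForever(delay = Duration.ofSeconds(10), attempts = 3)(fetchTile())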

* smaller job runs successfully locally

eu-cdse/openeo-cdse-infra#196

* simple GDALRasterSource.read is also successful

eu-cdse/openeo-cdse-infra#196

* optimize retryForever

- remove outer retryForever in favor of more attempts for inner retryForever
- optimization: implement with exponential back-off

eu-cdse/openeo-cdse-infra#196
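
The back-off itself lands further down in the retryWithBackoff helper, built on Failsafe's RetryPolicy with withBackoff(1, 16, SECONDS). As an illustration only, assuming Failsafe's default delay factor of 2, the waits between failed attempts grow like this:

// Illustrative wait schedule for an exponential back-off that starts at 1 s,
// doubles per failed attempt and is capped at 16 s.
val waitsInSeconds = Iterator.iterate(1L)(d => math.min(d * 2, 16L)).take(8).toList
println(waitsInSeconds) // List(1, 2, 4, 8, 16, 16, 16, 16)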

* disable test

eu-cdse/openeo-cdse-infra#196

* restore retry of RasterSource.reproject() as it can fail:

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1266)
	at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.cacheAndRepartition(NetCDFRDDWriter.scala:267)
	at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:126)
	at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.saveSingleNetCDFGeneric(NetCDFRDDWriter.scala:108)
	at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.writeRasters(NetCDFRDDWriter.scala:80)
	at org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(NetCDFRDDWriter.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: load_collection/load_stac: error while reading from: /vsis3/EODATA/Sentinel-2/MSI/L2A_N0500/2018/03/27/S2A_MSIL2A_20180327T114351_N0500_R123_T29UNV_20230828T122340.SAFE/GRANULE/L2A_T29UNV_A014420_20180327T114351/IMG_DATA/R10m/T29UNV_20180327T114351_B08_10m.jp2. Detailed error: Unable to parse projection as CRS. GDAL Error Code: 4
	at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$1(FileLayerProvider.scala:663)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: geotrellis.raster.gdal.MalformedProjectionException: Unable to parse projection as CRS. GDAL Error Code: 4
	at geotrellis.raster.gdal.GDALDataset$.$anonfun$crs$1(GDALDataset.scala:293)
	at geotrellis.raster.gdal.GDALDataset$.$anonfun$crs$1$adapted(GDALDataset.scala:290)
	at geotrellis.raster.gdal.GDALDataset$.errorHandler$extension(GDALDataset.scala:422)
	at geotrellis.raster.gdal.GDALDataset$.crs$extension1(GDALDataset.scala:290)
	at geotrellis.raster.gdal.GDALDataset$.crs$extension0(GDALDataset.scala:282)
	at geotrellis.raster.gdal.GDALRasterSource.crs$lzycompute(GDALRasterSource.scala:84)
	at geotrellis.raster.gdal.GDALRasterSource.crs(GDALRasterSource.scala:84)
	at org.openeo.geotrellis.layers.ValueOffsetRasterSource.crs(ValueOffsetRasterSource.scala:93)
	at geotrellis.raster.RasterSource.reproject(RasterSource.scala:54)
	at org.openeo.geotrellis.layers.BandCompositeRasterSource.$anonfun$reprojectedSources$2(FileLayerProvider.scala:84)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.openeo.geotrellis.layers.BandCompositeRasterSource.reprojectedSources(FileLayerProvider.scala:84)
	at org.openeo.geotrellis.layers.BandCompositeRasterSource.read(FileLayerProvider.scala:129)
	at geotrellis.raster.RasterSource.read(RasterSource.scala:128)
	at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$6(FileLayerProvider.scala:661)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.toStream(Iterator.scala:1417)
	at scala.collection.Iterator.toStream$(Iterator.scala:1416)
	at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
	at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
	at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
	at org.openeo.geotrellis.layers.FileLayerProvider$.$anonfun$loadPartitionBySource$1(FileLayerProvider.scala:661)
	... 14 more

eu-cdse/openeo-cdse-infra#196
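
The trace shows the failure surfacing from the lazy CRS read inside GDALRasterSource the first time reproject(crs) is evaluated, which is why the retry around reprojection is restored. A rough sketch of that pattern using the public retryForever helper (the final diff wraps it in the Failsafe-based retryWithBackoff instead; the /vsis3 path is shortened and purely illustrative):

import java.time.Duration
import geotrellis.proj4.CRS
import geotrellis.raster.gdal.GDALRasterSource
import org.openeo.geotrelliscommon.retryForever

// Illustrative path; in the trace it is a Sentinel-2 L2A band JP2 on /vsis3.
val source = GDALRasterSource("/vsis3/EODATA/.../T29UNV_20180327T114351_B08_10m.jp2")

// reproject() lazily reads the CRS from the GDAL dataset, which can fail with
// "Unable to parse projection as CRS", so the call is retried before giving up.
val reprojected = retryForever(delay = Duration.ofSeconds(10), attempts = 10) {
  source.reproject(CRS.fromEpsgCode(32629))
}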

* make GDALRasterSource fail with an error

eu-cdse/openeo-cdse-infra#196

* add test

eu-cdse/openeo-cdse-infra#196

* support soft errors

eu-cdse/openeo-cdse-infra#196
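
Soft errors are switched on by passing maxSoftErrorsRatio > 0.0 through PyramidFactory and FileLayerProvider (see the diff below): a band source that still fails after the retries is then logged and skipped instead of failing the whole read. A minimal sketch of that pattern, with readBand as a hypothetical stand-in for the per-band read:

import java.io.IOException
import org.slf4j.LoggerFactory

val logger = LoggerFactory.getLogger("SoftErrorsSketch")

// Hypothetical per-band read that fails for one band.
def readBand(band: String): Array[Int] =
  if (band == "B08") throw new RuntimeException(s"cannot read $band") else Array(1, 2, 3)

val softErrors = true // derived from maxSoftErrorsRatio > 0.0 in the diff

// With soft errors enabled, the failing band contributes nothing; with them
// disabled, the exception is wrapped in an IOException and rethrown as before.
val rasters = Seq("B02", "B08").flatMap { band =>
  try Some(readBand(band))
  catch {
    case e: Exception =>
      if (softErrors) { logger.warn(s"ignoring soft error for $band", e); None }
      else throw new IOException(s"Error while reading: $band", e)
  }
}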

* restore number-of-attempts and disable test

* make attempts argument explicit

eu-cdse/openeo-cdse-infra#196

* cleanup

eu-cdse/openeo-cdse-infra#196

* cleanup

eu-cdse/openeo-cdse-infra#196

* cleanup

eu-cdse/openeo-cdse-infra#196
bossie authored Aug 20, 2024
1 parent 05957af commit 64d4812
Showing 6 changed files with 124 additions and 40 deletions.
4 changes: 2 additions & 2 deletions geotrellis-common/pom.xml
@@ -55,13 +55,13 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<version>5.10.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>5.3.2</version>
<version>5.10.3</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -231,16 +231,16 @@ package object geotrelliscommon
import java.util.concurrent.TimeUnit


def retryForever[R](delay: Duration, retries: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = {
def retryForever[R](delay: Duration, attempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = {
var lastException: Exception = null
var countDown = retries
var countDown = attempts
while (countDown>0) {
try return f
catch {
case e: Exception =>
onAttemptFailed(e)
lastException = e
TimeUnit.SECONDS.sleep(delay.getSeconds)
if (countDown > 1) TimeUnit.SECONDS.sleep(delay.getSeconds)
}
countDown = countDown - 1
}
@@ -0,0 +1,38 @@
package org.openeo.geotrelliscommon

import org.junit.jupiter.api.Assertions.{assertEquals, assertThrowsExactly, fail}
import org.junit.jupiter.api.{Test, Timeout}

import java.time.Duration

class PackageTest {
class FailedAttempt extends Exception

@Test
def retryForeverNumberOfAttempts(): Unit = {
var attempts = 0

try {
retryForever(delay = Duration.ZERO, attempts = 3, onAttemptFailed = _ => attempts += 1) {
println("attempting...")
throw new FailedAttempt
}

fail("should have thrown a FailedAttempt")
} catch {
case _: FailedAttempt =>
}

// count the number of failures to get the number of attempts
assertEquals(3, attempts)
}

@Test
@Timeout(5) // less than RetryForever's delay below
def retryForeverNoDelayAfterFinalFailure(): Unit =
assertThrowsExactly(classOf[FailedAttempt], () =>
retryForever(delay = Duration.ofSeconds(60), attempts = 1) {
println("attempting...")
throw new FailedAttempt
})
}
@@ -43,7 +43,9 @@ class PyramidFactory(openSearchClient: OpenSearchClient,
openSearchLinkTitles: util.List[String],
rootPath: String,
maxSpatialResolution: CellSize,
experimental: Boolean = false) {
experimental: Boolean = false,
maxSoftErrorsRatio: Double = 0.0,
) {
require(openSearchLinkTitles.size() > 0)

import PyramidFactory._
@@ -75,7 +77,8 @@
metadataProperties,
layoutScheme,
correlationId = correlationId,
experimental = experimental
experimental = experimental,
maxSoftErrorsRatio = maxSoftErrorsRatio,
)

def datacube_seq(polygons:ProjectedPolygons, from_date: String, to_date: String,
@@ -20,16 +20,17 @@ import geotrellis.spark.partition.SpacePartitioner
import geotrellis.vector
import geotrellis.vector.Extent.toPolygon
import geotrellis.vector._
import net.jodah.failsafe.{Failsafe, RetryPolicy}
import net.jodah.failsafe.event.ExecutionAttemptedEvent
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.locationtech.jts.geom.Geometry
import org.openeo.geotrellis.OpenEOProcessScriptBuilder.AnyProcess
import org.openeo.geotrellis.file.{AbstractPyramidFactory, FixedFeaturesOpenSearchClient}
import org.openeo.geotrellis.tile_grid.TileGrid
import org.openeo.geotrellis.{OpenEOProcessScriptBuilder, sortableSourceName}
import org.openeo.geotrelliscommon.DatacubeSupport.prepareMask
import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg, retryForever}
import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg}
import org.openeo.opensearch.OpenSearchClient
import org.openeo.opensearch.OpenSearchResponses.{Feature, Link}
import org.slf4j.LoggerFactory
@@ -38,11 +39,11 @@ import java.io.{IOException, Serializable}
import java.net.URI
import java.nio.file.{Path, Paths}
import java.time._
import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.{DAYS, SECONDS}
import java.util
import java.util.concurrent.TimeUnit
import scala.collection.GenSeq
import scala.collection.JavaConverters._
import scala.collection.parallel.immutable.{ParMap, ParSeq}
import scala.reflect.ClassTag
import scala.util.matching.Regex

Expand All @@ -63,6 +64,19 @@ private class LayoutTileSourceFixed[K: SpatialComponent](

object BandCompositeRasterSource {
private val logger = LoggerFactory.getLogger(classOf[BandCompositeRasterSource])

private def retryWithBackoff[R](maxAttempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = {
val retryPolicy = new RetryPolicy[R]
.handle(classOf[Exception]) // will otherwise retry Error
.withMaxAttempts(maxAttempts)
.withBackoff(1, 16, SECONDS)
.onFailedAttempt((attempt: ExecutionAttemptedEvent[R]) =>
onAttemptFailed(attempt.getLastFailure.asInstanceOf[Exception]))

Failsafe
.`with`(util.Collections.singletonList(retryPolicy))
.get(f _)
}
}


Expand All @@ -72,21 +86,29 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource]
override val crs: CRS,
override val attributes: Map[String, String] = Map.empty,
val predefinedExtent: Option[GridExtent[Long]] = None,
val parallelRead: Boolean = true
parallelRead: Boolean = true,
softErrors: Boolean = false,
) extends MosaicRasterSource { // TODO: don't inherit?
import BandCompositeRasterSource._

private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt
private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "20").toInt

protected def reprojectedSources: NonEmptyList[RasterSource] = sources map { _.reproject(crs) }

protected def reprojectedSources(bands: Seq[Int]): Seq[RasterSource] = {
val selectedBands = bands.map(sources.toList)
def reprojectRasterSourceAttemptFailed(source: RasterSource)(e: Exception): Unit =
logger.warn(s"attempt to reproject ${source.name} to $crs failed", e)

selectedBands map { rs =>
try retryForever(Duration.ofSeconds(10), maxRetries)(rs.reproject(crs))
val selectedBands = bands.map(sources.toList)
selectedBands flatMap { rs =>
try Some(retryWithBackoff(maxRetries, reprojectRasterSourceAttemptFailed(rs))(rs.reproject(crs)))
catch {
case e: Exception => throw new IOException(s"Error while reading: ${rs.name.toString}", e)
// reading the CRS from a GDALRasterSource can fail
case e: Exception =>
if (softErrors) {
logger.warn(s"ignoring soft error for ${rs.name}", e)
None
} else throw new IOException(s"Error while reading: ${rs.name}", e)
}
}
}
@@ -146,15 +168,19 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource]
logger.debug(s"finished reading $bounds from ${source.name}")
raster
} catch {
case e: Exception => throw new IOException(s"Error while reading $bounds from ${source.name}", e)
case e: Exception =>
if (softErrors) {
logger.warn(s"ignoring soft error for ${source.name}", e)
None
} else throw new IOException(s"Error while reading $bounds from ${source.name}", e)
}
}

def readBoundsAttemptFailed(source: RasterSource)(e: Exception): Unit =
logger.warn(s"attempt to read $bounds from ${source.name} failed", e)

val singleBandRasters = selectedSources
.map(rs => retryForever(Duration.ofSeconds(10), maxRetries, readBoundsAttemptFailed(rs)) {
.map(rs => retryWithBackoff(maxRetries, readBoundsAttemptFailed(rs)) {
readBounds(rs)
})
.collect { case Some(raster) => raster }
@@ -187,14 +213,16 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource]
method: ResampleMethod,
strategy: OverviewStrategy
): RasterSource = new BandCompositeRasterSource(
reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs)
reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs, parallelRead = parallelRead,
softErrors = softErrors)

override def convert(targetCellType: TargetCellType): RasterSource =
new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs, parallelRead = parallelRead)
new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs,
parallelRead = parallelRead, softErrors = softErrors)

override def reprojection(targetCRS: CRS, resampleTarget: ResampleTarget, method: ResampleMethod, strategy: OverviewStrategy): RasterSource =
new BandCompositeRasterSource(reprojectedSources map { _.reproject(targetCRS, resampleTarget, method, strategy) },
crs, parallelRead = parallelRead)
crs, parallelRead = parallelRead, softErrors = softErrors)
}

// TODO: is this class necessary? Looks like a more general case of BandCompositeRasterSource so maybe the inheritance
@@ -248,7 +276,6 @@ class MultibandCompositeRasterSource(val sourcesListWithBandIds: NonEmptyList[(R
object FileLayerProvider {

private val logger = LoggerFactory.getLogger(classOf[FileLayerProvider])
private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt



@@ -281,9 +308,9 @@
def apply(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String,
maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256),
bandIndices: Seq[Int] = Seq(), correlationId: String = "", experimental: Boolean = false,
retainNoDataTiles: Boolean = false): FileLayerProvider = new FileLayerProvider(
retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0): FileLayerProvider = new FileLayerProvider(
openSearch, openSearchCollectionId, openSearchLinkTitles, rootPath, maxSpatialResolution, pathDateExtractor,
attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles,
attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles, maxSoftErrorsRatio,
disambiguateConstructors = null
)

@@ -303,7 +330,7 @@
def rasterSourceRDD(rasterSources: Seq[RasterSource], metadata: TileLayerMetadata[SpaceTimeKey], maxSpatialResolution: CellSize, collection: String)(implicit sc: SparkContext): RDD[LayoutTileSource[SpaceTimeKey]] = {

val keyExtractor = new TemporalKeyExtractor {
def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(ChronoUnit.DAYS)
def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(DAYS)
}
val sources = sc.parallelize(rasterSources,rasterSources.size)

@@ -502,13 +529,14 @@
private val PIXEL_COUNTER = "InputPixels"

private def rasterRegionsToTilesLoadPerProductStrategy(rasterRegionRDD: RDD[(SpaceTimeKey, (RasterRegion, SourceName))],
metadata: TileLayerMetadata[SpaceTimeKey],
retainNoDataTiles: Boolean,
cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy,
partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None,
datacubeParams : Option[DataCubeParameters] = None,
metadata: TileLayerMetadata[SpaceTimeKey],
retainNoDataTiles: Boolean,
cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy,
partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None,
datacubeParams : Option[DataCubeParameters] = None,
expectedBandCount : Int = -1,
sources: Seq[(RasterSource, Feature)]
sources: Seq[(RasterSource, Feature)],
softErrors: Boolean,
): RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = {

if(cloudFilterStrategy!=NoCloudFilterStrategy) {
@@ -538,7 +566,7 @@

case source1: BandCompositeRasterSource =>
//decompose into individual bands
source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s),source1.crs,source1.attributes,source1.predefinedExtent, parallelRead = datacubeParams.forall(!_.loadPerProduct)), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq
source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s),source1.crs,source1.attributes,source1.predefinedExtent, parallelRead = datacubeParams.forall(!_.loadPerProduct), softErrors = softErrors), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq

case _ =>
Seq((source.name, (Seq(0), key_region_sourcename._1, key_region_sourcename._2._1)))
@@ -663,7 +691,7 @@

val allRasters =
try{
bounds.toIterator.flatMap(b => retryForever(Duration.ofSeconds(10),maxRetries)(source.read(b).iterator)).map(_.mapTile(_.convert(cellType))).toSeq
bounds.toIterator.flatMap(b => source.read(b).iterator).map(_.mapTile(_.convert(cellType))).toSeq
} catch {
case e: Exception => throw new IOException(s"load_collection/load_stac: error while reading from: ${source.name.toString}. Detailed error: ${e.getMessage}", e)
}
@@ -871,7 +899,7 @@
class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String,
maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any], layoutScheme: LayoutScheme,
bandIndices: Seq[Int], correlationId: String, experimental: Boolean,
retainNoDataTiles: Boolean,
retainNoDataTiles: Boolean, maxSoftErrorsRatio: Double,
disambiguateConstructors: Null) extends LayerProvider { // workaround for: constructors have the same type after erasure

import DatacubeSupport._
Expand All @@ -882,15 +910,15 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti
def this(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String,
maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256),
bandIds: Seq[Seq[Int]] = Seq(), correlationId: String = "", experimental: Boolean = false,
retainNoDataTiles: Boolean = false) = this(openSearch, openSearchCollectionId,
retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0) = this(openSearch, openSearchCollectionId,
openSearchLinkTitles = NonEmptyList.fromListUnsafe(for {
(title, bandIndices) <- openSearchLinkTitles.toList.zipAll(bandIds, thisElem = "", thatElem = Seq(0))
_ <- bandIndices
} yield title),
rootPath, maxSpatialResolution, pathDateExtractor, attributeValues, layoutScheme,
bandIndices = bandIds.flatten,
correlationId, experimental,
retainNoDataTiles, disambiguateConstructors = null)
retainNoDataTiles, maxSoftErrorsRatio, disambiguateConstructors = null)

assert(bandIndices.isEmpty || bandIndices.size == openSearchLinkTitles.size)

Expand All @@ -900,6 +928,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti

private val _rootPath = if(rootPath != null) Paths.get(rootPath) else null
private val fromLoadStac = openSearch.isInstanceOf[FixedFeaturesOpenSearchClient]
private val softErrors = maxSoftErrorsRatio > 0.0

private val openSearchLinkTitlesWithBandId: Seq[(String, Int)] = {
if (bandIndices.nonEmpty) {
@@ -1240,7 +1269,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti
if(!datacubeParams.map(_.loadPerProduct).getOrElse(false) || theMaskStrategy != NoCloudFilterStrategy ){
rasterRegionsToTiles(regions, metadata, retainNoDataTiles, theMaskStrategy, partitioner, datacubeParams)
}else{
rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size,readKeysToRasterSourcesResult._4)
rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size,readKeysToRasterSourcesResult._4, softErrors)
}
logger.info(s"Created cube for ${openSearchCollectionId} with metadata ${cube.metadata} and partitioner ${cube.partitioner}")
cube
@@ -1511,7 +1540,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti
return None
}

Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent), feature))
Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent, softErrors = softErrors), feature))
} else Some((new MultibandCompositeRasterSource(sources.map { case (rasterSource, bandIndex) => (rasterSource, Seq(bandIndex))}, targetExtent.crs, attributes), feature))
}
}