diff --git a/Jenkinsfile b/Jenkinsfile index 16f2fe2aa..46a9efabd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -37,7 +37,9 @@ pipeline { PACKAGE_NAME = "${package_name}" WORKSPACE = "${env.WORKSPACE}" } - + parameters { + booleanParam(name: 'skip_tests', defaultValue: false, description: 'Check this if you want to skip running tests.') + } stages { stage('Checkout') { steps { @@ -55,7 +57,7 @@ pipeline { steps { script { rel_version = getMavenVersion() - build() + build( !params.skip_tests) utils.setWorkspacePermissions() } } @@ -196,12 +198,17 @@ void build(tests = true){ sh "dnf install -y maven git java-11-openjdk-devel gdal-3.8.4" def server = Artifactory.server('vitoartifactory') def rtMaven = Artifactory.newMavenBuild() - rtMaven.deployer server: server, releaseRepo: 'libs-release-public', snapshotRepo: 'libs-snapshot-public' + def snapshotRepo = 'libs-snapshot-public' + if (!publishable_branches.contains(env.BRANCH_NAME)) { + snapshotRepo = 'openeo-branch-builds' + rtMaven.opts += " -Drevision=${env.BRANCH_NAME}" + } + rtMaven.deployer server: server, releaseRepo: 'libs-release-public', snapshotRepo: snapshotRepo rtMaven.tool = maven if (!tests) { rtMaven.opts += ' -DskipTests=true' } - rtMaven.deployer.deployArtifacts = publishable_branches.contains(env.BRANCH_NAME) || publishable_branches.contains(env.CHANGE_BRANCH) + rtMaven.deployer.deployArtifacts = true //use '--projects StatisticsMapReduce' in 'goals' to build specific module try { withCredentials([ diff --git a/geopyspark-geotrellis/pom.xml b/geopyspark-geotrellis/pom.xml index 2a7c2ae19..36362d3e8 100644 --- a/geopyspark-geotrellis/pom.xml +++ b/geopyspark-geotrellis/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-accumulo-extensions/pom.xml b/geotrellis-accumulo-extensions/pom.xml index b45603736..0761ec2d4 100644 --- a/geotrellis-accumulo-extensions/pom.xml +++ b/geotrellis-accumulo-extensions/pom.xml @@ -3,7 +3,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-benchmarks/pom.xml b/geotrellis-benchmarks/pom.xml index c55952de0..ed5eecceb 100644 --- a/geotrellis-benchmarks/pom.xml +++ b/geotrellis-benchmarks/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-common/pom.xml b/geotrellis-common/pom.xml index a3d7e81d3..4028fd302 100644 --- a/geotrellis-common/pom.xml +++ b/geotrellis-common/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 @@ -55,13 +55,13 @@ org.junit.jupiter junit-jupiter-api - 5.3.2 + 5.10.3 test org.junit.vintage junit-vintage-engine - 5.3.2 + 5.10.3 test diff --git a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/DatacubeSupport.scala b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/DatacubeSupport.scala index 212e0d1ab..408252cac 100644 --- a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/DatacubeSupport.scala +++ b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/DatacubeSupport.scala @@ -8,7 +8,8 @@ import geotrellis.spark.partition.{PartitionerIndex, SpacePartitioner} import geotrellis.spark.{MultibandTileLayerRDD, _} import geotrellis.util.GetComponent import geotrellis.vector.{Extent, MultiPolygon, ProjectedExtent} -import org.apache.spark.rdd.RDD +import org.apache.spark.Partitioner +import org.apache.spark.rdd.{CoGroupedRDD, RDD} import org.slf4j.LoggerFactory import 
java.time.ZonedDateTime @@ -194,7 +195,25 @@ object DatacubeSupport { ignoreKeysWithoutMask: Boolean = false, ): RDD[(K, MultibandTile)] with Metadata[M] = { val joined = if (ignoreKeysWithoutMask) { - val tmpRdd = SpatialJoin.join(datacube, mask).mapValues(v => (v._1, Option(v._2))) + //inner join, try to preserve partitioner + val tmpRdd: RDD[(K, (MultibandTile, Option[MultibandTile]))] = + if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[K]]){ + val part = datacube.partitioner.get.asInstanceOf[SpacePartitioner[K]] + new CoGroupedRDD[K](List(datacube, part(mask)), part) + .flatMapValues { case Array(l, r) => + if (l.isEmpty) { + Seq.empty[(MultibandTile, Option[MultibandTile])] + } + else if (r.isEmpty) + Seq.empty[(MultibandTile, Option[MultibandTile])] + else + for (v <- l.iterator; w <- r.iterator) yield (v, Some(w)) + }.asInstanceOf[RDD[(K, (MultibandTile, Option[MultibandTile]))]] + }else{ + SpatialJoin.join(datacube, mask).mapValues(v => (v._1, Option(v._2))) + } + + ContextRDD(tmpRdd, datacube.metadata) } else { SpatialJoin.leftOuterJoin(datacube, mask) @@ -210,7 +229,8 @@ object DatacubeSupport { if (dataTile.bandCount == maskTile.bandCount) { maskIndex = index } - tile.dualCombine(maskTile.band(maskIndex))((v1, v2) => if (v2 != 0 && isData(v1)) replacementInt else v1)((v1, v2) => if (v2 != 0.0 && isData(v1)) replacementDouble else v1) + //tile has to be 'mutable': for instance, ConstantTile implements dualCombine but does not correctly convert the celltype! + tile.mutable.dualCombine(maskTile.band(maskIndex))((v1, v2) => if (v2 != 0 && isData(v1)) replacementInt else v1)((v1, v2) => if (v2 != 0.0 && isData(v1)) replacementDouble else v1) }) } else { @@ -236,16 +256,8 @@ object DatacubeSupport { logger.debug(s"Spacetime mask is used to reduce input.") } - val alignedMask: MultibandTileLayerRDD[SpaceTimeKey] = - if(spacetimeMask.metadata.crs.equals(metadata.crs) && spacetimeMask.metadata.layout.equals(metadata.layout)) { - spacetimeMask - }else{ - logger.debug(s"mask: automatically resampling mask to match datacube: ${spacetimeMask.metadata}") - spacetimeMask.reproject(metadata.crs,metadata.layout,16,rdd.partitioner)._2 - } - - // retain only tiles where there is at least one valid pixel (mask value == 0), others will be fully removed - val filtered = alignedMask.withContext{_.filter(_._2.band(0).toArray().exists(pixel => pixel == 0))} + val partitioner = rdd.partitioner + val filtered = prepareMask(spacetimeMask, metadata, partitioner) if (pixelwiseMasking) { val spacetimeDataContextRDD = ContextRDD(rdd, metadata) @@ -263,4 +275,23 @@ object DatacubeSupport { rdd } } + + def prepareMask(spacetimeMask: MultibandTileLayerRDD[SpaceTimeKey], metadata: TileLayerMetadata[SpaceTimeKey], partitioner: Option[Partitioner]): ContextRDD[SpaceTimeKey, MultibandTile, TileLayerMetadata[SpaceTimeKey]] = { + val alignedMask: MultibandTileLayerRDD[SpaceTimeKey] = + if (spacetimeMask.metadata.crs.equals(metadata.crs) && spacetimeMask.metadata.layout.equals(metadata.layout)) { + spacetimeMask + } else { + logger.debug(s"mask: automatically resampling mask to match datacube: ${spacetimeMask.metadata}") + spacetimeMask.reproject(metadata.crs, metadata.layout, 16, partitioner)._2 + } + + val keyBounds = metadata.bounds.get + // retain only tiles where there is at least one valid pixel (mask value == 0), others will be fully removed + val filtered = alignedMask.withContext { + _.filter(t => { + keyBounds.includes(t._1) && t._2.band(0).toArray().exists(pixel => pixel 
== 0) + }) + } + filtered + } } diff --git a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala index de10848d4..4d00127c0 100644 --- a/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala +++ b/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/package.scala @@ -231,16 +231,16 @@ package object geotrelliscommon { import java.util.concurrent.TimeUnit - def retryForever[R](delay: Duration, retries: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { + def retryForever[R](delay: Duration, attempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { var lastException: Exception = null - var countDown = retries + var countDown = attempts while (countDown>0) { try return f catch { case e: Exception => onAttemptFailed(e) lastException = e - TimeUnit.SECONDS.sleep(delay.getSeconds) + if (countDown > 1) TimeUnit.SECONDS.sleep(delay.getSeconds) } countDown = countDown - 1 } diff --git a/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala b/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala new file mode 100644 index 000000000..806731885 --- /dev/null +++ b/geotrellis-common/src/test/scala/org/openeo/geotrelliscommon/PackageTest.scala @@ -0,0 +1,38 @@ +package org.openeo.geotrelliscommon + +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrowsExactly, fail} +import org.junit.jupiter.api.{Test, Timeout} + +import java.time.Duration + +class PackageTest { + class FailedAttempt extends Exception + + @Test + def retryForeverNumberOfAttempts(): Unit = { + var attempts = 0 + + try { + retryForever(delay = Duration.ZERO, attempts = 3, onAttemptFailed = _ => attempts += 1) { + println("attempting...") + throw new FailedAttempt + } + + fail("should have thrown a FailedAttempt") + } catch { + case _: FailedAttempt => + } + + // count the number of failures to get the number of attempts + assertEquals(3, attempts) + } + + @Test + @Timeout(5) // less than RetryForever's delay below + def retryForeverNoDelayAfterFinalFailure(): Unit = + assertThrowsExactly(classOf[FailedAttempt], () => + retryForever(delay = Duration.ofSeconds(60), attempts = 1) { + println("attempting...") + throw new FailedAttempt + }) +} diff --git a/geotrellis-extensions/pom.xml b/geotrellis-extensions/pom.xml index 56d210a8c..d1af3a854 100644 --- a/geotrellis-extensions/pom.xml +++ b/geotrellis-extensions/pom.xml @@ -3,7 +3,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-integrationtests/pom.xml b/geotrellis-integrationtests/pom.xml index f3c713336..d0fcd73c7 100644 --- a/geotrellis-integrationtests/pom.xml +++ b/geotrellis-integrationtests/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-s3-extensions/pom.xml b/geotrellis-s3-extensions/pom.xml index 0b91abd6b..b5d790f35 100644 --- a/geotrellis-s3-extensions/pom.xml +++ b/geotrellis-s3-extensions/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-seeder/pom.xml b/geotrellis-seeder/pom.xml index 5809cab6f..360de265b 100644 --- a/geotrellis-seeder/pom.xml +++ b/geotrellis-seeder/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/geotrellis-sentinelhub/pom.xml 
b/geotrellis-sentinelhub/pom.xml index e94719ed4..e49384a82 100644 --- a/geotrellis-sentinelhub/pom.xml +++ b/geotrellis-sentinelhub/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/openeo-geotrellis/pom.xml b/openeo-geotrellis/pom.xml index f2739212e..e31d315a3 100644 --- a/openeo-geotrellis/pom.xml +++ b/openeo-geotrellis/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 @@ -245,6 +245,22 @@ 1.19.0 test + + io.opentelemetry + opentelemetry-sdk + 1.38.0 + + + io.opentelemetry + opentelemetry-sdk-metrics + 1.38.0 + + + io.opentelemetry + opentelemetry-api + 1.38.0 + + diff --git a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala new file mode 100644 index 000000000..f4583538e --- /dev/null +++ b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala @@ -0,0 +1,66 @@ +package org.openeo.sparklisteners; + +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.util.AccumulatorV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Traversable; +import scala.collection.mutable.Map; + +import java.time.Duration; + +object BatchJobProgressListener { + + val logger = LoggerFactory.getLogger(BatchJobProgressListener.getClass) + +} + +class BatchJobProgressListener extends SparkListener { + + import BatchJobProgressListener.logger + + override def onStageSubmitted( stageSubmitted:SparkListenerStageSubmitted):Unit = { + logger.info("Starting part of the process graph: " + stageSubmitted.stageInfo.name) + } + + override def onStageCompleted( stageCompleted: SparkListenerStageCompleted):Unit = { + val taskMetrics = stageCompleted.stageInfo.taskMetrics + + if(stageCompleted.stageInfo.failureReason.isDefined){ + val message = + f""" + |A part of the process graph failed and will be retried; the reason was: ${stageCompleted.stageInfo.failureReason.get} + |Your job may still complete if the failure was caused by a transient error, but it will take more time. A common cause of transient errors is too little executor memory (overhead); this shows up as a high 'garbage collection' time, which was: ${taskMetrics.jvmGCTime / 1000.0} seconds. + |""".stripMargin + logger.warn(message); + + }else{ + val duration = Duration.ofMillis(taskMetrics.executorRunTime) + val timeString = if(duration.toSeconds>60) { + duration.toMinutes + " minutes" + } else { + duration.toMillis.toFloat / 1000.0 + " seconds" + } + val megabytes = taskMetrics.shuffleWriteMetrics.bytesWritten.toFloat/(1024.0*1024.0) + val name = stageCompleted.stageInfo.name + logger.info(f"Finished part ${stageCompleted.stageInfo.stageId} of the process graph: ${name}.\n The total computing time was: $timeString. 
It produced: $megabytes%.2f MB of data."); + + val accumulators = stageCompleted.stageInfo.accumulables; + val chunkCounts = accumulators.filter(_._2.name.get.startsWith("ChunkCount")); + if (chunkCounts.nonEmpty) { + val totalChunks = chunkCounts.head._2.value + val megapixel = totalChunks.get.asInstanceOf[Long] * 256 * 256 / (1024 * 1024) + if(duration.toSeconds > 0) { + logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat / duration.toSeconds.toFloat}%.3f Megapixel per second.") + }; + } + } + + + } + + +} diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala index faff24ec5..d45757928 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcessScriptBuilder.scala @@ -6,6 +6,7 @@ import geotrellis.raster.mapalgebra.local._ import geotrellis.raster.{ArrayTile, BitCellType, ByteUserDefinedNoDataCellType, CellType, Dimensions, DoubleConstantNoDataCellType, DoubleConstantTile, FloatConstantNoDataCellType, FloatConstantTile, IntConstantNoDataCellType, IntConstantTile, MultibandTile, MutableArrayTile, NODATA, ShortConstantNoDataCellType, ShortConstantTile, Tile, UByteCells, UByteConstantTile, UByteUserDefinedNoDataCellType, UShortCells, UShortUserDefinedNoDataCellType, isData, isNoData} import org.apache.commons.math3.exception.NotANumberException import org.apache.commons.math3.stat.descriptive.rank.Percentile +import org.apache.commons.math3.stat.descriptive.rank.Percentile.EstimationType import org.apache.commons.math3.stat.ranking.NaNStrategy import org.apache.spark.ml import org.apache.spark.mllib.linalg @@ -1322,9 +1323,9 @@ class OpenEOProcessScriptBuilder { multibandMapToNewTiles(MultibandTile(data),ts => { val p = if(ignoreNoData){ - new Percentile() + new Percentile().withEstimationType(EstimationType.R_7) }else{ - new Percentile().withNaNStrategy(NaNStrategy.FAILED) + new Percentile().withEstimationType(EstimationType.R_7).withNaNStrategy(NaNStrategy.FAILED) } p.setData(ts.toArray) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala index c87df32d0..52a433ecf 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Partitioner, SparkContext} import org.openeo.geotrellis.OpenEOProcessScriptBuilder.{MaxIgnoreNoData, MinIgnoreNoData, OpenEOProcess} import org.openeo.geotrellis.focal._ import org.openeo.geotrellis.netcdf.NetCDFRDDWriter.ContextSeq -import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, DatacubeSupport, FFTConvolve, OpenEORasterCube, OpenEORasterCubeMetadata, SCLConvolutionFilter, SpaceTimeByMonthPartitioner, SparseSpaceOnlyPartitioner, SparseSpaceTimePartitioner, SparseSpatialPartitioner} +import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, ConfigurableSpaceTimePartitioner, DatacubeSupport, FFTConvolve, OpenEORasterCube, OpenEORasterCubeMetadata, SCLConvolutionFilter, SpaceTimeByMonthPartitioner, SparseSpaceOnlyPartitioner, SparseSpaceTimePartitioner, SparseSpatialPartitioner} import org.slf4j.LoggerFactory import java.io.File @@ 
-132,23 +132,23 @@ class OpenEOProcesses extends Serializable { * @return */ def applyTimeDimension(datacube:MultibandTileLayerRDD[SpaceTimeKey], scriptBuilder:OpenEOProcessScriptBuilder,context: java.util.Map[String,Any]):MultibandTileLayerRDD[SpaceTimeKey] = { - val rdd = transformTimeDimension[SpaceTimeKey](datacube, scriptBuilder, context) - if(datacube.partitioner.isDefined) { - ContextRDD(rdd.partitionBy(datacube.partitioner.get),datacube.metadata.copy(cellType = scriptBuilder.getOutputCellType())) - }else{ - ContextRDD(rdd,datacube.metadata.copy(cellType = scriptBuilder.getOutputCellType())) + datacube.context.setCallSite(s"apply_dimension target='t' ") + try{ + val rdd = transformTimeDimension[SpaceTimeKey](datacube, scriptBuilder, context) + if(datacube.partitioner.isDefined) { + ContextRDD(rdd.partitionBy(datacube.partitioner.get),datacube.metadata.copy(cellType = scriptBuilder.getOutputCellType())) + }else{ + ContextRDD(rdd,datacube.metadata.copy(cellType = scriptBuilder.getOutputCellType())) + } + }finally{ + datacube.context.clearCallSite() } + } - private def transformTimeDimension[KT](datacube: MultibandTileLayerRDD[SpaceTimeKey], scriptBuilder: OpenEOProcessScriptBuilder, context: util.Map[String, Any], reduce:Boolean=false) = { + private def transformTimeDimension[KT](datacube: MultibandTileLayerRDD[SpaceTimeKey], scriptBuilder: OpenEOProcessScriptBuilder, context: util.Map[String, Any], reduce:Boolean=false): RDD[(KT, MultibandTile)] = { + - val index: Option[PartitionerIndex[SpaceTimeKey]] = - if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) { - Some(datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index) - } else { - None - } - logger.info(s"Applying callback on time dimension of cube with partitioner: ${datacube.partitioner.getOrElse("no partitioner")} - index: ${index.getOrElse("no index")} and metadata ${datacube.metadata}") val expectedCellType = datacube.metadata.cellType val applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => Map[KT, MultibandTile] = if(reduce){ @@ -161,28 +161,41 @@ class OpenEOProcesses extends Serializable { createTemporalCallback(scriptBuilder.inputFunction.asInstanceOf[OpenEOProcess], context.asScala.toMap, expectedCellType).asInstanceOf[Iterable[(SpaceTimeKey, MultibandTile)] => Map[KT,MultibandTile]] } - val rdd: RDD[(SpaceTimeKey, MultibandTile)] = - if (index.isDefined && index.get.isInstanceOf[SparseSpaceOnlyPartitioner]) { - datacube - } else { - val keys: Option[Array[SpaceTimeKey]] = findPartitionerKeys(datacube) - val spatiallyGroupingIndex = - if(keys.isDefined){ - new SparseSpaceOnlyPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, keys) - }else{ - ByTileSpacetimePartitioner - } - logger.info(f"Regrouping data cube along the time dimension, with index $spatiallyGroupingIndex. 
Cube metadata: ${datacube.metadata}") - val partitioner: Partitioner = new SpacePartitioner(datacube.metadata.bounds)(implicitly, implicitly, spatiallyGroupingIndex) - //regular partitionBy doesn't work because Partitioners appear to be equal while they're not - new ShuffledRDD[SpaceTimeKey,MultibandTile,MultibandTile](datacube, partitioner) - } - rdd.mapPartitions(p => { - val bySpatialKey: Map[SpatialKey, Seq[(SpaceTimeKey, MultibandTile)]] = p.toSeq.groupBy(_._1.spatialKey) - bySpatialKey.mapValues(applyToTimeseries).flatMap(_._2).iterator - }, preservesPartitioning = reduce) + transformTimeDimension(datacube,applyToTimeseries,reduce) } + private def transformTimeDimension[KT](datacube: MultibandTileLayerRDD[SpaceTimeKey],applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => Map[KT, MultibandTile], reduce:Boolean ): RDD[(KT, MultibandTile)] = { + + + val index: Option[PartitionerIndex[SpaceTimeKey]] = + if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) { + Some(datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index) + } else { + None + } + logger.info(s"Applying callback on time dimension of cube with partitioner: ${datacube.partitioner.getOrElse("no partitioner")} - index: ${index.getOrElse("no index")} and metadata ${datacube.metadata}") + val rdd: RDD[(SpaceTimeKey, MultibandTile)] = + if (index.isDefined && (index.get.isInstanceOf[SparseSpaceOnlyPartitioner] || index.get == ByTileSpacetimePartitioner )) { + datacube + } else { + val keys: Option[Array[SpaceTimeKey]] = findPartitionerKeys(datacube) + val spatiallyGroupingIndex = + if(keys.isDefined){ + new SparseSpaceOnlyPartitioner(keys.get.map(SparseSpaceOnlyPartitioner.toIndex(_, indexReduction = 0)).distinct.sorted, 0, keys) + }else{ + ByTileSpacetimePartitioner + } + logger.info(f"Regrouping data cube along the time dimension, with index $spatiallyGroupingIndex. 
Cube metadata: ${datacube.metadata}") + val partitioner: Partitioner = new SpacePartitioner(datacube.metadata.bounds)(implicitly, implicitly, spatiallyGroupingIndex) + //regular partitionBy doesn't work because Partitioners appear to be equal while they're not + new ShuffledRDD[SpaceTimeKey,MultibandTile,MultibandTile](datacube, partitioner) + } + rdd.mapPartitions(p => { + val bySpatialKey: Map[SpatialKey, Seq[(SpaceTimeKey, MultibandTile)]] = p.toSeq.groupBy(_._1.spatialKey) + bySpatialKey.mapValues(applyToTimeseries).flatMap(_._2).iterator + }, preservesPartitioning = reduce) + } + /** @@ -199,23 +212,27 @@ class OpenEOProcesses extends Serializable { val function = scriptBuilder.inputFunction.asInstanceOf[OpenEOProcess] val currentTileSize = datacube.metadata.tileLayout.tileSize var tileSize = context.getOrDefault("TileSize",0).asInstanceOf[Int] - if(currentTileSize>=512 && tileSize==0) { + if(currentTileSize>=512*512 && tileSize==0) { tileSize = 128 //right value here depends on how many bands we're going to create, but can be a high number } + val index = if (datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) { + datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index + } else { + "no index" + } + SparkContext.getOrCreate().setCallSite(s"apply_dimension target='bands' TileSize: $tileSize Input index: $index ") + val retiled = if (tileSize > 0 && tileSize <= 1024) { - val theResult = retile(datacube, tileSize, tileSize, 0, 0) + val theResult = retileGeneric(datacube, tileSize, tileSize, 0, 0) theResult } else { datacube } - val groupedOnTime: RDD[(SpatialKey, Iterable[(SpaceTimeKey, MultibandTile)])] = groupOnTimeDimension(retiled) val outputCelltype = scriptBuilder.getOutputCellType() - - val resultRDD: RDD[(SpatialKey, MultibandTile)] = groupedOnTime.mapValues{ tiles => { + val applyToTimeseries: Iterable[(SpaceTimeKey, MultibandTile)] => Map[SpatialKey, MultibandTile] = tiles => { val aTile = firstTile(tiles.map(_._2)) + val theKey = tiles.head._1.spatialKey val labels = tiles.map(_._1).toList.sortBy(_.instant) val theContext = context.asScala.toMap + ("array_labels" -> labels.map(_.time.format(DateTimeFormatter.ISO_INSTANT))) @@ -234,15 +251,30 @@ class OpenEOProcesses extends Serializable { val resultTile = result.flatMap(_._2) if(resultTile.nonEmpty) { - MultibandTile(resultTile) + Map( theKey -> MultibandTile(resultTile) ) }else{ // Note: Is this code ever reached? aTile.bandCount is always > 0. 
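// A minimal sketch, assuming hypothetical names, of the applyToTimeseries contract used above: all
// observations for one spatial tile arrive as (SpaceTimeKey, MultibandTile) pairs and are reduced to a
// single entry keyed by their shared SpatialKey, stacking one output band per input band and timestep.
val exampleCallback: Iterable[(SpaceTimeKey, MultibandTile)] => Map[SpatialKey, MultibandTile] =
  tiles => {
    val sorted = tiles.toList.sortBy(_._1.instant) // chronological, matching the array_labels order
    Map(sorted.head._1.spatialKey -> MultibandTile(sorted.flatMap(_._2.bands)))
  }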
- new EmptyMultibandTile(aTile.cols,aTile.rows,outputCelltype) + Map( theKey -> new EmptyMultibandTile(aTile.cols,aTile.rows,outputCelltype) ) } - }} + } + + val resultRDD = transformTimeDimension(retiled,applyToTimeseries,reduce = false) + + val newBounds = retiled.metadata.bounds.asInstanceOf[KeyBounds[SpaceTimeKey]].toSpatial + + val keys = findPartitionerKeys(datacube) + val spatiallyGroupingIndex = + if(keys.isDefined){ + val spatialKeys: Array[SpatialKey] = keys.get.map(_.spatialKey).distinct + new SparseSpatialPartitioner(spatialKeys.map(ByTileSpatialPartitioner.toIndex).distinct.sorted, 0, Some(spatialKeys)) + }else{ + ByTileSpatialPartitioner + } + val partitioner: Partitioner = new SpacePartitioner(newBounds)(implicitly, implicitly, spatiallyGroupingIndex) - ContextRDD(resultRDD,retiled.metadata.copy(bounds = retiled.metadata.bounds.asInstanceOf[KeyBounds[SpaceTimeKey]].toSpatial,cellType = scriptBuilder.getOutputCellType())) + SparkContext.getOrCreate().clearCallSite() + ContextRDD(resultRDD.partitionBy(partitioner),retiled.metadata.copy(bounds = newBounds,cellType = outputCelltype)) } private def groupOnTimeDimension(datacube: MultibandTileLayerRDD[SpaceTimeKey]) = { @@ -434,7 +466,11 @@ class OpenEOProcesses extends Serializable { logger.info(s"aggregate_temporal results in ${allPossibleSpacetime.size} keys, using partitioner index: ${index} with bounds ${newBounds}" ) val partitioner: SpacePartitioner[SpaceTimeKey] = SpacePartitioner[SpaceTimeKey](newBounds)(implicitly,implicitly, index) - val allKeysRDD: RDD[(SpaceTimeKey, Null)] = SparkContext.getOrCreate().parallelize(allPossibleSpacetime) + + val sc = SparkContext.getOrCreate() + sc.setCallSite("aggregate_temporal") + val allKeysRDD: RDD[(SpaceTimeKey, Null)] = sc.parallelize(allPossibleSpacetime) + sc.clearCallSite() def mapToNewKey(tuple: (SpaceTimeKey, MultibandTile)): Seq[(SpaceTimeKey, (SpaceTimeKey,MultibandTile))] = { val instant = tuple._1.time.toInstant @@ -468,7 +504,7 @@ class OpenEOProcesses extends Serializable { } - + sc.setCallSite("aggregate_temporal") val tilesByInterval: RDD[(SpaceTimeKey, MultibandTile)] = if(reduce) { if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]] && datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[SparseSpaceOnlyPartitioner]) { @@ -506,7 +542,10 @@ class OpenEOProcesses extends Serializable { } } val metadata = if(reduce) datacube.metadata.copy(bounds = newBounds,cellType = outputCellType) else datacube.metadata.copy(cellType = outputCellType) - return ContextRDD(filledRDD, metadata) + sc.clearCallSite() + val aggregatedCube = ContextRDD(filledRDD, metadata) + aggregatedCube.name = "aggregate_temporal result" + return aggregatedCube } def mapBands(datacube:MultibandTileLayerRDD[SpaceTimeKey], scriptBuilder:OpenEOProcessScriptBuilder, context: java.util.Map[String,Any] = new util.HashMap[String, Any]()): RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]= { @@ -790,7 +829,21 @@ class OpenEOProcesses extends Serializable { logger.info(s"resample_cube_spatial: No resampling required for cube: ${data.metadata}") (0,data) }else{ - val reprojected = org.openeo.geotrellis.reproject.TileRDDReproject(data, target.metadata.crs, Right(target.metadata.layout), 16, method, target.partitioner) + //construct a partitioner that is compatible with data cube + val targetPartitioner = + if(target.partitioner.isDefined && 
target.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]]) { + val index = target.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index + val theIndex = index match { + case partitioner: SparseSpaceTimePartitioner => + new ConfigurableSpaceTimePartitioner(partitioner.indexReduction) + case _ => + index + } + Some(SpacePartitioner[SpaceTimeKey](target.metadata.bounds)(implicitly,implicitly,theIndex)) + }else{ + target.partitioner + } + val reprojected = org.openeo.geotrellis.reproject.TileRDDReproject(data, target.metadata.crs, Right(target.metadata.layout), 16, method, targetPartitioner) filterNegativeSpatialKeys(reprojected) } } @@ -822,6 +875,7 @@ class OpenEOProcesses extends Serializable { } def mergeCubes_SpaceTime_Spatial(leftCube: MultibandTileLayerRDD[SpaceTimeKey], rightCube: MultibandTileLayerRDD[SpatialKey], operator:String, swapOperands:Boolean): ContextRDD[SpaceTimeKey, MultibandTile, TileLayerMetadata[SpaceTimeKey]] = { + leftCube.sparkContext.setCallSite("merge_cubes - (x,y,bands,t) + (x,y,bands)") val resampled = resampleCubeSpatial_spatial(rightCube,leftCube.metadata.crs,leftCube.metadata.layout,ResampleMethods.NearestNeighbor,rightCube.partitioner.orNull)._2 checkMetadataCompatible(leftCube.metadata,resampled.metadata) val rdd = new SpatialToSpacetimeJoinRdd[MultibandTile](leftCube, resampled) @@ -865,6 +919,7 @@ class OpenEOProcesses extends Serializable { } def mergeSpatialCubes(leftCube: MultibandTileLayerRDD[SpatialKey], rightCube: MultibandTileLayerRDD[SpatialKey], operator:String): ContextRDD[SpatialKey, MultibandTile, TileLayerMetadata[SpatialKey]] = { + leftCube.sparkContext.setCallSite("merge_cubes - (x,y,bands)") val resampled = resampleCubeSpatial_spatial(rightCube,leftCube.metadata.crs,leftCube.metadata.layout,NearestNeighbor,leftCube.partitioner.orNull)._2 checkMetadataCompatible(leftCube.metadata,resampled.metadata) val joined = outerJoin(leftCube,resampled) @@ -874,6 +929,7 @@ class OpenEOProcesses extends Serializable { } def mergeCubes(leftCube: MultibandTileLayerRDD[SpaceTimeKey], rightCube: MultibandTileLayerRDD[SpaceTimeKey], operator:String): ContextRDD[SpaceTimeKey, MultibandTile, TileLayerMetadata[SpaceTimeKey]] = { + leftCube.sparkContext.setCallSite("merge_cubes - (x,y,bands,t)") val resampled = resampleCubeSpatial(rightCube,leftCube,NearestNeighbor)._2 checkMetadataCompatible(leftCube.metadata,resampled.metadata) val joined = outerJoin(leftCube,resampled) @@ -938,7 +994,18 @@ class OpenEOProcesses extends Serializable { } - def retile(datacube: MultibandTileLayerRDD[SpaceTimeKey], sizeX:Int, sizeY:Int, overlapX:Int, overlapY:Int): MultibandTileLayerRDD[SpaceTimeKey] = { + def retile(datacube: Object, sizeX:Int, sizeY:Int, overlapX:Int, overlapY:Int): Object = { + + datacube match { + case rdd1 if datacube.asInstanceOf[MultibandTileLayerRDD[SpatialKey]].metadata.bounds.get.maxKey.isInstanceOf[SpatialKey] => + retileGeneric(rdd1.asInstanceOf[MultibandTileLayerRDD[SpatialKey]], sizeX, sizeY, overlapX, overlapY) + case rdd2 if datacube.asInstanceOf[MultibandTileLayerRDD[SpaceTimeKey]].metadata.bounds.get.maxKey.isInstanceOf[SpaceTimeKey] => + retileGeneric(rdd2.asInstanceOf[MultibandTileLayerRDD[SpaceTimeKey]], sizeX, sizeY, overlapX, overlapY) + case _ => throw new IllegalArgumentException(s"Unsupported rdd type to retile: ${datacube}") + } + } + def retileGeneric[K: SpatialComponent: ClassTag + ](datacube: MultibandTileLayerRDD[K], sizeX:Int, sizeY:Int, overlapX:Int, overlapY:Int): MultibandTileLayerRDD[K] = { val regridded = 
if(sizeX >0 && sizeY > 0){ RegridFixed(filterNegativeSpatialKeys(datacube),sizeX,sizeY) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/FileRDDFactory.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/FileRDDFactory.scala index a5fda4aae..c3ffb8014 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/FileRDDFactory.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/FileRDDFactory.scala @@ -8,12 +8,14 @@ import geotrellis.vector._ import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD -import org.openeo.geotrellis.ProjectedPolygons +import org.openeo.geotrellis.OpenEOProcessScriptBuilder.AnyProcess +import org.openeo.geotrellis.{OpenEOProcessScriptBuilder, ProjectedPolygons} import org.openeo.geotrelliscommon.DatacubeSupport.layerMetadata import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, DataCubeParameters, DatacubeSupport} import org.openeo.opensearch.OpenSearchClient import org.openeo.opensearch.OpenSearchResponses.{Feature, Link} import org.openeo.opensearch.backends.{CreodiasClient, OscarsClient} +import org.slf4j.LoggerFactory import java.net.URL import java.time.{LocalTime, ZonedDateTime} @@ -27,7 +29,7 @@ import scala.collection.JavaConverters._ class FileRDDFactory(openSearch: OpenSearchClient, openSearchCollectionId: String, attributeValues: util.Map[String, Any], correlationId: String, val maxSpatialResolution: CellSize) { - + import FileRDDFactory._ def this(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: util.List[String], attributeValues: util.Map[String, Any], correlationId: String) = this(openSearch, openSearchCollectionId, attributeValues, correlationId, maxSpatialResolution = CellSize(10, 10)) @@ -63,7 +65,17 @@ class FileRDDFactory(openSearch: OpenSearchClient, openSearchCollectionId: Strin val boundingBox = ProjectedExtent(bbox, polygons.crs) //load product metadata from OpenSearch - val productMetadata: Seq[Feature] = getFeatures(boundingBox, from, to, zoom,sc) + var productMetadata: Seq[Feature] = getFeatures(boundingBox, from, to, zoom,sc) + + val filter = Option(dataCubeParameters).map(_.timeDimensionFilter) + if (filter.isDefined && filter.get.isDefined) { + val condition = filter.get.get.asInstanceOf[OpenEOProcessScriptBuilder].inputFunction.asInstanceOf[AnyProcess] + //TODO how do we pass in user context + val before = productMetadata.map(_.nominalDate).toSet + productMetadata = productMetadata.filter(f => condition.apply(Map("value" -> f.nominalDate)).apply(f.nominalDate).asInstanceOf[Boolean]) + val after = productMetadata.map(_.nominalDate).toSet + logger.info("Dates removed by timeDimensionFilter: " + (before -- after).mkString(",")) + } //construct layer metadata //hardcoded celltype of float: assuming we will generate floats in further processing @@ -127,7 +139,8 @@ class FileRDDFactory(openSearch: OpenSearchClient, openSearchCollectionId: Strin } object FileRDDFactory { - + // Ignore trailing $'s in the class names for Scala objects + private implicit val logger = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) def oscars(openSearchCollectionId: String, openSearchLinkTitles: util.List[String], attributeValues: util.Map[String, Any] = util.Collections.emptyMap(), correlationId: String = ""): FileRDDFactory = { val openSearch: OpenSearchClient = new OscarsClient(new URL("https://services.terrascope.be/catalogue")) new FileRDDFactory(openSearch, 
openSearchCollectionId, openSearchLinkTitles, attributeValues, correlationId = correlationId) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala index 7182a04fb..34d4778d9 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/file/PyramidFactory.scala @@ -43,7 +43,9 @@ class PyramidFactory(openSearchClient: OpenSearchClient, openSearchLinkTitles: util.List[String], rootPath: String, maxSpatialResolution: CellSize, - experimental: Boolean = false) { + experimental: Boolean = false, + maxSoftErrorsRatio: Double = 0.0, + ) { require(openSearchLinkTitles.size() > 0) import PyramidFactory._ @@ -75,7 +77,8 @@ class PyramidFactory(openSearchClient: OpenSearchClient, metadataProperties, layoutScheme, correlationId = correlationId, - experimental = experimental + experimental = experimental, + maxSoftErrorsRatio = maxSoftErrorsRatio, ) def datacube_seq(polygons:ProjectedPolygons, from_date: String, to_date: String, diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/GTiffOptions.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/GTiffOptions.scala index 403264457..dc98d7a89 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/GTiffOptions.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/GTiffOptions.scala @@ -13,9 +13,12 @@ class GTiffOptions extends Serializable { var tags: Tags = Tags.empty var overviews:String = "OFF" var resampleMethod:String = "near" + var separateAssetPerBand = false def setFilenamePrefix(name: String): Unit = this.filenamePrefix = name + def setSeparateAssetPerBand(value: Boolean): Unit = this.separateAssetPerBand = value + def setColorMap(colors: util.ArrayList[Int]): Unit = { colorMap = Some(new IndexedColorMap(colors.asScala)) } @@ -56,5 +59,23 @@ class GTiffOptions extends Serializable { tags = Tags(tags.headTags ,newBandTags.toList) } + def setBandTags(newBandTags: List[Map[String, String]]): Unit = { + tags = Tags(tags.headTags, newBandTags) + } + /** + * Avoids error when using .clone(): + * "method clone in class Object cannot be accessed in org.openeo.geotrellis.geotiff.GTiffOptions" + */ + def deepClone(): GTiffOptions = { + // https://www.avajava.com/tutorials/lessons/how-do-i-perform-a-deep-clone-using-serializable.html + // TODO: Check for a better implementation + val baos = new java.io.ByteArrayOutputStream() + val oos = new java.io.ObjectOutputStream(baos) + oos.writeObject(this) + oos.close() + val bais = new java.io.ByteArrayInputStream(baos.toByteArray()) + val ois = new java.io.ObjectInputStream(bais) + ois.readObject().asInstanceOf[GTiffOptions] + } } diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index 38ead77aa..dabd20d2f 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -84,6 +84,22 @@ package object geotiff { }) } + + def saveRDDTemporal(rdd: MultibandTileLayerRDD[SpaceTimeKey], + path: String, + zLevel: Int = 6, + cropBounds: Option[Extent] = Option.empty[Extent], + formatOptions: GTiffOptions = new GTiffOptions + ): java.util.List[(String, String, Extent)] = { + val ret = 
saveRDDTemporalAllowAssetPerBand(rdd, path, zLevel, cropBounds, formatOptions).asScala + logger.warn("Calling backwards compatibility version of saveRDDTemporalAllowAssetPerBand") + // val duplicates = ret.groupBy(_._2).filter(_._2.size > 1) + // if (duplicates.nonEmpty) { + // throw new Exception(s"Multiple returned files with same timestamp: ${duplicates.keys.mkString(", ")}") + // } + ret.map(t => (t._1, t._2, t._3)).asJava + } + /** * Save temporal rdd, on the executors * * @param rdd * @param path * @param zLevel * @param cropBounds */ - def saveRDDTemporal(rdd:MultibandTileLayerRDD[SpaceTimeKey], path:String,zLevel:Int=6,cropBounds:Option[Extent]=Option.empty[Extent], formatOptions:GTiffOptions = new GTiffOptions): java.util.List[(String, String, Extent)] = { + //noinspection ScalaWeakerAccess + def saveRDDTemporalAllowAssetPerBand(rdd: MultibandTileLayerRDD[SpaceTimeKey], + path: String, + zLevel: Int = 6, + cropBounds: Option[Extent] = Option.empty[Extent], + formatOptions: GTiffOptions = new GTiffOptions + ): java.util.List[(String, String, Extent, java.util.List[Int])] = { val preProcessResult: (GridBounds[Int], Extent, RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]) = preProcess(rdd,cropBounds) val gridBounds: GridBounds[Int] = preProcessResult._1 val croppedExtent: Extent = preProcessResult._2 @@ -107,54 +129,126 @@ package object geotiff { val compression = Deflate(zLevel) val bandSegmentCount = totalCols * totalRows + val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION")) - preprocessedRdd.map { case (key: SpaceTimeKey, multibandTile: MultibandTile) => + preprocessedRdd.flatMap { case (key: SpaceTimeKey, multibandTile: MultibandTile) => var bandIndex = -1 //Warning: for deflate compression, the segment count and index are not really used, making it stateless. //Not sure how this works out for other types of compression!!! 
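// A worked example, with names local to this sketch, of the GeoTIFF segment indexing computed below:
// segments are laid out row-major within a band, and each band occupies a contiguous block of
// bandSegmentCount segments unless separateAssetPerBand writes every band to its own single-band file.
def segmentIndex(totalCols: Int, totalRows: Int, layoutCol: Int, layoutRow: Int,
                 bandIndex: Int, separateAssetPerBand: Boolean): Int = {
  val bandSegmentCount = totalCols * totalRows
  val bandSegmentOffset = bandSegmentCount * (if (separateAssetPerBand) 0 else bandIndex)
  totalCols * layoutRow + layoutCol + bandSegmentOffset
}
// e.g. on a 4x3 tile grid, band 1 of tile (col = 2, row = 1): segmentIndex(4, 3, 2, 1, 1, false) == 4*1 + 2 + 12 == 18,
// while segmentIndex(4, 3, 2, 1, 1, true) == 6 because each per-band file restarts its segment numbering at 0.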
val theCompressor = compression.createCompressor(multibandTile.bandCount) - (key, multibandTile.bands.map { + multibandTile.bands.map { tile => bandIndex += 1 val layoutCol = key.getComponent[SpatialKey]._1 val layoutRow = key.getComponent[SpatialKey]._2 - val bandSegmentOffset = bandSegmentCount * bandIndex + val bandSegmentOffset = bandSegmentCount * (if (formatOptions.separateAssetPerBand) 0 else bandIndex) val index = totalCols * layoutRow + layoutCol + bandSegmentOffset //tiff format seems to require that we provide 'full' tiles val bytes = raster.CroppedTile(tile, raster.GridBounds(0, 0, tileLayout.tileCols - 1, tileLayout.tileRows - 1)).toBytes() val compressedBytes = theCompressor.compress(bytes, 0) - (index, (multibandTile.cellType, compressedBytes)) - }) - }.map(tuple => { - val isDays = Duration.between(fixedTimeOffset, tuple._1.time).getSeconds % secondsPerDay == 0 - val filename = if(isDays) { - s"${formatOptions.filenamePrefix}_${DateTimeFormatter.ISO_DATE.format(tuple._1.time)}.tif" - } else{ - // ':' is not valid in a Windows filename - s"${formatOptions.filenamePrefix}_${DateTimeFormatter.ISO_ZONED_DATE_TIME.format(tuple._1.time).replace(":", "").replace("-","")}.tif" - } - - - val timestamp = tuple._1.time format DateTimeFormatter.ISO_ZONED_DATE_TIME - ((filename, timestamp), tuple._2) - }).groupByKey().map((tuple: ((String, String), Iterable[Vector[(Int, (CellType, Array[Byte]))]])) => { - val detectedBandCount = tuple._2.map(_.size).max - val segments: Iterable[(Int, (CellType, Array[Byte]))] = tuple._2.flatten - val cellTypes = segments.map(_._2._1).toSet - val tiffs: Predef.Map[Int, Array[Byte]] = segments.map(tuple => (tuple._1, tuple._2._2)).toMap - - val segmentCount = (bandSegmentCount*detectedBandCount) - val thePath = Paths.get(path).resolve(tuple._1._1).toString - val correctedPath = writeTiff( thePath ,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, tileLayout, compression, cellTypes.head, detectedBandCount, segmentCount,formatOptions) - val (_, timestamp) = tuple._1 - (correctedPath, timestamp, croppedExtent) - }).collect().toList.asJava + + val isDays = Duration.between(fixedTimeOffset, key.time).getSeconds % secondsPerDay == 0 + val timePieceSlug = if (isDays) { + "_" + DateTimeFormatter.ISO_DATE.format(key.time) + } else { + // ':' is not valid in a Windows filename + "_" + DateTimeFormatter.ISO_ZONED_DATE_TIME.format(key.time).replace(":", "").replace("-", "") + } + // TODO: Get band names from metadata? 
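// Illustrative filenames for the construction below (the prefix "openEO" and band label "B04" are
// assumed, not taken from the patch): a day-aligned key on 2021-01-05 yields "openEO_2021-01-05.tif",
// or "openEO_2021-01-05_B04.tif" when separateAssetPerBand is set; a non-day-aligned key such as
// 2021-01-05T10:30Z has ':' and '-' stripped, yielding "openEO_20210105T1030Z.tif".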
+ val bandPiece = if (formatOptions.separateAssetPerBand) "_" + bandLabels(bandIndex) else "" + //noinspection RedundantBlock + val filename = s"${formatOptions.filenamePrefix}${timePieceSlug}${bandPiece}.tif" + + val timestamp = DateTimeFormatter.ISO_ZONED_DATE_TIME.format(key.time) + val tiffBands = if (formatOptions.separateAssetPerBand) 1 else multibandTile.bandCount + ((filename, timestamp, tiffBands), (index, (multibandTile.cellType, compressedBytes), bandIndex)) + } + }.groupByKey().map { case ((filename: String, timestamp: String, tiffBands:Int), sequence) => + val cellTypes = sequence.map(_._2._1).toSet + val tiffs: Predef.Map[Int, Array[Byte]] = sequence.map(tuple => (tuple._1, tuple._2._2)).toMap + val bandIndices = sequence.map(_._3).toSet.toList.asJava + + val segmentCount = bandSegmentCount * tiffBands + val thePath = Paths.get(path).resolve(filename).toString + + // filter band tags that match bandIndices + val fo = formatOptions.deepClone() + val newBandTags = formatOptions.tags.bandTags.zipWithIndex + .filter { case (_, bandIndex) => bandIndices.contains(bandIndex) } + .map { case (bandTags, _) => bandTags } + fo.setBandTags(newBandTags) + + val correctedPath = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, + tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo, + ) + (correctedPath, timestamp, croppedExtent, bandIndices) + }.collect().toList.asJava } - def saveRDD(rdd:MultibandTileLayerRDD[SpatialKey], bandCount:Int, path:String,zLevel:Int=6,cropBounds:Option[Extent]=Option.empty[Extent], formatOptions:GTiffOptions = new GTiffOptions):java.util.List[String] = { - saveRDDGeneric(rdd,bandCount, path, zLevel, cropBounds,formatOptions) + + def saveRDD(rdd: MultibandTileLayerRDD[SpatialKey], + bandCount: Int, + path: String, + zLevel: Int = 6, + cropBounds: Option[Extent] = Option.empty[Extent], + formatOptions: GTiffOptions = new GTiffOptions + ): java.util.List[String] = { + val tmp = saveRDDAllowAssetPerBand(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala + logger.warn("Calling backwards compatibility version for saveRDDAllowAssetPerBand") + // if (tmp.size() > 1) { + // throw new Exception("Multiple returned files, probably meant to call saveRDDAllowAssetPerBand") + // } + tmp.map(_._1).asJava + } + + //noinspection ScalaWeakerAccess + def saveRDDAllowAssetPerBand(rdd: MultibandTileLayerRDD[SpatialKey], + bandCount: Int, + path: String, + zLevel: Int = 6, + cropBounds: Option[Extent] = Option.empty[Extent], + formatOptions: GTiffOptions = new GTiffOptions + ): java.util.List[(String, java.util.List[Int])] = { + if (formatOptions.separateAssetPerBand) { + val bandLabels = formatOptions.tags.bandTags.map(_("DESCRIPTION")) + val layout = rdd.metadata.layout + val crs = rdd.metadata.crs + val extent = rdd.metadata.extent + val compression = Deflate(zLevel) + + val rdd_per_band = rdd.flatMap { case (key: SpatialKey, multibandTile: MultibandTile) => + var bandIndex = -1 + multibandTile.bands.map { + tile => + bandIndex += 1 + val t = _root_.geotrellis.raster.MultibandTile(Seq(tile)) + val name = formatOptions.filenamePrefix + "_" + bandLabels(bandIndex) + ".tif" + ((name, bandIndex), (key, t)) + } + } + rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) => + val fixedPath = + if (path.endsWith("out")) { + path.substring(0, path.length - 3) + name + } + else { + path + } + + val fo = formatOptions.deepClone() + // Keep only one band tag + val newBandTags = 
List(formatOptions.tags.bandTags(bandIndex)) + fo.setBandTags(newBandTags) + + (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)), + Collections.singletonList(bandIndex)) + }.collect().toList.sortBy(_._1).asJava + } else { + val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala + tmp.map(t => (t, (0 until bandCount).toList.asJava)).asJava + } } def saveRDDTileGrid(rdd:MultibandTileLayerRDD[SpatialKey], bandCount:Int, path:String, tileGrid: String, zLevel:Int=6,cropBounds:Option[Extent]=Option.empty[Extent]) = { @@ -323,7 +417,7 @@ package object geotiff { } - private def getCompressedTiles[K: SpatialComponent : Boundable : ClassTag](preprocessedRdd: RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]],gridBounds: GridBounds[Int], compression: Compression) = { + private def getCompressedTiles[K: SpatialComponent : Boundable : ClassTag](preprocessedRdd: RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]],gridBounds: GridBounds[Int], compression: Compression): (collection.Map[Int, Array[Byte]], CellType, Double, Int) = { val tileLayout = preprocessedRdd.metadata.tileLayout val totalCols = math.ceil(gridBounds.width.toDouble / tileLayout.tileCols).toInt @@ -597,7 +691,11 @@ package object geotiff { .toList.asJava } - private def stitchAndWriteToTiff(tiles: Iterable[(SpatialKey, MultibandTile)], filePath: String, layout: LayoutDefinition, crs: CRS, geometry: Geometry, croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], compression: Compression) = { + private def stitchAndWriteToTiff(tiles: Iterable[(SpatialKey, MultibandTile)], filePath: String, + layout: LayoutDefinition, crs: CRS, geometry: Geometry, + croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], + compression: Compression, formatOptions: Option[GTiffOptions] = None + ) = { val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch() val re = raster.rasterExtent @@ -624,9 +722,22 @@ package object geotiff { resampled } - val geotiff = MultibandGeoTiff(adjusted, crs, GeoTiffOptions(compression)) - .withOverviews(NearestNeighbor, List(4, 8, 16)) - + val fo = formatOptions match { + case Some(fo) => fo + case None => + val fo = new GTiffOptions() + // If no formatOptions was specified, the default was to generate pyramids + fo.overviews = "ALL" + fo + } + var geotiff = MultibandGeoTiff(adjusted.tile, adjusted.extent, crs, + fo.tags, GeoTiffOptions(compression)) + val gridBounds = adjusted.extent + if (fo.overviews.toUpperCase == "ALL" || + fo.overviews.toUpperCase == "AUTO" && (gridBounds.width > 1024 || gridBounds.height > 1024) + ) { + geotiff = geotiff.withOverviews(NearestNeighbor, List(4, 8, 16)) + } writeGeoTiff(geotiff, filePath) } diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala index fb88b05bb..7f17c5f87 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/FileLayerProvider.scala @@ -11,17 +11,19 @@ import geotrellis.raster.gdal.{GDALPath, GDALRasterSource, GDALWarpOptions} import geotrellis.raster.geotiff.{GeoTiffPath, GeoTiffRasterSource, GeoTiffReprojectRasterSource, GeoTiffResampleRasterSource} import geotrellis.raster.io.geotiff.OverviewStrategy import geotrellis.raster.rasterize.Rasterizer -import 
geotrellis.raster.{CellSize, CellType, ConvertTargetCellType, CroppedTile, FloatConstantNoDataCellType, FloatConstantTile, GridBounds, GridExtent, MosaicRasterSource, MultibandTile, NoNoData, PaddedTile, Raster, RasterExtent, RasterMetadata, RasterRegion, RasterSource, ResampleMethod, ResampleTarget, ShortConstantNoDataCellType, SourceName, TargetAlignment, TargetCellType, TargetRegion, Tile, UByteUserDefinedNoDataCellType, UShortConstantNoDataCellType} +import geotrellis.raster.{CellSize, CellType, ConvertTargetCellType, CropOptions, CroppedTile, FloatConstantNoDataCellType, FloatConstantTile, GridBounds, GridExtent, MosaicRasterSource, MultibandTile, NoNoData, PaddedTile, Raster, RasterExtent, RasterMetadata, RasterRegion, RasterSource, ResampleMethod, ResampleTarget, ShortConstantNoDataCellType, SourceName, TargetAlignment, TargetCellType, TargetRegion, Tile, UByteUserDefinedNoDataCellType, UShortConstantNoDataCellType} import geotrellis.spark._ import geotrellis.spark.clip.ClipToGrid import geotrellis.spark.clip.ClipToGrid.clipFeatureToExtent -import geotrellis.spark.join.VectorJoin +import geotrellis.spark.join.{SpatialJoin, VectorJoin} import geotrellis.spark.partition.SpacePartitioner import geotrellis.vector import geotrellis.vector.Extent.toPolygon import geotrellis.vector._ +import net.jodah.failsafe.{Failsafe, RetryPolicy} +import net.jodah.failsafe.event.ExecutionAttemptedEvent import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{CoGroupedRDD, RDD} import org.apache.spark.util.LongAccumulator import org.locationtech.jts.geom.Geometry import org.openeo.geotrellis.OpenEOProcessScriptBuilder.AnyProcess @@ -29,6 +31,9 @@ import org.openeo.geotrellis.file.{AbstractPyramidFactory, FixedFeaturesOpenSear import org.openeo.geotrellis.tile_grid.TileGrid import org.openeo.geotrellis.{OpenEOProcessScriptBuilder, healthCheckExtent, sortableSourceName} import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg, retryForever} +import org.openeo.geotrellis.{OpenEOProcessScriptBuilder, sortableSourceName} +import org.openeo.geotrelliscommon.DatacubeSupport.prepareMask +import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ByKeyPartitioner, CloudFilterStrategy, ConfigurableSpatialPartitioner, DataCubeParameters, DatacubeSupport, L1CCloudFilterStrategy, MaskTileLoader, NoCloudFilterStrategy, ResampledTile, SCLConvolutionFilterStrategy, SpaceTimeByMonthPartitioner, SparseSpaceTimePartitioner, autoUtmEpsg} import org.openeo.opensearch.OpenSearchClient import org.openeo.opensearch.OpenSearchResponses.{Feature, Link} import org.slf4j.{Logger, LoggerFactory} @@ -37,10 +42,11 @@ import java.io.{IOException, Serializable} import java.net.URI import java.nio.file.{Path, Paths} import java.time._ -import java.time.temporal.ChronoUnit +import java.time.temporal.ChronoUnit.{DAYS, SECONDS} +import java.util import java.util.concurrent.TimeUnit +import scala.collection.GenSeq import scala.collection.JavaConverters._ -import scala.collection.parallel.immutable.{ParMap, ParSeq} import scala.reflect.ClassTag import scala.util.matching.Regex @@ -61,6 +67,19 @@ private class LayoutTileSourceFixed[K: SpatialComponent]( object BandCompositeRasterSource { private val logger = 
LoggerFactory.getLogger(classOf[BandCompositeRasterSource]) + + private def retryWithBackoff[R](maxAttempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = { + val retryPolicy = new RetryPolicy[R] + .handle(classOf[Exception]) // will otherwise retry Error + .withMaxAttempts(maxAttempts) + .withBackoff(1, 16, SECONDS) + .onFailedAttempt((attempt: ExecutionAttemptedEvent[R]) => + onAttemptFailed(attempt.getLastFailure.asInstanceOf[Exception])) + + Failsafe + .`with`(util.Collections.singletonList(retryPolicy)) + .get(f _) + } } @@ -70,20 +89,30 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] override val crs: CRS, override val attributes: Map[String, String] = Map.empty, val predefinedExtent: Option[GridExtent[Long]] = None, + parallelRead: Boolean = true, + softErrors: Boolean = false, + readFullTile: Boolean = false ) extends MosaicRasterSource { // TODO: don't inherit? import BandCompositeRasterSource._ - private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt + private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "20").toInt protected def reprojectedSources: NonEmptyList[RasterSource] = sources map { _.reproject(crs) } protected def reprojectedSources(bands: Seq[Int]): Seq[RasterSource] = { - val selectedBands = bands.map(sources.toList) + def reprojectRasterSourceAttemptFailed(source: RasterSource)(e: Exception): Unit = + logger.warn(s"attempt to reproject ${source.name} to $crs failed", e) - selectedBands map { rs => - try retryForever(Duration.ofSeconds(10), maxRetries)(rs.reproject(crs)) + val selectedBands = bands.map(sources.toList) + selectedBands flatMap { rs => + try Some(retryWithBackoff(maxRetries, reprojectRasterSourceAttemptFailed(rs))(rs.reproject(crs))) catch { - case e: Exception => throw new IOException(s"Error while reading: ${rs.name.toString}", e) + // reading the CRS from a GDALRasterSource can fail + case e: Exception => + if (softErrors) { + logger.warn(s"ignoring soft error for ${rs.name}", e) + None + } else throw new IOException(s"Error while reading: ${rs.name}", e) } } } @@ -101,22 +130,43 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] override def name: SourceName = sources.head.name override def bandCount: Int = sources.size - override def readBounds(bounds: Traversable[GridBounds[Long]]): Iterator[Raster[MultibandTile]] = { + def readBoundsFullTile(bounds: Traversable[GridBounds[Long]]): Iterator[Raster[MultibandTile]] = { + var union = bounds.reduce(_ combine _) - val rastersByBounds = reprojectedSources.zipWithIndex.toList.flatMap(s => { - s._1.readBounds(bounds).zipWithIndex.map(raster_int => ((raster_int._2,(s._2,raster_int._1)))) - }).groupBy(_._1) - rastersByBounds.toSeq.sortBy(_._1).map(_._2).map((rasters) => { - val sortedRasters = rasters.toList.sortBy(_._2._1).map(_._2._2) - Raster(MultibandTile(sortedRasters.map(_.tile.band(0).convert(cellType))), sortedRasters.head.extent) - }).toIterator + // rastersource contract: do not read negative gridbounds + union = union.copy(colMin=math.max(union.colMin,0),rowMin=math.max(union.rowMin,0)) + + val fullRaster = read(union).get + val mappedBounds = bounds.map(b=> b.offset(-union.colMin,-union.rowMin).toGridType[Int]) + return mappedBounds.map(b => fullRaster.crop(b, CropOptions(force = true,clamp=true))).toIterator + + } + + override def readBounds(bounds: Traversable[GridBounds[Long]]): Iterator[Raster[MultibandTile]] = { + var union = bounds.reduce(_ combine _) + val 
percentageToRead = bounds.map(_.size).sum.toDouble / union.size // toDouble: Long division would truncate the coverage ratio to 0 below full coverage + if (percentageToRead > 0.5 && readFullTile) { + return readBoundsFullTile(bounds) + } else { + val rastersByBounds = reprojectedSources.zipWithIndex.toList.flatMap(s => { + s._1.readBounds(bounds).zipWithIndex.map(raster_int => ((raster_int._2,(s._2,raster_int._1)))) + }).groupBy(_._1) + rastersByBounds.toSeq.sortBy(_._1).map(_._2).map((rasters) => { + val sortedRasters = rasters.toList.sortBy(_._2._1).map(_._2._2) + Raster(MultibandTile(sortedRasters.map(_.tile.band(0).convert(cellType))), sortedRasters.head.extent) + }).toIterator + } } override def read(extent: Extent, bands: Seq[Int]): Option[Raster[MultibandTile]] = { - val selectedSources = reprojectedSources(bands) + var selectedSources: GenSeq[RasterSource] = reprojectedSources(bands) + + if(parallelRead){ + selectedSources = selectedSources.par + } - val singleBandRasters = selectedSources.par + val singleBandRasters = selectedSources .map { _.read(extent, Seq(0)) map { case Raster(multibandTile, extent) => Raster(multibandTile.band(0), extent) } } .collect { case Some(raster) => raster } @@ -126,7 +176,11 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] } override def read(bounds: GridBounds[Long], bands: Seq[Int]): Option[Raster[MultibandTile]] = { - val selectedSources = reprojectedSources(bands) + var selectedSources: GenSeq[RasterSource] = reprojectedSources(bands) + + if(parallelRead){ + selectedSources = selectedSources.par + } def readBounds(source: RasterSource): Option[Raster[Tile]] = { try { @@ -135,15 +189,19 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] logger.debug(s"finished reading $bounds from ${source.name}") raster } catch { - case e: Exception => throw new IOException(s"Error while reading $bounds from ${source.name}", e) + case e: Exception => + if (softErrors) { + logger.warn(s"ignoring soft error for ${source.name}", e) + None + } else throw new IOException(s"Error while reading $bounds from ${source.name}", e) } } def readBoundsAttemptFailed(source: RasterSource)(e: Exception): Unit = logger.warn(s"attempt to read $bounds from ${source.name} failed", e) - val singleBandRasters = selectedSources.par - .map(rs => retryForever(Duration.ofSeconds(10), maxRetries, readBoundsAttemptFailed(rs)) { + val singleBandRasters = selectedSources + .map(rs => retryWithBackoff(maxRetries, readBoundsAttemptFailed(rs)) { + readBounds(rs) }) .collect { case Some(raster) => raster } @@ -176,14 +234,16 @@ class BandCompositeRasterSource(override val sources: NonEmptyList[RasterSource] method: ResampleMethod, strategy: OverviewStrategy ): RasterSource = new BandCompositeRasterSource( - reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs) + reprojectedSources map { _.resample(resampleTarget, method, strategy) }, crs, parallelRead = parallelRead, + softErrors = softErrors) override def convert(targetCellType: TargetCellType): RasterSource = - new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs) + new BandCompositeRasterSource(reprojectedSources map { _.convert(targetCellType) }, crs, + parallelRead = parallelRead, softErrors = softErrors) override def reprojection(targetCRS: CRS, resampleTarget: ResampleTarget, method: ResampleMethod, strategy: OverviewStrategy): RasterSource = new BandCompositeRasterSource(reprojectedSources map { _.reproject(targetCRS, resampleTarget, method, strategy) }, - crs) + crs, parallelRead = parallelRead, softErrors =
softErrors) } // TODO: is this class necessary? Looks like a more general case of BandCompositeRasterSource so maybe the inheritance @@ -234,10 +294,21 @@ class MultibandCompositeRasterSource(val sourcesListWithBandIds: NonEmptyList[(R ) } + + object FileLayerProvider { private implicit val logger: Logger = LoggerFactory.getLogger(classOf[FileLayerProvider]) - private val maxRetries = sys.env.getOrElse("GDALREAD_MAXRETRIES", "10").toInt + + + + + lazy val sdk = { + import _root_.io.opentelemetry.api.OpenTelemetry + import _root_.io.opentelemetry.api.GlobalOpenTelemetry + GlobalOpenTelemetry.get() + } + lazy val megapixelPerSecondMeter = sdk.meterBuilder("load_collection_read").build().gaugeBuilder("megapixel_per_second").build() { try { @@ -260,9 +331,9 @@ object FileLayerProvider { def apply(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256), bandIndices: Seq[Int] = Seq(), correlationId: String = "", experimental: Boolean = false, - retainNoDataTiles: Boolean = false): FileLayerProvider = new FileLayerProvider( + retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0): FileLayerProvider = new FileLayerProvider( openSearch, openSearchCollectionId, openSearchLinkTitles, rootPath, maxSpatialResolution, pathDateExtractor, - attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles, + attributeValues, layoutScheme, bandIndices, correlationId, experimental, retainNoDataTiles, maxSoftErrorsRatio, disambiguateConstructors = null ) @@ -282,7 +353,7 @@ object FileLayerProvider { def rasterSourceRDD(rasterSources: Seq[RasterSource], metadata: TileLayerMetadata[SpaceTimeKey], maxSpatialResolution: CellSize, collection: String)(implicit sc: SparkContext): RDD[LayoutTileSource[SpaceTimeKey]] = { val keyExtractor = new TemporalKeyExtractor { - def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(ChronoUnit.DAYS) + def getMetadata(rs: RasterMetadata): ZonedDateTime = ZonedDateTime.parse(rs.attributes("date")).truncatedTo(DAYS) } val sources = sc.parallelize(rasterSources,rasterSources.size) @@ -337,23 +408,25 @@ object FileLayerProvider { def applySpaceTimeMask(datacubeParams: Option[DataCubeParameters], requiredSpacetimeKeys: RDD[(SpaceTimeKey, vector.Feature[Geometry, (RasterSource, Feature)])], metadata: TileLayerMetadata[SpaceTimeKey]): RDD[(SpaceTimeKey, vector.Feature[Geometry, (RasterSource, Feature)])] = { if (datacubeParams.exists(_.maskingCube.isDefined)) { val maskObject = datacubeParams.get.maskingCube.get + requiredSpacetimeKeys.sparkContext.setCallSite("load_collection: filter mask keys") maskObject match { case theMask: MultibandTileLayerRDD[SpaceTimeKey] => if (theMask.metadata.bounds.get._1.isInstanceOf[SpaceTimeKey]) { - val filtered = theMask.withContext { - _.filter(_._2.band(0).toArray().exists(pixel => pixel == 0)).distinct() - } - val maskKeys = - if (theMask.metadata.crs.equals(metadata.crs) && theMask.metadata.layout.equals(metadata.layout)) { - filtered - } else { - logger.debug(s"mask: automatically resampling mask to match datacube: ${theMask.metadata}") - filtered.reproject(metadata.crs, metadata.layout, 16, requiredSpacetimeKeys.partitioner)._2 - } + + //TODO: this partitioner is None most of the time + // Perhaps try using the
partitioner from the mask, but only valid after reprojection + val partitioner = requiredSpacetimeKeys.partitioner + // filter the mask down to tiles with at least one valid pixel and drop the rest, hence the inner join below + val filtered = prepareMask(theMask, metadata, partitioner) + if (logger.isDebugEnabled) { - logger.debug(s"SpacetimeMask mask reduces the input to: ${maskKeys.countApproxDistinct()} keys.") + logger.debug(s"SpacetimeMask mask reduces the input to: ${filtered.countApproxDistinct()} keys.") } - return requiredSpacetimeKeys.join(maskKeys).map(tuple => (tuple._1, tuple._2._1)) + + datacubeParams.get.maskingCube = Some(filtered) + val result = requiredSpacetimeKeys.join(filtered).map(tuple => (tuple._1, tuple._2._1)) + requiredSpacetimeKeys.sparkContext.clearCallSite() + return result } case _ => } @@ -482,12 +555,14 @@ object FileLayerProvider { private val PIXEL_COUNTER = "InputPixels" private def rasterRegionsToTilesLoadPerProductStrategy(rasterRegionRDD: RDD[(SpaceTimeKey, (RasterRegion, SourceName))], - metadata: TileLayerMetadata[SpaceTimeKey], - retainNoDataTiles: Boolean, - cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy, - partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None, - datacubeParams : Option[DataCubeParameters] = None, - expectedBandCount : Int = -1 + metadata: TileLayerMetadata[SpaceTimeKey], + retainNoDataTiles: Boolean, + cloudFilterStrategy: CloudFilterStrategy = NoCloudFilterStrategy, + partitionerOption: Option[SpacePartitioner[SpaceTimeKey]] = None, + datacubeParams : Option[DataCubeParameters] = None, + expectedBandCount : Int = -1, + sources: Seq[(RasterSource, Feature)], + softErrors: Boolean, ): RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = { if(cloudFilterStrategy!=NoCloudFilterStrategy) { @@ -503,6 +578,8 @@ object FileLayerProvider { val crs = metadata.crs val layout = metadata.layout + rasterRegionRDD.sparkContext.setCallSite("load_collection: group by input product") + val loadPerProduct = datacubeParams.forall(!_.loadPerProduct) val byBandSource = rasterRegionRDD.flatMap(key_region_sourcename => { val source = key_region_sourcename._2._1.asInstanceOf[GridBoundsRasterRegion].source val bounds = key_region_sourcename._2._1.asInstanceOf[GridBoundsRasterRegion].bounds @@ -516,7 +593,7 @@ object FileLayerProvider { case source1: BandCompositeRasterSource => //decompose into individual bands - source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s),source1.crs,source1.attributes,source1.predefinedExtent), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq + source1.sources.map(s => (s.name, GridBoundsRasterRegion(new BandCompositeRasterSource(NonEmptyList.one(s), source1.crs, source1.attributes, source1.predefinedExtent, parallelRead = loadPerProduct, softErrors = softErrors, readFullTile = true), bounds))).zipWithIndex.map(t => (t._1._1, (Seq(t._2), key_region_sourcename._1, t._1._2))).toList.toSeq case _ => Seq((source.name, (Seq(0), key_region_sourcename._1, key_region_sourcename._2._1))) @@ -526,9 +603,26 @@ object FileLayerProvider { result }) - val allSources: Array[SourceName] = byBandSource.keys.distinct().collect() - val theCellType = metadata.cellType + /** + * Avoid using the RDD merely to compute the source names: that triggers a lot of computation which is then repeated later on, in some cases even touching the rasters.
+ */ + val allSources: Array[SourceName] = sources.flatMap( t =>{ + t._1 match { + case source1: MultibandCompositeRasterSource => + //decompose into individual bands + //TODO do something like line below, but make sure that band order is maintained! For now we just return the composite source. + //source1.sourcesListWithBandIds.map(s => (s._1.name, (s._2,key_region_sourcename._1,GridBoundsRasterRegion(s._1, bounds)))) + Seq(t._1.name) + case source1: BandCompositeRasterSource => + //decompose into individual bands + source1.sources.map(s => s.name).toList + case _ => + Seq(t._1.name) + } + }).distinct.toArray + val theCellType = metadata.cellType + rasterRegionRDD.sparkContext.setCallSite("load_collection: read by input product") var tiledRDD: RDD[(SpaceTimeKey, MultibandTile)] = byBandSource.groupByKey(new ByKeyPartitioner(allSources)).mapPartitions((partitionIterator: Iterator[(SourceName, Iterable[(Seq[Int], SpaceTimeKey, RasterRegion)])]) => { var totalPixelsPartition = 0 val startTime = System.currentTimeMillis() @@ -540,6 +634,8 @@ object FileLayerProvider { if (totalPixelsPartition > 0) { val secondsPerChunk = (durationMillis / 1000.0) / (totalPixelsPartition / (256 * 256)) loadingTimeAcc.add(secondsPerChunk) + val megapixelPerSecond = (totalPixelsPartition /(1024*1024)) / (durationMillis / 1000.0) + megapixelPerSecondMeter.set(megapixelPerSecond) } loadedPartitions @@ -556,8 +652,9 @@ object FileLayerProvider { } ).filter { case (_, tile) => retainNoDataTiles || !tile.bands.forall(_.isNoDataTile) } + rasterRegionRDD.sparkContext.setCallSite("load_collection: apply mask pixel wise") tiledRDD = DatacubeSupport.applyDataMask(datacubeParams,tiledRDD,metadata, pixelwiseMasking = true) - + rasterRegionRDD.sparkContext.clearCallSite() val cRDD = ContextRDD(tiledRDD, metadata) cRDD.name = rasterRegionRDD.name cRDD @@ -621,7 +718,7 @@ object FileLayerProvider { val allRasters = try{ - bounds.toIterator.flatMap(b => retryForever(Duration.ofSeconds(10),maxRetries)(source.read(b).iterator)).map(_.mapTile(_.convert(cellType))).toSeq + source.readBounds(bounds).map(_.mapTile(_.convert(cellType))).toSeq } catch { case e: Exception => throw new IOException(s"load_collection/load_stac: error while reading from: ${source.name.toString}. 
Detailed error: ${e.getMessage}", e) } @@ -641,6 +738,8 @@ object FileLayerProvider { val rowOffset = math.abs(theBounds.rowMin - intersection.get.rowMin) require(colOffset <= Int.MaxValue && rowOffset <= Int.MaxValue, "Computed offsets are outside of RasterBounds") Some(raster.mapTile { + // pad each band back to the full target tile; PaddedTile requires chunk.cols + colOffset <= cols and chunk.rows + rowOffset <= rows + // (e.g. a 64x59 chunk read at GridBounds(16,0,79,58) has colOffset 16, so a 64-wide chunk would exceed a 64x64 tile boundary: 64 + 16 > 64) _.mapBands { (_, band) => PaddedTile(band, colOffset.toInt, rowOffset.toInt, theBounds.width.toInt, theBounds.height.toInt) } }) } @@ -829,7 +932,7 @@ object FileLayerProvider { class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any], layoutScheme: LayoutScheme, bandIndices: Seq[Int], correlationId: String, experimental: Boolean, - retainNoDataTiles: Boolean, + retainNoDataTiles: Boolean, maxSoftErrorsRatio: Double, disambiguateConstructors: Null) extends LayerProvider { // workaround for: constructors have the same type after erasure import DatacubeSupport._ @@ -840,7 +943,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti def this(openSearch: OpenSearchClient, openSearchCollectionId: String, openSearchLinkTitles: NonEmptyList[String], rootPath: String, maxSpatialResolution: CellSize, pathDateExtractor: PathDateExtractor, attributeValues: Map[String, Any] = Map(), layoutScheme: LayoutScheme = ZoomedLayoutScheme(WebMercator, 256), bandIds: Seq[Seq[Int]] = Seq(), correlationId: String = "", experimental: Boolean = false, - retainNoDataTiles: Boolean = false) = this(openSearch, openSearchCollectionId, + retainNoDataTiles: Boolean = false, maxSoftErrorsRatio: Double = 0.0) = this(openSearch, openSearchCollectionId, openSearchLinkTitles = NonEmptyList.fromListUnsafe(for { (title, bandIndices) <- openSearchLinkTitles.toList.zipAll(bandIds, thisElem = "", thatElem = Seq(0)) _ <- bandIndices @@ -848,7 +951,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti rootPath, maxSpatialResolution, pathDateExtractor, attributeValues, layoutScheme, bandIndices = bandIds.flatten, correlationId, experimental, - retainNoDataTiles, disambiguateConstructors = null) + retainNoDataTiles, maxSoftErrorsRatio, disambiguateConstructors = null) assert(bandIndices.isEmpty || bandIndices.size == openSearchLinkTitles.size) @@ -858,6 +961,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti private val _rootPath = if(rootPath != null) Paths.get(rootPath) else null private val fromLoadStac = openSearch.isInstanceOf[FixedFeaturesOpenSearchClient] + private val softErrors = maxSoftErrorsRatio > 0.0 private val openSearchLinkTitlesWithBandId: Seq[(String, Int)] = { if (bandIndices.nonEmpty) { @@ -939,6 +1043,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti } } + sc.setCallSite(s"load_collection: $openSearchCollectionId resolution $maxSpatialResolution construct input product metadata") val workingPartitioner = SpacePartitioner(metadata.bounds.get.toSpatial)(implicitly,implicitly,new ConfigurableSpatialPartitioner(3)) val requiredSpatialKeys: RDD[(SpatialKey, Iterable[Geometry])] = @@ -1163,6 +1268,8 @@ class FileLayerProvider
private(openSearch: OpenSearchClient, openSearchCollecti // https://github.com/Open-EO/openeo-geotrellis-extensions/issues/69 val theResampleMethod = datacubeParams.map(_.resampleMethod).getOrElse(NearestNeighbor) + requiredSpacetimeKeys.sparkContext.setCallSite(s"load_collection: determine raster regions to read resample: ${resample}") + val regions: RDD[(SpaceTimeKey, (RasterRegion, SourceName))] = requiredSpacetimeKeys .groupBy { case (_, vector.Feature(_, (rasterSource, _))) => rasterSource } .flatMap { case (rasterSource, keyedFeatures) => @@ -1195,9 +1302,9 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti if(!datacubeParams.map(_.loadPerProduct).getOrElse(false) || theMaskStrategy != NoCloudFilterStrategy ){ rasterRegionsToTiles(regions, metadata, retainNoDataTiles, theMaskStrategy, partitioner, datacubeParams) }else{ - rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size) + rasterRegionsToTilesLoadPerProductStrategy(regions, metadata, retainNoDataTiles, NoCloudFilterStrategy, partitioner, datacubeParams, openSearchLinkTitlesWithBandId.size,readKeysToRasterSourcesResult._4, softErrors) } - logger.info(s"Created cube for ${openSearchCollectionId} with metadata ${cube.metadata} and partitioner ${cube.partitioner}") + logger.info(s"Created cube for ${openSearchCollectionId} with metadata ${cube.metadata} and partitioner ${cube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index}") cube }finally{ requiredSpacetimeKeys.unpersist(false) @@ -1480,7 +1587,7 @@ class FileLayerProvider private(openSearch: OpenSearchClient, openSearchCollecti return None } - Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent), feature)) + Some((new BandCompositeRasterSource(sources.map { case (rasterSource, _) => rasterSource }, targetExtent.crs, attributes, predefinedExtent = predefinedExtent, softErrors = softErrors), feature)) } else Some((new MultibandCompositeRasterSource(sources.map { case (rasterSource, bandIndex) => (rasterSource, Seq(bandIndex))}, targetExtent.crs, attributes), feature)) } } diff --git a/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcessScriptBuilder.java b/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcessScriptBuilder.java index 30e499691..eda43f740 100644 --- a/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcessScriptBuilder.java +++ b/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcessScriptBuilder.java @@ -1579,7 +1579,26 @@ public void testQuantiles() { //nd,3,nd,3,3,-10,nd,19,nd // -10,1 ,3 3 3 19 nd nd nd nd - assertArrayEquals(new Object[]{-1,3,7}, elements); + assertArrayEquals(new Object[]{1,3,3}, elements); + } + + @DisplayName("Test quantiles process on floats") + @Test + public void testQuantilesOnFloats() { + + double[] values = {0.00612713, 0.01104487, 0.01374031, 0.01521673, 0.01546687, + 0.01585949, 0.01644365, 0.01667373, 0.01726482, 0.01831796, + 0.02303652, 0.02482071}; + + List tiles = Arrays.stream(values).mapToObj(d -> FloatConstantNoDataArrayTile.fill((float)d, 4, 4).mutable()).collect(Collectors.toList()); + + + Seq result = createQuantiles(null,10).generateFunction().apply(JavaConversions.asScalaBuffer(tiles)); + Collection javaCollection = JavaConversions.asJavaCollection(result); + double[] quantiles = 
javaCollection.stream().mapToDouble(t -> t.getDouble(0, 0)).toArray(); + double[] expected = {0.01131441444158554, 0.014035594649612904, 0.015291771851480007, 0.015623917803168297, 0.01615156978368759, 0.016581697389483452, 0.01708749309182167, 0.018107332289218903, 0.022564664483070374}; + + assertArrayEquals(expected, quantiles,0.00001); } @DisplayName("Test clip process") @@ -2232,7 +2251,7 @@ static OpenEOProcessScriptBuilder createQuantiles(Boolean ignoreNoData, int qVal builder.argumentStart("data"); builder.argumentEnd(); - builder.constantArgument("q",2); + builder.constantArgument("q",qValue); if (ignoreNoData != null) { builder.constantArgument("ignore_nodata",ignoreNoData.booleanValue()); diff --git a/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcesses.java b/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcesses.java index d558f093f..99b8bb288 100644 --- a/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcesses.java +++ b/openeo-geotrellis/src/test/java/org/openeo/geotrellis/TestOpenEOProcesses.java @@ -278,9 +278,9 @@ public void testApplyTimeDimensionToBandB04() { double[] inputTSAsArray = doubleValues.toArray(); double sd = new StandardDeviation().evaluate(inputTSAsArray); - double p25 = new Percentile().evaluate(inputTSAsArray,25); - double p50 = new Percentile().evaluate(inputTSAsArray,50); - double p75 = new Percentile().evaluate(inputTSAsArray,75); + double p25 = new Percentile().withEstimationType(Percentile.EstimationType.R_7).evaluate(inputTSAsArray,25); + double p50 = new Percentile().withEstimationType(Percentile.EstimationType.R_7).evaluate(inputTSAsArray,50); + double p75 = new Percentile().withEstimationType(Percentile.EstimationType.R_7).evaluate(inputTSAsArray,75); assertArrayEquals(new Object[]{(int)p25, (int)p50, (int)p75, (int)sd, (int) Arrays.stream(inputTSAsArray).average().getAsDouble()}, bandValues.toArray()); diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala index 8f408288d..01f2d2444 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/geotiff/WriteRDDToGeotiffTest.scala @@ -16,6 +16,8 @@ import org.junit.Assert._ import org.junit._ import org.junit.rules.TemporaryFolder import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons} +import org.openeo.sparklisteners.GetInfoSparkListener +import org.slf4j.{Logger, LoggerFactory} import java.nio.file.{Files, Paths} import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime} @@ -28,6 +30,7 @@ import scala.reflect.io.Directory object WriteRDDToGeotiffTest{ + private implicit val logger: Logger = LoggerFactory.getLogger(classOf[WriteRDDToGeotiffTest]) var sc: SparkContext = _ @@ -37,8 +40,10 @@ object WriteRDDToGeotiffTest{ val conf = new SparkConf().setMaster("local[2]").setAppName(getClass.getSimpleName) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", classOf[geotrellis.spark.store.kryo.KryoRegistrator].getName) + .set("spark.ui.enabled", "true") SparkContext.getOrCreate(conf) } + if (sc.uiWebUrl.isDefined) logger.info("Spark uiWebUrl: " + sc.uiWebUrl.get) } @AfterClass @@ -47,6 +52,8 @@ object WriteRDDToGeotiffTest{ class WriteRDDToGeotiffTest { + import WriteRDDToGeotiffTest._ + @(Rule @getter) val temporaryFolder 
= new TemporaryFolder @@ -154,7 +161,7 @@ class WriteRDDToGeotiffTest { val filename = "openEO_2017-03-01Z.tif" val p = new OpenEOProcesses() - val buffered: MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retile(tileLayerRDD,224,224,16,16),224,224,16,16) + val buffered: MultibandTileLayerRDD[SpaceTimeKey] = p.remove_overlap(p.retileGeneric(tileLayerRDD,224,224,16,16),224,224,16,16) val cropBounds = Extent(-115, -65, 5.0, 56) saveRDDTemporal(buffered,"./",cropBounds = Some(cropBounds)) @@ -282,7 +289,11 @@ class WriteRDDToGeotiffTest { val layoutRows = 4 val ( imageTile:ByteArrayTile, filtered:MultibandTileLayerRDD[SpatialKey]) = LayerFixtures.createLayerWithGaps(layoutCols,layoutRows) - val filename = "outFiltered.tif" + val outDir = Paths.get("tmp/testWriteMultibandRDDWithGaps/") + new Directory(outDir.toFile).deepFiles.foreach(_.delete()) + Files.createDirectories(outDir) + + val filename = outDir + "/outFiltered.tif" saveRDD(filtered.withContext{_.repartition(layoutCols*layoutRows)},3,filename) val result = GeoTiff.readMultiband(filename).raster.tile @@ -293,14 +304,50 @@ class WriteRDDToGeotiffTest { assertArrayEquals(croppedReference.toArray(),croppedOutput.toArray()) } + @Test + def testWriteMultibandRDDWithGapsSeparateAssetPerBand(): Unit = { + val layoutCols = 8 + val layoutRows = 4 + val (imageTile: ByteArrayTile, filtered: MultibandTileLayerRDD[SpatialKey]) = LayerFixtures.createLayerWithGaps(layoutCols, layoutRows) + + val outDir = Paths.get("tmp/testWriteMultibandRDDWithGapsSeparateAssetPerBand/") + new Directory(outDir.toFile).deepFiles.foreach(_.delete()) + Files.createDirectories(outDir) + + val filename = outDir + "/out" + val options = new GTiffOptions() + options.separateAssetPerBand = true + options.addBandTag(0, "DESCRIPTION", "B01") + options.addBandTag(1, "DESCRIPTION", "B02") + options.addBandTag(2, "DESCRIPTION", "B03") + val paths = saveRDD(filtered.withContext { + _.repartition(layoutCols * layoutRows) + }, 3, filename, formatOptions = options) + assertEquals(3, paths.size()) + + GeoTiff.readMultiband(outDir.resolve("openEO_B01.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_B02.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_B03.tif").toString).raster.tile + + val result = GeoTiff.readMultiband(paths.get(0)).raster.tile + + //crop away the area where data was removed, and check if rest of geotiff is still fine + val croppedReference = imageTile.crop(2 * 256, 0, layoutCols * 256, layoutRows * 256).toArrayTile() + + val resultWidth = result.band(0).toArrayTile().dimensions.cols + val croppedOutput = result.band(0).toArrayTile().crop(resultWidth - (6 * 256), 0, layoutCols * 256, layoutRows * 256) + assertArrayEquals(croppedReference.toArray(), croppedOutput.toArray()) + } + + @Test def testWriteMultibandTemporalRDDWithGaps(): Unit = { val layoutCols = 8 val layoutRows = 4 val (layer, imageTile) = LayerFixtures.aSpacetimeTileLayerRdd(layoutCols, layoutRows) - val outDir = Paths.get("tmp/geotiffGaps/") - new Directory(outDir.toFile).deleteRecursively() + val outDir = Paths.get("tmp/testWriteMultibandTemporalRDDWithGaps/") + new Directory(outDir.toFile).deepFiles.foreach(_.delete()) Files.createDirectories(outDir) saveRDDTemporal(layer, outDir.toString) @@ -315,6 +362,33 @@ class WriteRDDToGeotiffTest { assertArrayEquals(croppedReference.toArray(), result2.band(0).toArrayTile().crop(2 * 256, 0, layoutCols * 256, layoutRows * 256).toArray()) } + + @Test + def 
testWriteMultibandTemporalRDDWithGapsSeparateAssetPerBand(): Unit = { + val layoutCols = 8 + val layoutRows = 4 + val (layer, imageTile) = LayerFixtures.aSpacetimeTileLayerRdd(layoutCols, layoutRows) + + val outDir = Paths.get("tmp/testWriteMultibandTemporalRDDWithGapsSeparateAssetPerBand/") + new Directory(outDir.toFile).deepFiles.foreach(_.delete()) + Files.createDirectories(outDir) + + val options = new GTiffOptions() + options.separateAssetPerBand = true + options.addBandTag(0, "DESCRIPTION", "B01") + options.addBandTag(1, "DESCRIPTION", "B02") + options.addBandTag(2, "DESCRIPTION", "B03") + saveRDDTemporal(layer, outDir.toString, formatOptions = options) + + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-02Z_B01.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-02Z_B02.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-02Z_B03.tif").toString).raster.tile + + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-03Z_B01.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-03Z_B02.tif").toString).raster.tile + GeoTiff.readMultiband(outDir.resolve("openEO_2017-01-03Z_B03.tif").toString).raster.tile + } + @Test def testWriteMultibandTemporalHourlyRDDWithGaps(): Unit = { val layoutCols = 8 diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/BandCompositeRasterSourceTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/BandCompositeRasterSourceTest.scala index 273ab79d9..2091c6aff 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/BandCompositeRasterSourceTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/BandCompositeRasterSourceTest.scala @@ -2,18 +2,47 @@ package org.openeo.geotrellis.layers import cats.data.NonEmptyList import geotrellis.proj4.LatLng +import geotrellis.raster.{GridBounds, MultibandTile, Raster} +import geotrellis.raster.gdal.{GDALRasterSource, GDALWarpOptions} import geotrellis.raster.geotiff.GeoTiffRasterSource +import geotrellis.raster.testkit.RasterMatchers import geotrellis.vector.{Extent, ProjectedExtent} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Test -class BandCompositeRasterSourceTest { +class BandCompositeRasterSourceTest extends RasterMatchers { // extent: 125.8323451450973920, -26.4635378273783921, // 128.0585356212979775, -24.4605616369025221 private val singleBandGeotiffPath = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/cgls_ndvi300.tif").getPath + @Test + def readManyBounds(): Unit = { + + val warpOptions = GDALWarpOptions(alignTargetPixels = true) + val rs = GDALRasterSource("/vsicurl/https://artifactory.vgt.vito.be/artifactory/testdata-public/eodata/Sentinel-2/MSI/L1C/2021/01/01/S2B_MSIL1C_20210101T184759_N0209_R070_T11TNM_20210101T202401/S2B_MSIL1C_20210101T184759_N0209_R070_T11TNM_20210101T202401.SAFE/GRANULE/L1C_T11TNM_A019973_20210101T184756/IMG_DATA/T11TNM_20210101T184759_B02.jp2",warpOptions) + + val rasterSources = NonEmptyList.of( + rs + ) + + def compareForBounds(bounds: Seq[GridBounds[Long]] ) = { + val composite = new BandCompositeRasterSource(rasterSources, crs = rs.crs, readFullTile = true, parallelRead = false) + val result = composite.readBounds(bounds) + + val refComposite = new BandCompositeRasterSource(rasterSources, crs = rs.crs, readFullTile = false, parallelRead = false) + val ref = refComposite.readBounds(bounds) + + 
result.zip(ref).foreach{case (a,b) => assertRastersEqual(a,b)} + + } + + compareForBounds(Seq(GridBounds(-20, 10, 200, 400), GridBounds(200, 10, 400, 400))) + compareForBounds(Seq(GridBounds(20, 10, 200, 400), GridBounds(400, 300, 500, 400))) + + } + @Test def singleBandGeoTiffRasterSource(): Unit = { val bbox = ProjectedExtent(Extent(126.0, -26.0, 127.0, -25.0), LatLng) diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index f42345f85..c0b53ff45 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala @@ -3,6 +3,7 @@ package org.openeo.geotrellis.layers import cats.data.NonEmptyList import geotrellis.layer.{FloatingLayoutScheme, LayoutTileSource, SpaceTimeKey, SpatialKey, TileLayerMetadata} import geotrellis.proj4.{CRS, LatLng} +import geotrellis.raster.gdal.{GDALIOException, GDALRasterSource} import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.summary.polygonal.Summary import geotrellis.raster.summary.polygonal.visitors.MeanVisitor @@ -16,7 +17,7 @@ import geotrellis.vector._ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.junit.jupiter.api.Assertions.{assertEquals, assertNotSame, assertSame, assertTrue} -import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, Timeout} +import org.junit.jupiter.api.{AfterAll, BeforeAll, Disabled, Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.openeo.geotrellis.TestImplicits._ @@ -1340,4 +1341,17 @@ class FileLayerProviderTest extends RasterMatchers{ assertEquals(Some((Link(URI.create("NETCDF:http://openeo.vito.be/job-xxx/results/result.nc:dry_matter_productivity"),Some("DMP")),0)),httpResult) } + + @Disabled("temporarily disabled: lowering geotrellis.raster.gdal.number-of-attempts does not work") + @Test + def readGDALRasterSourceFromCorruptTileThrows(): Unit = { + val rs = GDALRasterSource("https://artifactory.vgt.vito.be/artifactory/testdata-public/T29UMV_20180327T114351_B04_10m.jp2") + + try { + rs.read() + fail(s"should have thrown a GDALIOException (geotrellis.raster.gdal.numberOfAttempts is ${geotrellis.raster.gdal.numberOfAttempts})") + } catch { + case _: GDALIOException => // OK + } + } } diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala index a27267459..e69a9e930 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala @@ -5,6 +5,7 @@ import geotrellis.layer.{FloatingLayoutScheme, Metadata, SpaceTimeKey, SpatialKe import geotrellis.proj4.{CRS, LatLng, WebMercator} import geotrellis.raster.geotiff.GeoTiffRasterSource import geotrellis.raster.io.geotiff.{GeoTiff, GeoTiffReader, MultibandGeoTiff} +import geotrellis.raster.resample.Average import geotrellis.raster.summary.polygonal.visitors.MeanVisitor import geotrellis.raster.summary.polygonal.{PolygonalSummaryResult, Summary} import geotrellis.raster.summary.types.MeanValue @@ -12,6 +13,7 @@ import geotrellis.raster.testkit.RasterMatchers import 
geotrellis.raster.{CellSize, MultibandTile, NODATA, PaddedTile, ShortUserDefinedNoDataCellType} import geotrellis.shapefile.ShapeFileReader import geotrellis.spark._ +import geotrellis.spark.partition.SpacePartitioner import geotrellis.spark.summary.polygonal._ import geotrellis.spark.util.SparkUtils import geotrellis.vector._ @@ -27,10 +29,12 @@ import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.{AfterClass, BeforeClass} import org.openeo.geotrellis.TestImplicits._ import org.openeo.geotrellis.geotiff.{GTiffOptions, saveRDD} -import org.openeo.geotrellis.{LayerFixtures, MergeCubesSpec, OpenEOProcessScriptBuilder, OpenEOProcesses} -import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, DataCubeParameters, ResampledTile} +import org.openeo.geotrellis.netcdf.{NetCDFOptions, NetCDFRDDWriter} +import org.openeo.geotrellis.{LayerFixtures, MergeCubesSpec, OpenEOProcessScriptBuilder, OpenEOProcesses, ProjectedPolygons, TestOpenEOProcessScriptBuilder} +import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ConfigurableSpaceTimePartitioner, DataCubeParameters, ResampledTile} import org.openeo.opensearch.OpenSearchResponses.Link import org.openeo.opensearch.{OpenSearchClient, OpenSearchResponses} +import org.openeo.sparklisteners.GetInfoSparkListener import java.net.URI import java.time.LocalTime.MIDNIGHT @@ -109,6 +113,16 @@ object Sentinel2FileLayerProviderTest { arguments(Map("method"->"mask_scl_dilation","erosion_kernel_size"->3,"kernel1_size"->0).asJava.asInstanceOf[util.Map[String,Object]],"https://artifactory.vgt.vito.be/artifactory/testdata-public/masked_erosion.tif") )) + def datacubeParams: Stream[Arguments] = Arrays.stream(Array( + arguments(new DataCubeParameters(),8.asInstanceOf[Integer]), + arguments({ + val p = new DataCubeParameters() + p.resampleMethod = Average + p.loadPerProduct = true + p + },9.asInstanceOf[Integer] + ) + )) } @@ -260,8 +274,9 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers { m } - @Test - def multibandWithSpacetimeMask(): Unit = { + @ParameterizedTest + @MethodSource(Array("datacubeParams")) + def multibandWithSpacetimeMask(parameters: DataCubeParameters, expectedNBStages: Int): Unit = { val date = ZonedDateTime.of(LocalDate.of(2020, 4, 5), MIDNIGHT, UTC) val bbox = ProjectedExtent(Extent(1.90283, 50.9579, 1.97116, 51.0034), LatLng) @@ -282,11 +297,16 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers { var layer = tocLayerProvider.readMultibandTileLayer(from = date, to = date, bbox, Array(MultiPolygon(bbox.extent.toPolygon())),bbox.crs, sc = sc,zoom = 14,datacubeParams = Option.empty) val originalCount = layer.count() - val parameters = new DataCubeParameters() parameters.maskingCube = Some(mask) - layer = tocLayerProvider.readMultibandTileLayer(from = date, to = date, bbox, Array(MultiPolygon(bbox.extent.toPolygon())),bbox.crs, sc = sc,zoom = 14,datacubeParams = Some(parameters)) + val listener = new GetInfoSparkListener() + SparkContext.getOrCreate().addSparkListener(listener) + + layer = tocLayerProvider.readMultibandTileLayer(from = date, to = date, bbox, Array(MultiPolygon(bbox.extent.toPolygon())),bbox.crs, sc = sc,zoom = 14,datacubeParams = Some(parameters)) + assertTrue(layer.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[ConfigurableSpaceTimePartitioner]) val maskedCount = layer.count() + SparkContext.getOrCreate().removeSparkListener(listener) + assertEquals(expectedNBStages,listener.getStagesCompleted) val spatialLayer = 
p.rasterMask(layer,mask,Double.NaN) .toSpatial(date) .cache() @@ -299,9 +319,13 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers { val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif") val refTiff = GeoTiff.readMultiband(refFile.getPath) - val mse = MergeCubesSpec.simpleMeanSquaredError(resultTiff.tile.band(0), refTiff.tile.band(0)) - println("MSE = " + mse) - assertTrue(mse < 0.1) + + withGeoTiffClue(resultTiff.raster, refTiff.raster, refTiff.crs) { + //TODO: lower this threshold from its current, very high value; it is only this high because of a + // nearest-neighbour resampling issue that we are temporarily stuck with + assertRastersEqual(refTiff.raster, resultTiff.raster, 601) + } + } diff --git a/openeo-logging/pom.xml b/openeo-logging/pom.xml index c65a52b90..38f8aed9c 100644 --- a/openeo-logging/pom.xml +++ b/openeo-logging/pom.xml @@ -5,7 +5,7 @@ openeo-geotrellis-extensions org.openeo - 2.4.0_2.12-SNAPSHOT + ${revision} 4.0.0 diff --git a/pom.xml b/pom.xml index 90804d68e..e8d95bb52 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.openeo openeo-geotrellis-extensions - 2.4.0_2.12-SNAPSHOT + ${revision} pom openeo-geotrellis-extensions @@ -17,10 +17,11 @@ 3.8.0 1.10 0.17.0_2.12-SNAPSHOT - 1.2.0_2.12-SNAPSHOT + 1.4.0_2.12-SNAPSHOT 2.21.26 2.3.0 UTF-8 + <revision>2.4.0_2.12-SNAPSHOT</revision> @@ -60,6 +61,7 @@ geotrellis-common openeo-logging geopyspark-geotrellis + geotrellis-integrationtests @@ -151,6 +153,31 @@ + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>flatten-maven-plugin</artifactId> + <version>1.5.0</version> + <configuration> + <updatePomFile>true</updatePomFile> + <flattenMode>resolveCiFriendliesOnly</flattenMode> + </configuration> + <executions> + <execution> + <id>flatten</id> + <phase>process-resources</phase> + <goals> + <goal>flatten</goal> + </goals> + </execution> + <execution> + <id>flatten.clean</id> + <phase>clean</phase> + <goals> + <goal>clean</goal> + </goals> + </execution> + </executions> + </plugin>
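
Note on the retry changes in BandCompositeRasterSource: the fixed 10-second retryForever is replaced by a Failsafe RetryPolicy with exponential backoff (1 s doubling up to 16 s, at most GDALREAD_MAXRETRIES attempts, now defaulting to 20), and Errors are deliberately not retried. A minimal standalone sketch of the same pattern follows; the policy calls mirror the patch, while the usage at the end is hypothetical:

    import java.time.temporal.ChronoUnit.SECONDS
    import java.util
    import net.jodah.failsafe.{Failsafe, RetryPolicy}
    import net.jodah.failsafe.event.ExecutionAttemptedEvent

    def retryWithBackoff[R](maxAttempts: Int = 20, onAttemptFailed: Exception => Unit = _ => ())(f: => R): R = {
      val retryPolicy = new RetryPolicy[R]
        .handle(classOf[Exception])  // only retry Exceptions, never Errors
        .withMaxAttempts(maxAttempts)
        .withBackoff(1, 16, SECONDS) // 1s, 2s, 4s, ... capped at 16s between attempts
        .onFailedAttempt((attempt: ExecutionAttemptedEvent[R]) =>
          onAttemptFailed(attempt.getLastFailure.asInstanceOf[Exception]))

      Failsafe.`with`(util.Collections.singletonList(retryPolicy)).get(f _)
    }

    // hypothetical usage: a read that may fail transiently (e.g. a flaky network or GDAL source)
    val bytes = retryWithBackoff(maxAttempts = 5, e => println(s"read failed: ${e.getMessage}")) {
      java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/tmp/example.bin"))
    }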
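
Note on readFullTile in readBounds: reading the union of all requested chunks once and cropping in memory only pays off when the chunks cover more than half of that union, and the coverage ratio must be computed in floating point; with plain Long arithmetic the division truncates to 0 below full coverage, which is why the patched line uses toDouble. An illustrative check, assuming two adjacent 100x100 chunks:

    import geotrellis.raster.GridBounds

    val bounds = Seq(GridBounds(0L, 0L, 99L, 99L), GridBounds(100L, 0L, 199L, 99L))
    val union = bounds.reduce(_ combine _) // GridBounds(0, 0, 199, 99), size 20000

    val truncated = bounds.take(1).map(_.size).sum / union.size          // Long division: 10000 / 20000 = 0
    val halfRatio = bounds.take(1).map(_.size).sum.toDouble / union.size // 0.5, as intended
    val fullRatio = bounds.map(_.size).sum.toDouble / union.size         // 1.0, full coverage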
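
Note on the PaddedTile hunk: the debug notes condensed above describe the invariant PaddedTile enforces, namely chunk.cols + colOffset <= cols and chunk.rows + rowOffset <= rows. A small sketch with made-up dimensions:

    import geotrellis.raster.{IntArrayTile, PaddedTile}

    val chunk = IntArrayTile.fill(1, 48, 59)      // a partial read: 48x59 cells
    val padded = PaddedTile(chunk, 16, 0, 64, 64) // placed 16 columns in; 48 + 16 = 64, so it fits
    // cells outside the chunk read as NODATA; a 64-wide chunk at colOffset 16 would violate
    // the require (64 + 16 > 64), which is the failure case the original debug notes recorded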
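
Note on the Percentile changes in TestOpenEOProcesses: commons-math's default ("legacy") estimator and estimation type R-7 (linear interpolation, the default in NumPy and R) disagree on small samples, hence pinning EstimationType.R_7 to match the quantiles implementation. A quick illustration:

    import org.apache.commons.math3.stat.descriptive.rank.Percentile
    import org.apache.commons.math3.stat.descriptive.rank.Percentile.EstimationType

    val values = Array(1.0, 2.0, 3.0, 4.0)
    val legacy = new Percentile().evaluate(values, 25) // 1.25: position p(n+1) = 1.25
    val r7 = new Percentile().withEstimationType(EstimationType.R_7).evaluate(values, 25) // 1.75: position (n-1)p + 1 = 1.75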
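
Note on the stage-count assertions: the tests register a GetInfoSparkListener and assert getStagesCompleted (8 stages with default DataCubeParameters, 9 with loadPerProduct and Average resampling). Its implementation is not part of this diff; a hypothetical minimal listener with the same shape:

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class StageCountingListener extends SparkListener {
      private val completed = new AtomicInteger(0)
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        completed.incrementAndGet()
      def stagesCompleted: Int = completed.get() // assumed analogue of getStagesCompleted
    }

    // usage mirrors the tests: sc.addSparkListener(listener), run the job,
    // then assert on stagesCompleted and sc.removeSparkListener(listener)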