Skip to content

Commit 221b1c9

Browse files
committed
load_stac for netcdf: improved partitioning
#270
1 parent d302886 commit 221b1c9

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import geotrellis.raster.{CellSize, MultibandTile, Raster, RasterExtent, Tile, T
77
import geotrellis.raster.gdal.{DefaultDomain, GDALException, GDALRasterSource, MalformedProjectionException}
88
import geotrellis.spark.{ContextRDD, MultibandTileLayerRDD, withTilerMethods}
99
import geotrellis.spark._
10+
import geotrellis.spark.partition.SpacePartitioner
1011
import geotrellis.vector.{Extent, ProjectedExtent}
11-
import org.apache.spark.SparkContext
12+
import org.apache.spark.{Partitioner, SparkContext}
1213
import org.apache.spark.rdd.RDD
1314
import org.openeo.geotrellis.ProjectedPolygons
14-
import org.openeo.geotrelliscommon.DataCubeParameters
15+
import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, DataCubeParameters}
1516
import org.openeo.opensearch.OpenSearchClient
1617

1718
import java.time.{LocalDate, ZoneId, ZonedDateTime}
@@ -45,7 +46,7 @@ object NetCDFCollection {
4546
val bboxWGS84: Extent = items.map(_.bbox).reduce((a, b)=>(a.combine(b)))
4647

4748

48-
val features: RDD[(TemporalProjectedExtent, MultibandTile)] = items.flatMap(f=>{
49+
val features: RDD[(TemporalProjectedExtent, MultibandTile)] = items.repartition(stacItems.length).flatMap(f=>{
4950
val allTiles = f.links.flatMap(l=>{
5051
l.bandNames.get.flatMap(b=> {
5152
var gdalNetCDFLink = s"${l.href.toString.replace("file:", "NETCDF:")}:${b}"
@@ -106,8 +107,10 @@ object NetCDFCollection {
106107
val spatialBounds = KeyBounds(layout.mapTransform(extent))
107108
val temporalBounds = KeyBounds(SpaceTimeKey(spatialBounds.minKey,TemporalKey(LocalDate.of(1990,1,1).atStartOfDay(ZoneId.of("UTC")))),SpaceTimeKey(spatialBounds.maxKey,TemporalKey(LocalDate.now().atStartOfDay(ZoneId.of("UTC")))))
108109

110+
val partitioner: Partitioner = new SpacePartitioner(temporalBounds)(implicitly, implicitly, ByTileSpacetimePartitioner)
111+
109112
val metadata = TileLayerMetadata[SpaceTimeKey](cellType, layout, extent, crs(0), temporalBounds)
110-
val retiled: RDD[(SpaceTimeKey, MultibandTile)] = features.tileToLayout(metadata)
113+
val retiled: RDD[(SpaceTimeKey, MultibandTile)] = features.tileToLayout(metadata).partitionBy(partitioner)
111114
ContextRDD(retiled,metadata)
112115

113116

0 commit comments

Comments
 (0)