Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per executor output v02 #337

Merged
merged 20 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3e67e96
Save output from executors to unique folders. Move the successful res…
EmileSonneveld Oct 17, 2024
fd6235a
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 17, 2024
003736c
Fix output folder for tests.
EmileSonneveld Oct 18, 2024
f916512
Add waitTillPathAvailable when accessing file from driver that was wr…
EmileSonneveld Oct 29, 2024
bf32624
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 29, 2024
7a00557
Use move trick for saveStitchedTileGrid too. https://github.com/Open-…
EmileSonneveld Oct 29, 2024
e81cc7b
Disable sentinelhub test for the moment.
EmileSonneveld Oct 29, 2024
65046dd
Disable sentinelhub test for the moment.
EmileSonneveld Oct 29, 2024
22327e1
React to MR: Cleanup code. Use @TempDir. Prune imports. https://githu…
EmileSonneveld Oct 31, 2024
d847b26
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Oct 31, 2024
dccc838
Fix for when using filepath_per_band. Re-enable shub tests.
EmileSonneveld Oct 31, 2024
41ab3dd
Use more @TempDir. Cleanup.
EmileSonneveld Oct 31, 2024
f199970
Support using S3 API directly for tiff output. https://github.com/Ope…
EmileSonneveld Nov 4, 2024
2dc7b30
Avoid SentinelHubException in CI
EmileSonneveld Nov 4, 2024
63a39b5
More cleanup for MR. Fix removing empty executorAttemptDirectories.
EmileSonneveld Nov 5, 2024
6a9bc4b
Allow override target files again to be sure.
EmileSonneveld Nov 6, 2024
5a710f4
Don't crash when geoTiff.write() does not return a file. https://gith…
EmileSonneveld Nov 7, 2024
5e31712
Fix for TileGridTest.testWriteRDDTileGrid.
EmileSonneveld Nov 7, 2024
a68be18
Merge branch 'develop' into per_executor_output_v02
EmileSonneveld Nov 7, 2024
7a6a8af
Enable test before merge
EmileSonneveld Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package org.openeo.geotrellis.creo

import geotrellis.store.s3.AmazonS3URI
import org.apache.commons.io.FileUtils
import org.openeo.geotrelliss3.S3Utils
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client, S3Configuration}

import java.net.URI
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.time.Duration
import scala.collection.JavaConverters._
import scala.collection.immutable.Iterable
import scala.util.control.Breaks.{break, breakable}

object CreoS3Utils {
private val logger = LoggerFactory.getLogger(getClass)

private val cloudFerroRegion: Region = Region.of("RegionOne")

Expand Down Expand Up @@ -66,8 +76,174 @@ object CreoS3Utils {
overrideConfig
}

def deleteCreoSubFolder(bucket_name: String, subfolder: String) = {
//noinspection ScalaWeakerAccess
def deleteCreoSubFolder(bucket_name: String, subfolder: String): Unit = {
val s3Client = getCreoS3Client()
S3Utils.deleteSubFolder(s3Client, bucket_name, subfolder)
}

def isS3(path: String): Boolean = {
path.toLowerCase.startsWith("s3:/")
}

private def toAmazonS3URI(path: String): AmazonS3URI = {
val correctS3Path = path.replaceFirst("(?i)s3:/(?!/)", "s3://")
new AmazonS3URI(correctS3Path)
}

// In the following functions an asset path could be a local path or an S3 path.

/**
* S3 does not have folders, so we interpret the path as a prefix.
*/
def assetDeleteFolders(paths: Iterable[String]): Unit = {
for (path <- paths) {
if (isS3(path)) {
val s3Uri = toAmazonS3URI(path)
deleteCreoSubFolder(s3Uri.getBucket, s3Uri.getKey)
} else {
val p = Path.of(path)
if (Files.exists(p)) {
if (Files.isDirectory(p)) {
FileUtils.deleteDirectory(p.toFile)
} else {
throw new IllegalArgumentException(f"Can only delete directory here: $path")
}
}
}
}
}

def assetDelete(path: String): Unit = {
if (isS3(path)) {
val s3Uri = toAmazonS3URI(path)
val keys = Seq(path)
val deleteObjectsRequest = DeleteObjectsRequest.builder
.bucket(s3Uri.getBucket)
.delete(Delete.builder.objects(keys.map(key => ObjectIdentifier.builder.key(key).build).asJavaCollection).build)
.build
getCreoS3Client().deleteObjects(deleteObjectsRequest)
} else {
val p = Path.of(path)
if (Files.isDirectory(p)) {
throw new IllegalArgumentException(f"Cannot delete directory like this: $path")
} else {
Files.deleteIfExists(p)
}
}
}

def asseetPathListDirectChildren(path: String): Set[String] = {
if (isS3(path)) {
val s3Uri = toAmazonS3URI(path)
val listObjectsRequest = ListObjectsRequest.builder
.bucket(s3Uri.getBucket)
.prefix(s3Uri.getKey)
.build
val listObjectsResponse = getCreoS3Client().listObjects(listObjectsRequest)
listObjectsResponse.contents.asScala.map(o => f"s3://${s3Uri.getBucket}/${o.key}").toSet
} else {
Files.list(Path.of(path)).toArray.map(_.toString).toSet
}
}

def assetExists(path: String): Boolean = {
if (isS3(path)) {
try {
// https://stackoverflow.com/a/56038360/1448736
val s3Uri = toAmazonS3URI(path)
val objectRequest = HeadObjectRequest.builder
.bucket(s3Uri.getBucket)
.key(s3Uri.getKey)
.build
getCreoS3Client().headObject(objectRequest)
true
} catch {
case _: NoSuchKeyException => false
}
} else {
Files.exists(Path.of(path))
}
}

def copyAsset(pathOrigin: String, pathDestination: String): Unit = {
if (isS3(pathOrigin) && isS3(pathDestination)) {
val s3UriOrigin = toAmazonS3URI(pathOrigin)
val s3UriDestination = toAmazonS3URI(pathDestination)
val copyRequest = CopyObjectRequest.builder
.sourceBucket(s3UriOrigin.getBucket)
.sourceKey(s3UriOrigin.getKey)
.destinationBucket(s3UriDestination.getBucket)
.destinationKey(s3UriDestination.getKey)
.build
getCreoS3Client().copyObject(copyRequest)
} else if (!isS3(pathOrigin) && !isS3(pathDestination)) {
Files.copy(Path.of(pathOrigin), Path.of(pathDestination))
} else if (!isS3(pathOrigin) && isS3(pathDestination)) {
uploadToS3(Path.of(pathOrigin), pathDestination)
} else if (isS3(pathOrigin) && !isS3(pathDestination)) {
// TODO: Download
throw new IllegalArgumentException(f"S3->local not supported here yet ($pathOrigin, $pathDestination)")
} else {
throw new IllegalArgumentException(f"Should be impossible to get here ($pathOrigin, $pathDestination)")
}
}

def moveAsset(pathOrigin: String, pathDestination: String): Unit = {
// This could be optimized using move when on file system.
copyAsset(pathOrigin, pathDestination)
assetDelete(pathOrigin)
}

def waitTillPathAvailable(path: Path): Unit = {
var retry = 0
val maxTries = 20
while (!assetExists(path.toString)) {
if (retry < maxTries) {
retry += 1
val seconds = 5
logger.info(f"Waiting for path to be available. Try $retry/$maxTries (sleep:$seconds seconds): $path")
Thread.sleep(seconds * 1000)
} else {
logger.warn(f"Path is not available after $maxTries tries: $path")
// Throw error instead?
return
}
}
}

def moveOverwriteWithRetries(oldPath: String, newPath: String): Unit = {
var try_count = 1
breakable {
while (true) {
try {
if (assetExists(newPath)) {
// It might be a partial result of a previous failing task.
logger.info(f"Will replace $newPath. (try $try_count)")
assetDelete(newPath)
}
moveAsset(oldPath, newPath)
break
} catch {
case e: Exception =>
// Here if another executor wrote the file between the delete and the move statement.
try_count += 1
if (try_count > 5) {
throw e
}
}
}
}
}

def uploadToS3(localFile: Path, s3Path: String) = {
val s3Uri = toAmazonS3URI(s3Path)
val objectRequest = PutObjectRequest.builder
.bucket(s3Uri.getBucket)
.key(s3Uri.getKey)
.build

getCreoS3Client().putObject(objectRequest, RequestBody.fromFile(localFile))
s3Path
}
}
Loading