Skip to content

Commit 40db62a

Browse files
authored
Fix S3 deletion for buckets/endpoints other than what’s configured for upload (#9047)
We can have multiple managed S3 stores. Only one is selected for upload, but dataset deletion should work for all of them. This PR refactors the ManagedS3Service a bit and selects the correct client, with the matching endpoint/credential, for each path.

### URL of deployed dev instance (used for testing):
- https://___.webknossos.xyz

### Steps to test:
(I already tested locally, no need to re-test)
- In application.conf, insert multiple dataVaults.credentials for S3 endpoints. Set s3Upload.credentialName to the first of them
- Upload a small dataset
- Set s3Upload.credentialName to another of them
- Make sure both datasets can be loaded
- Delete both datasets from their respective settings views.
- Backend logging should show they were deleted from their respective S3 buckets

### Issues:
- follow-up for #8924
- fixes #9043

------
- [x] Added changelog entry (create a `$PR_NUMBER.md` file in `unreleased_changes` or use `./tools/create-changelog-entry.py`)
- [x] Removed dev-only changes like prints and application.conf edits
- [x] Considered [common edge cases](../blob/master/.github/common_edge_cases.md)
- [x] Needs datastore update after deployment
1 parent bb89fb8 commit 40db62a

File tree

7 files changed

+77
-76
lines changed

7 files changed

+77
-76
lines changed

unreleased_changes/9047.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
### Fixed
2+
- Fixed dataset deletion for data stored on managed S3 buckets different from the one currently selected for dataset upload.

webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ import org.apache.commons.lang3.builder.HashCodeBuilder
1414
import play.api.libs.ws.WSClient
1515
import software.amazon.awssdk.auth.credentials.{
1616
AnonymousCredentialsProvider,
17-
AwsBasicCredentials,
1817
AwsCredentialsProvider,
1918
EnvironmentVariableCredentialsProvider,
20-
StaticCredentialsProvider
2119
}
2220
import software.amazon.awssdk.awscore.util.AwsHostNameUtils
2321
import software.amazon.awssdk.core.ResponseBytes
@@ -193,12 +191,7 @@ object S3DataVault {
193191

194192
private def getCredentialsProvider(credentialOpt: Option[S3AccessKeyCredential]): AwsCredentialsProvider =
195193
credentialOpt match {
196-
case Some(s3AccessKeyCredential: S3AccessKeyCredential) =>
197-
StaticCredentialsProvider.create(
198-
AwsBasicCredentials.builder
199-
.accessKeyId(s3AccessKeyCredential.accessKeyId)
200-
.secretAccessKey(s3AccessKeyCredential.secretAccessKey)
201-
.build())
194+
case Some(s3AccessKeyCredential: S3AccessKeyCredential) => s3AccessKeyCredential.toCredentialsProvider
202195
case None if sys.env.contains("AWS_ACCESS_KEY_ID") || sys.env.contains("AWS_ACCESS_KEY") =>
203196
EnvironmentVariableCredentialsProvider.create()
204197
case None =>

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSUsedStorageService.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ object PathStorageUsageResponse {
3131

3232
case class PathPair(original: String, upath: UPath)
3333

34-
class DSUsedStorageService @Inject()(config: DataStoreConfig, dataVaultService: DataVaultService)
34+
class DSUsedStorageService @Inject()(config: DataStoreConfig,
35+
dataVaultService: DataVaultService,
36+
managedS3Service: ManagedS3Service)
3537
extends FoxImplicits
3638
with LazyLogging {
3739

@@ -55,14 +57,8 @@ class DSUsedStorageService @Inject()(config: DataStoreConfig, dataVaultService:
5557
pathPair
5658
})
5759
// Check to only measure remote paths that are part of a vault that is configured.
58-
(pathPairsToMeasure, _absoluteUpathsToSkip) = pathPairsWithAbsoluteUpath.partition(
59-
path =>
60-
path.upath.isLocal || config.Datastore.DataVaults.credentials.exists(
61-
vaultCredentialConfig =>
62-
UPath
63-
.fromString(vaultCredentialConfig.getString("name"))
64-
.map(registeredPath => path.upath.startsWith(registeredPath))
65-
.getOrElse(false)))
60+
(pathPairsToMeasure, _absoluteUpathsToSkip) = pathPairsWithAbsoluteUpath.partition(path =>
61+
path.upath.isLocal || managedS3Service.pathIsInManagedS3(path.upath))
6662
vaultPathsForPathPairsToMeasure <- Fox.serialCombined(pathPairsToMeasure)(pathPair =>
6763
dataVaultService.vaultPathFor(pathPair.upath))
6864
usedBytes <- Fox.fromFuture(

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ManagedS3Service.scala

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package com.scalableminds.webknossos.datastore.services
22

3+
import com.scalableminds.util.time.Instant
34
import com.scalableminds.util.tools.{Box, Fox, FoxImplicits}
45
import com.scalableminds.util.tools.Box.tryo
56
import com.scalableminds.webknossos.datastore.DataStoreConfig
67
import com.scalableminds.webknossos.datastore.helpers.{PathSchemes, S3UriUtils, UPath}
7-
import com.scalableminds.webknossos.datastore.storage.{CredentialConfigReader, S3AccessKeyCredential}
8+
import com.scalableminds.webknossos.datastore.storage.{
9+
CredentialConfigReader,
10+
DataVaultCredential,
11+
S3AccessKeyCredential
12+
}
813
import com.typesafe.scalalogging.LazyLogging
9-
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
1014
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation
1115
import software.amazon.awssdk.regions.Region
1216
import software.amazon.awssdk.services.s3.S3AsyncClient
@@ -25,24 +29,44 @@ import scala.concurrent.ExecutionContext
2529
import scala.jdk.CollectionConverters._
2630
import scala.jdk.FutureConverters._
2731

28-
class ManagedS3Service @Inject()(dataStoreConfig: DataStoreConfig) extends FoxImplicits with LazyLogging {
32+
class ManagedS3Service @Inject()(config: DataStoreConfig) extends FoxImplicits with LazyLogging {
33+
34+
def pathIsInManagedS3(path: UPath): Boolean =
35+
path.getScheme.contains(PathSchemes.schemeS3) && globalCredentials.exists(c =>
36+
UPath.fromString(c.name).map(path.startsWith).getOrElse(false))
37+
38+
def findGlobalCredentialFor(pathOpt: Option[UPath]): Option[S3AccessKeyCredential] =
39+
pathOpt.flatMap(findGlobalCredentialFor)
40+
41+
private def findGlobalCredentialFor(path: UPath): Option[S3AccessKeyCredential] =
42+
globalCredentials.collectFirst {
43+
case credential: S3AccessKeyCredential if path.toString.startsWith(credential.name) => credential
44+
}
2945

30-
private lazy val s3UploadCredentialsOpt: Option[(String, String)] =
31-
dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
46+
private lazy val globalCredentials: Seq[DataVaultCredential] = {
47+
val res = config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
3248
new CredentialConfigReader(credentialConfig).getCredential
33-
}.collectFirst {
34-
case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _)
35-
if dataStoreConfig.Datastore.S3Upload.credentialName == credentialName =>
36-
(accessKeyId, secretAccessKey)
49+
}
50+
logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.")
51+
res
52+
}
53+
54+
private lazy val s3UploadCredentialOpt: Option[S3AccessKeyCredential] =
55+
globalCredentials.collectFirst {
56+
case credential: S3AccessKeyCredential if config.Datastore.S3Upload.credentialName == credential.name =>
57+
credential
3758
}
3859

3960
lazy val s3UploadBucketOpt: Option[String] =
4061
// by convention, the credentialName is the S3 URI so we can extract the bucket from it.
41-
S3UriUtils.hostBucketFromUri(new URI(dataStoreConfig.Datastore.S3Upload.credentialName))
62+
S3UriUtils.hostBucketFromUri(new URI(config.Datastore.S3Upload.credentialName))
4263

43-
private lazy val s3UploadEndpoint: URI = {
44-
// by convention, the credentialName is the S3 URI so we can extract the bucket from it.
45-
val credentialUri = new URI(dataStoreConfig.Datastore.S3Upload.credentialName)
64+
private lazy val s3UploadEndpoint: URI =
65+
endpointForCredentialName(config.Datastore.S3Upload.credentialName)
66+
67+
private def endpointForCredentialName(credentialName: String) = {
68+
// by convention, the credentialName is the S3 URI so we can extract the endpoint from it.
69+
val credentialUri = new URI(credentialName)
4670
new URI(
4771
"https",
4872
null,
@@ -54,26 +78,26 @@ class ManagedS3Service @Inject()(dataStoreConfig: DataStoreConfig) extends FoxIm
5478
)
5579
}
5680

57-
private lazy val s3ClientBox: Box[S3AsyncClient] = for {
58-
accessKeyId <- Box(s3UploadCredentialsOpt.map(_._1))
59-
secretAccessKey <- Box(s3UploadCredentialsOpt.map(_._2))
60-
client <- tryo(
81+
private lazy val s3UploadClientBox: Box[S3AsyncClient] = for {
82+
s3UploadCredential <- Box(s3UploadCredentialOpt)
83+
client <- buildClient(s3UploadEndpoint, s3UploadCredential)
84+
} yield client
85+
86+
private def buildClient(endpoint: URI, credential: S3AccessKeyCredential): Box[S3AsyncClient] =
87+
tryo(
6188
S3AsyncClient
6289
.builder()
63-
.credentialsProvider(StaticCredentialsProvider.create(
64-
AwsBasicCredentials.builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey).build()
65-
))
90+
.credentialsProvider(credential.toCredentialsProvider)
6691
.crossRegionAccessEnabled(true)
6792
.forcePathStyle(true)
68-
.endpointOverride(s3UploadEndpoint)
93+
.endpointOverride(endpoint)
6994
.region(Region.US_EAST_1)
7095
// Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
7196
.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
7297
.build())
73-
} yield client
7498

75-
lazy val transferManagerBox: Box[S3TransferManager] = for {
76-
client <- s3ClientBox
99+
lazy val s3UploadTransferManagerBox: Box[S3TransferManager] = for {
100+
client <- s3UploadClientBox
77101
} yield S3TransferManager.builder().s3Client(client).build()
78102

79103
def deletePaths(paths: Seq[UPath])(implicit ec: ExecutionContext): Fox[Unit] = {
@@ -89,13 +113,19 @@ class ManagedS3Service @Inject()(dataStoreConfig: DataStoreConfig) extends FoxIm
89113
implicit ec: ExecutionContext): Fox[Unit] =
90114
for {
91115
bucket <- bucketOpt.toFox ?~> "Could not determine S3 bucket from UPath"
92-
s3Client <- s3ClientBox.toFox ?~> "No managed s3 client configured"
116+
firstPath <- paths.headOption.toFox // groupBy of the caller guarantees that this is not called with empty list.
117+
credential <- findGlobalCredentialFor(firstPath).toFox // Convention is that the credentials are per bucket, so we can reuse this for all paths
118+
endpoint = endpointForCredentialName(credential.name)
119+
s3Client <- buildClient(endpoint, credential).toFox
93120
prefixes <- Fox.combined(paths.map(path => S3UriUtils.objectKeyFromUri(path.toRemoteUriUnsafe).toFox))
94121
keys: Seq[String] <- Fox.serialCombined(prefixes)(listKeysAtPrefix(s3Client, bucket, _)).map(_.flatten)
95122
uniqueKeys = keys.distinct
96123
_ = logger.info(s"Deleting ${uniqueKeys.length} objects from managed S3 bucket $bucket...")
124+
before = Instant.now
97125
_ <- Fox.serialCombined(uniqueKeys.grouped(1000).toSeq)(deleteBatch(s3Client, bucket, _)).map(_ => ())
98-
_ = logger.info(s"Successfully deleted ${uniqueKeys.length} objects from managed S3 bucket $bucket.")
126+
_ = Instant.logSince(before,
127+
s"Successfully deleted ${uniqueKeys.length} objects from managed S3 bucket $bucket.",
128+
logger)
99129
} yield ()
100130

101131
private def deleteBatch(s3Client: S3AsyncClient, bucket: String, keys: Seq[String])(
@@ -142,16 +172,4 @@ class ManagedS3Service @Inject()(dataStoreConfig: DataStoreConfig) extends FoxIm
142172
listRecursive(None, Seq())
143173
}
144174

145-
private lazy val globalCredentials = {
146-
val res = dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
147-
new CredentialConfigReader(credentialConfig).getCredential
148-
}
149-
logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.")
150-
res
151-
}
152-
153-
def pathIsInManagedS3(path: UPath): Boolean =
154-
path.getScheme.contains(PathSchemes.schemeS3) && globalCredentials.exists(c =>
155-
UPath.fromString(c.name).map(path.startsWith).getOrElse(false))
156-
157175
}

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
537537
prefix: String
538538
): Fox[Unit] =
539539
for {
540-
transferManager <- managedS3Service.transferManagerBox.toFox ?~> "S3 upload is not properly configured, cannot get S3 client"
540+
transferManager <- managedS3Service.s3UploadTransferManagerBox.toFox ?~> "S3 upload is not properly configured, cannot get S3 client"
541541
directoryUpload = transferManager.uploadDirectory(
542542
UploadDirectoryRequest.builder().bucket(bucketName).s3Prefix(prefix).source(dataDir).build()
543543
)

webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/DataVaultCredentials.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.scalableminds.webknossos.datastore.storage
22

33
import com.scalableminds.util.tools.Fox
44
import play.api.libs.json.{JsValue, Json, OFormat}
5+
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
56

67
import scala.concurrent.ExecutionContext
78

@@ -39,6 +40,10 @@ case class S3AccessKeyCredential(name: String,
3940
organization: Option[String])
4041
extends DataVaultCredential {
4142
override def userId: Option[String] = user
43+
44+
def toCredentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(
45+
AwsBasicCredentials.builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey).build()
46+
)
4247
}
4348

4449
object S3AccessKeyCredential {

webknossos-datastore/app/com/scalableminds/webknossos/datastore/storage/DataVaultService.scala

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import com.typesafe.scalalogging.LazyLogging
1616
import com.scalableminds.webknossos.datastore.dataformats.MagLocator
1717
import com.scalableminds.webknossos.datastore.helpers.{PathSchemes, UPath}
1818
import com.scalableminds.webknossos.datastore.models.datasource.{DataSourceId, LayerAttachment}
19-
import com.scalableminds.webknossos.datastore.services.DSRemoteWebknossosClient
19+
import com.scalableminds.webknossos.datastore.services.{DSRemoteWebknossosClient, ManagedS3Service}
2020
import play.api.libs.ws.WSClient
2121

2222
import java.nio.file.Path
@@ -27,15 +27,16 @@ case class CredentializedUPath(upath: UPath, credential: Option[DataVaultCredent
2727

2828
class DataVaultService @Inject()(ws: WSClient,
2929
config: DataStoreConfig,
30-
remoteWebknossosClient: DSRemoteWebknossosClient)
30+
remoteWebknossosClient: DSRemoteWebknossosClient,
31+
managedS3Service: ManagedS3Service)
3132
extends LazyLogging
3233
with FoxImplicits {
3334

3435
private val vaultCache: AlfuCache[CredentializedUPath, DataVault] =
3536
AlfuCache(maxCapacity = 100)
3637

3738
def vaultPathFor(upath: UPath)(implicit ec: ExecutionContext): Fox[VaultPath] = {
38-
val credentialOpt = findGlobalCredentialFor(Some(upath))
39+
val credentialOpt = managedS3Service.findGlobalCredentialFor(Some(upath))
3940
vaultPathFor(CredentializedUPath(upath, credentialOpt))
4041
}
4142

@@ -109,25 +110,14 @@ class DataVaultService @Inject()(ws: WSClient,
109110
resolveMagPath(magLocator, localDatasetDir, localLayerDir, layerName)
110111
}
111112

112-
private lazy val globalCredentials = {
113-
val res = config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
114-
new CredentialConfigReader(credentialConfig).getCredential
115-
}
116-
logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.")
117-
res
118-
}
119-
120-
private def findGlobalCredentialFor(pathOpt: Option[UPath]): Option[DataVaultCredential] =
121-
pathOpt.flatMap(path => globalCredentials.find(c => path.toString.startsWith(c.name)))
122-
123113
private def credentialFor(magLocator: MagLocator)(implicit ec: ExecutionContext): Fox[DataVaultCredential] =
124114
magLocator.credentialId match {
125115
case Some(credentialId) =>
126116
remoteWebknossosClient.getCredential(credentialId)
127117
case None =>
128118
magLocator.credentials match {
129119
case Some(credential) => Fox.successful(credential)
130-
case None => findGlobalCredentialFor(magLocator.path).toFox
120+
case None => managedS3Service.findGlobalCredentialFor(magLocator.path).toFox
131121
}
132122
}
133123

@@ -136,14 +126,14 @@ class DataVaultService @Inject()(ws: WSClient,
136126
case Some(credentialId) =>
137127
remoteWebknossosClient.getCredential(credentialId)
138128
case None =>
139-
findGlobalCredentialFor(Some(attachment.path)).toFox
129+
managedS3Service.findGlobalCredentialFor(Some(attachment.path)).toFox
140130
}
141131

142132
def pathIsAllowedToAddDirectly(path: UPath): Boolean =
143133
if (path.isLocal)
144134
pathIsDataSourceLocal(path) || pathIsInLocalDirectoryWhitelist(path)
145135
else
146-
!pathMatchesGlobalCredentials(path)
136+
!managedS3Service.pathIsInManagedS3(path)
147137

148138
private def pathIsDataSourceLocal(path: UPath): Boolean =
149139
path.isLocal && {
@@ -152,9 +142,6 @@ class DataVaultService @Inject()(ws: WSClient,
152142
!path.isAbsolute && inWorkingDir.startsWith(workingDir)
153143
}
154144

155-
private def pathMatchesGlobalCredentials(path: UPath): Boolean =
156-
findGlobalCredentialFor(Some(path)).isDefined
157-
158145
private def pathIsInLocalDirectoryWhitelist(path: UPath): Boolean =
159146
path.isLocal &&
160147
config.Datastore.localDirectoryWhitelist.exists(whitelistEntry => path.toString.startsWith(whitelistEntry))

0 commit comments

Comments
 (0)