@@ -20,6 +20,7 @@ import com.scalableminds.webknossos.datastore.helpers.{DatasetDeleter, Directory
2020import com .scalableminds .webknossos .datastore .models .UnfinishedUpload
2121import com .scalableminds .webknossos .datastore .models .datasource .GenericDataSource .FILENAME_DATASOURCE_PROPERTIES_JSON
2222import com .scalableminds .webknossos .datastore .models .datasource ._
23+ import com .scalableminds .webknossos .datastore .models .datasource .inbox .InboxDataSource
2324import com .scalableminds .webknossos .datastore .services .{DSRemoteWebknossosClient , DataSourceService }
2425import com .scalableminds .webknossos .datastore .storage .{
2526 CredentialConfigReader ,
@@ -401,7 +402,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
401402 _ = logger.info(
402403 s " Starting upload of dataset ${dataSourceId.organizationId}/ ${dataSourceId.directoryName} to S3. " )
403404 s3ObjectKey = s " ${dataStoreConfig.Datastore .S3Upload .objectKeyPrefix}/ $uploadId/ "
404- _ <- uploadDirectoryToS3(unpackToDir, dataStoreConfig.Datastore .S3Upload .bucketName, s3ObjectKey)
405+ _ <- uploadDirectoryToS3(unpackToDir, dataSource, dataStoreConfig.Datastore .S3Upload .bucketName, s3ObjectKey)
405406 _ = logger.info(
406407 s " Finished upload of dataset ${dataSourceId.organizationId}/ ${dataSourceId.directoryName} to S3. " )
407408 endPointHost = new URI (dataStoreConfig.Datastore .S3Upload .endpoint).getHost
@@ -524,11 +525,23 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
524525
525526 private def uploadDirectoryToS3 (
526527 dataDir : Path ,
528+ dataSource : InboxDataSource ,
527529 bucketName : String ,
528530 prefix : String
529531 ): Fox [Unit ] =
530532 for {
531533 _ <- Fox .successful(())
534+ // Delete all files in the dataDir that are not at a mag path or an attachment path, since we do not need to upload them to S3.
535+ filesToDelete <- getNonReferencedFiles(dataDir, dataSource)
536+ _ = filesToDelete.foreach(file => {
537+ logger.info(s " Deleting file $file before upload to S3. " )
538+ try {
539+ Files .deleteIfExists(file)
540+ } catch {
541+ case e : Exception =>
542+ logger.warn(s " Could not delete file $file before upload to S3: ${e.getMessage}" )
543+ }
544+ })
532545 directoryUpload = transferManager.uploadDirectory(
533546 UploadDirectoryRequest .builder().bucket(bucketName).s3Prefix(prefix).source(dataDir).build()
534547 )
@@ -538,6 +551,26 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
538551 s " Some files failed to upload to S3: $failedTransfers"
539552 } yield ()
540553
/**
  * Lists the files below `dataDir` that are not referenced by `dataSource`.
  *
  * A file counts as referenced if it lies at or below one of these paths (all resolved against `dataDir`):
  *  - for every mag without an explicit path: the default location `<layerName>/<magLiteral>`
  *  - every entry of a layer's `allExplicitPaths`
  *
  * Both the referenced paths and the listed files are normalized before the prefix comparison, because
  * `Path.startsWith` compares name elements literally: without normalization, a reference such as
  * `./color/1` would never match a listed file and the referenced data would be reported as deletable.
  * The returned paths are the ones produced by the directory listing (not their normalized form).
  *
  * NOTE(review): every file outside the referenced paths is returned, which includes any metadata file
  * stored directly in `dataDir` — confirm that this is intended by the callers that delete the result.
  *
  * @param dataDir    root directory of the unpacked dataset
  * @param dataSource data source describing the layers; must be convertible via `toUsable`
  * @return the non-referenced files, or a failure ("Data source is not usable") if `dataSource` is not
  *         usable or if listing the directory fails
  */
private def getNonReferencedFiles(dataDir: Path, dataSource: InboxDataSource): Fox[List[Path]] =
  for {
    usableDataSource <- dataSource.toUsable.toFox ?~> "Data source is not usable"
    // Mags that carry no explicit path are expected at <dataDir>/<layerName>/<magLiteral>.
    defaultMagPaths = usableDataSource.dataLayers.flatMap { layer =>
      layer.mags.collect {
        case mag if mag.path.isEmpty =>
          dataDir.resolve(List(layer.name, mag.mag.toMagLiteral(true)).mkString("/")).normalize()
      }
    }
    // Paths explicitly declared by the layers, resolved against dataDir.
    explicitPaths = usableDataSource.dataLayers
      .flatMap(layer => layer.allExplicitPaths)
      .map(explicitPath => dataDir.resolve(explicitPath).normalize())
    neededPaths = explicitPaths.toSet ++ defaultMagPaths
    allFiles <- PathUtils.listFilesRecursive(dataDir, silent = true, maxDepth = 10).toFox
    filesToDelete = allFiles.filterNot { file =>
      // Normalize the candidate as well, so both sides of the prefix check use the same form.
      val normalizedFile = file.normalize()
      neededPaths.exists(neededPath => normalizedFile.startsWith(neededPath))
    }
  } yield filesToDelete
573+
541574 private def cleanUpOnFailure [T ](result : Box [T ],
542575 dataSourceId : DataSourceId ,
543576 datasetNeedsConversion : Boolean ,
0 commit comments