From f7c1cdc9e7e2d633ac5872e729066bcd64082429 Mon Sep 17 00:00:00 2001
From: odisseus
Date: Wed, 6 Nov 2024 19:38:20 +0100
Subject: [PATCH] Custom logic for uploading indexes to S3

---
 .../shared_indexes/core/WorkPlan.scala        |   4 +-
 .../shared_indexes/remote/S3Upload.scala      | 149 ++++++++++++++++++
 .../remote/JdkIndexesS3OperationsTest.scala   |  49 +++++++
 3 files changed, 200 insertions(+), 2 deletions(-)
 create mode 100644 src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala
 create mode 100644 src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala

diff --git a/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala b/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala
index baad939..23a196e 100644
--- a/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala
+++ b/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala
@@ -2,10 +2,10 @@ package com.virtuslab.shared_indexes.core
 
 import com.intellij.indexing.shared.cdn.CdnUpdatePlan
 import com.intellij.indexing.shared.cdn.S3
-import com.intellij.indexing.shared.cdn.S3_uploadKt
 import com.intellij.indexing.shared.cdn.S3Kt
 import com.intellij.indexing.shared.local.Local_uploadKt
 import com.virtuslab.shared_indexes.config.MainConfig
+import com.virtuslab.shared_indexes.remote.S3Upload
 
 case class WorkPlan(
   intelliJ: IntelliJ,
@@ -63,7 +63,7 @@ object WorkPlan {
         val s3ApiUrl = indexStorageConfig.indexServerUrl
           .getOrElse(throw new IllegalStateException("Must provide the server address for uploading to S3"))
         val s3 = S3Kt.S3(s"$s3ApiUrl/$bucketName", bucketName, s3ApiUrl, "/", 10)
-        (Some(s3), S3_uploadKt.updateS3Indexes(s3, _))
+        (Some(s3), new S3Upload(s3).updateS3Indexes _)
       case None =>
         // updateLocalIndexes executes the update plan in given basePath, which is the location that
         // the server should host
diff --git a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala
new file mode 100644
index 0000000..fc02af9
--- /dev/null
+++ b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala
@@ -0,0 +1,149 @@
+package com.virtuslab.shared_indexes.remote
+
+import com.intellij.indexing.shared.cdn.upload.{CdnUploadAnotherEntry, CdnUploadDataEntry, CdnUploadEntry}
+import com.intellij.indexing.shared.cdn.{CdnEntry, CdnUpdatePlan, S3, S3CdnEntry}
+import com.intellij.indexing.shared.local.LocalDiskEntry
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.core.sync.RequestBody
+import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest}
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.file.Files
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
+import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try, Using}
+
+/** Applies a [[CdnUpdatePlan]] to an S3 bucket: obsolete objects are removed, then new ones are uploaded.
+  *
+  * @param s3      handle providing the client, the bucket name, the key prefix and the executor to run requests on
+  * @param timeout maximum time to wait for each batch (all removals, then all uploads)
+  */
+class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, "min")) {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  private val rootInBucket: String = s3.getRootInBucket
+  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(s3.getDispatcher.getExecutor)
+
+  /** Runs all removals of the plan, then all uploads; every entry is attempted even if others fail.
+    *
+    * @throws java.lang.RuntimeException listing every entry that could not be removed or uploaded
+    */
+  def updateS3Indexes(plan: CdnUpdatePlan): Unit = {
+    if (plan.getNewEntries.isEmpty && plan.getRemoveEntries.isEmpty) {
+      logger.warn("The update plan is empty, nothing to do!")
+    } else {
+      val failedRemoves = removeFiles(plan.getRemoveEntries.asScala)
+      val failedUploads = uploadFiles(plan.getNewEntries.asScala)
+      val failureSummary = describeFailures("remove", failedRemoves) + describeFailures("upload", failedUploads)
+      if (failureSummary.nonEmpty) {
+        throw new RuntimeException(failureSummary)
+      }
+    }
+  }
+
+  /** Renders one section of the failure report; empty when nothing failed. */
+  private def describeFailures[T](action: String, failures: Map[T, Throwable]): String = {
+    if (failures.isEmpty) ""
+    else {
+      val details = failures.map { case (entry, e) =>
+        // Not every exception carries a message; fall back to its string form rather than printing "null".
+        s"  $entry: \n    ${Option(e.getMessage).getOrElse(e.toString)}\n"
+      }
+      details.mkString(s"Failed to $action ${failures.size} entries: \n", "", "")
+    }
+  }
+
+  private def uploadFiles(files: Iterable[CdnUploadEntry]): Map[CdnUploadEntry, Throwable] = {
+    logger.info("Uploading {} items...", files.size)
+    val results = awaitAll(putObject)(files)
+    collectFailures(files, results)
+  }
+
+  private def removeFiles(files: Iterable[CdnEntry]): Map[CdnEntry, Throwable] = {
+    logger.info("Removing {} items...", files.size)
+    val results = awaitAll(deleteObject)(files)
+    collectFailures(files, results)
+  }
+
+  /** Starts `task` for every input and waits up to `timeout` for all of them to finish.
+    *
+    * @return one result per input, in input order; a task still running at the deadline is reported as a failure
+    */
+  private def awaitAll[T, R](task: T => Future[R])(inputs: Iterable[T]): Iterable[Try[R]] = {
+    // Materialize the inputs so that each task is started exactly once, even for a lazy collection. A task that
+    // throws before returning its future (e.g. a failed `require`) fails only its own input, not the whole batch.
+    val futures = inputs.toVector.map(input => Future.fromTry(Try(task(input))).flatten)
+    val allCompleted = Future.sequence(futures.map(future => future.transform(_ => Success(()))))
+    val timedOut =
+      try {
+        Await.ready(allCompleted, timeout)
+        None
+      } catch {
+        case e: TimeoutException => Some(e)
+      }
+    futures.map { future =>
+      future.value.getOrElse(Failure(timedOut.getOrElse(new TimeoutException(s"Not completed within $timeout"))))
+    }
+  }
+
+  private def collectFailures[T, R](keys: Iterable[T], results: Iterable[Try[R]]): Map[T, Throwable] = {
+    keys.zip(results).collect { case (key, Failure(e)) => key -> e }.toMap
+  }
+
+  private def putObject(entry: CdnUploadEntry): Future[Unit] = {
+    requireRelativeKey(entry)
+    val req = PutObjectRequest.builder()
+      .bucket(s3.getBucket)
+      .key(keyInBucket(entry))
+      .contentType(entry.getContentType)
+      .build()
+    Future {
+      logger.info("S3PUT {}", entry)
+      Using.resource(getInputStream(entry)) { istream =>
+        s3.getClient.putObject(req, RequestBody.fromInputStream(istream, entry.getContentLength))
+      }
+      logger.info("S3PUT end {}", entry)
+    }
+  }
+
+  private def deleteObject(entry: CdnEntry): Future[Unit] = {
+    requireRelativeKey(entry)
+    require(entry.isInstanceOf[S3CdnEntry], s"It is only possible to remove S3 entries, but was: $entry")
+    val req = DeleteObjectRequest.builder()
+      .bucket(s3.getBucket)
+      .key(keyInBucket(entry))
+      .build()
+    Future {
+      logger.info("S3DEL {}", entry)
+      s3.getClient.deleteObject(req)
+    }
+  }
+
+  /** Entry keys are relative to `rootInBucket`; a key that already carries the prefix would get it twice. */
+  private def requireRelativeKey(entry: CdnEntry): Unit = {
+    require(
+      !entry.getKey.startsWith(s"${rootInBucket}/"),
+      s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}"
+    )
+  }
+
+  private def keyInBucket(entry: CdnEntry): String = {
+    if (rootInBucket.isBlank) entry.getKey
+    else s"${rootInBucket}/${entry.getKey}"
+  }
+
+  /** Opens the content of an upload entry; the caller is responsible for closing the stream. */
+  private def getInputStream(entry: CdnUploadEntry): InputStream = entry match {
+    case de: CdnUploadDataEntry =>
+      new ByteArrayInputStream(de.getContent)
+    case ae: CdnUploadAnotherEntry =>
+      ae.getItem match {
+        case l: LocalDiskEntry => Files.newInputStream(l.getPath)
+        case other =>
+          throw new IllegalArgumentException(s"Cannot read the content of a non-local entry: $other")
+      }
+    case other =>
+      throw new IllegalArgumentException(s"Unsupported upload entry type: $other")
+  }
+
+}
diff --git a/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala
new file mode 100644
index 0000000..76e6791
--- /dev/null
+++ b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala
@@ -0,0 +1,49 @@
+package com.virtuslab.shared_indexes.remote
+
+import com.intellij.indexing.shared.cdn.upload.{CdnUploadDataEntry, CdnUploadEntry}
+import com.intellij.indexing.shared.cdn.{CdnUpdatePlan, S3, S3CdnEntry}
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.funsuite.AnyFunSuite
+import software.amazon.awssdk.core.sync.RequestBody
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.{S3Object, PutObjectRequest, DeleteObjectRequest}
+
+import java.time.Instant
+import scala.jdk.CollectionConverters._
+
+class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory {
+
+  private val s3Client = stub[S3Client]
+  private val s3 = new S3("http://example.s3", "test-bucket", s3Client, "/path/to/indexes", 1)
+
+  test("update") {
+    // Given
+    val toUpload =
+      new CdnUploadDataEntry("shared-index-jdk.metadata.json", "application/json", () => Array.emptyByteArray)
+    val toRemove = new S3CdnEntry(
+      s3,
+      S3Object.builder().key("deleteme").size(56L).lastModified(Instant.parse("2024-11-06T10:15:30Z")).build()
+    )
+    val newEntries = Seq[CdnUploadEntry](toUpload).asJava
+    val removeEntries = Seq(toRemove).asJava
+    val updatePlan = new CdnUpdatePlan(newEntries, removeEntries)
+    val s3Upload = new S3Upload(s3)
+
+    // When
+    s3Upload.updateS3Indexes(updatePlan)
+
+    // Then
+    (s3Client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where { (req: PutObjectRequest, _: RequestBody) =>
+      req.bucket() == "test-bucket" &&
+      req.contentType() == "application/json" &&
+      req.key() == "/path/to/indexes/shared-index-jdk.metadata.json"
+    })
+
+    (s3Client.deleteObject(_: DeleteObjectRequest)).verify(where { req: DeleteObjectRequest =>
+      req.bucket() == "test-bucket" &&
+      req.key() == "/path/to/indexes/deleteme"
+    })
+
+  }
+
+}