From f7c1cdc9e7e2d633ac5872e729066bcd64082429 Mon Sep 17 00:00:00 2001 From: odisseus Date: Wed, 6 Nov 2024 19:38:20 +0100 Subject: [PATCH 1/3] Custom logic for uploading indexes to S3 --- .../shared_indexes/core/WorkPlan.scala | 4 +- .../shared_indexes/remote/S3Upload.scala | 128 ++++++++++++++++++ .../remote/JdkIndexesS3OperationsTest.scala | 49 +++++++ 3 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala create mode 100644 src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala diff --git a/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala b/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala index baad939..23a196e 100644 --- a/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala +++ b/src/main/scala/com/virtuslab/shared_indexes/core/WorkPlan.scala @@ -2,10 +2,10 @@ package com.virtuslab.shared_indexes.core import com.intellij.indexing.shared.cdn.CdnUpdatePlan import com.intellij.indexing.shared.cdn.S3 -import com.intellij.indexing.shared.cdn.S3_uploadKt import com.intellij.indexing.shared.cdn.S3Kt import com.intellij.indexing.shared.local.Local_uploadKt import com.virtuslab.shared_indexes.config.MainConfig +import com.virtuslab.shared_indexes.remote.S3Upload case class WorkPlan( intelliJ: IntelliJ, @@ -63,7 +63,7 @@ object WorkPlan { val s3ApiUrl = indexStorageConfig.indexServerUrl .getOrElse(throw new IllegalStateException("Must provide the server address for uploading to S3")) val s3 = S3Kt.S3(s"$s3ApiUrl/$bucketName", bucketName, s3ApiUrl, "/", 10) - (Some(s3), S3_uploadKt.updateS3Indexes(s3, _)) + (Some(s3), new S3Upload(s3).updateS3Indexes _) case None => // updateLocalIndexes executes the update plan in given basePath, which is the location that // the server should host diff --git a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala new file mode 100644 index 0000000..fc02af9 --- /dev/null +++ b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala @@ -0,0 +1,128 @@ +package com.virtuslab.shared_indexes.remote + +import com.intellij.indexing.shared.cdn.upload.{CdnUploadAnotherEntry, CdnUploadDataEntry, CdnUploadEntry} +import com.intellij.indexing.shared.cdn.{CdnEntry, CdnUpdatePlan, S3, S3CdnEntry} +import com.intellij.indexing.shared.local.LocalDiskEntry +import org.slf4j.LoggerFactory +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest} + +import java.io.{ByteArrayInputStream, InputStream} +import java.nio.file.Files +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException} +import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try, Using} + +class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, "min")) { + private val logger = LoggerFactory.getLogger(this.getClass) + private val rootInBucket: String = s3.getRootInBucket + private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(s3.getDispatcher.getExecutor) + + def updateS3Indexes(plan: CdnUpdatePlan): Unit = { + if (plan.getNewEntries.isEmpty && plan.getRemoveEntries.isEmpty) { + logger.warn("The update plan is empty, nothing to do!") + return + } + + val failedRemoves = removeFiles(plan.getRemoveEntries.asScala) + val failedUploads = uploadFiles(plan.getNewEntries.asScala) + val failureSummary = new StringBuilder() + if (failedRemoves.nonEmpty) { + failureSummary.append(s"Failed to remove ${failedRemoves.size} entries: \n") + failedRemoves.foreach { case (entry, e) => + failureSummary.append(s" $entry: \n ${e.getMessage}\n") + } + } + if (failedUploads.nonEmpty) { + failureSummary.append(s"Failed to upload ${failedUploads.size} entries: \n") + failedUploads.foreach { case (entry, e) => + failureSummary.append(s" $entry: \n ${e.getMessage}\n") + } + } + if (failureSummary.nonEmpty) { + throw new RuntimeException(failureSummary.toString()) + } + } + + private def uploadFiles(files: Iterable[CdnUploadEntry]): Map[CdnUploadEntry, Throwable] = { + logger.info("Uploading {} items...", files.size) + val results = awaitAll(putObject)(files) + collectFailures(files, results) + } + + private def removeFiles(files: Iterable[CdnEntry]): Map[CdnEntry, Throwable] = { + logger.info("Removing {} items...", files.size) + val results = awaitAll(deleteObject)(files) + collectFailures(files, results) + } + + private def awaitAll[T, R](task: T => Future[R])(inputs: Iterable[T]): Iterable[Try[R]] = { + val futures = inputs.map(task) + val completions = futures.map(_.transform { case _: Try[R] => Success(()) }) + val allCompleted = Future.sequence(completions) + var timeoutException: Option[TimeoutException] = None + try { + Await.ready(allCompleted, timeout) + } catch { + case e: TimeoutException => timeoutException = Some(e) + } + futures.map(_.value.getOrElse(Failure(timeoutException.get))) + } + + private def collectFailures[T, R](keys: Iterable[T], results: Iterable[Try[R]]): Map[T, Throwable] = { + (keys zip results).collect { case (file, Failure(e)) => + (file, e) + }.toMap + } + + private def putObject(entry: CdnUploadEntry): Future[Unit] = { + require( + !entry.getKey.startsWith(s"${rootInBucket}/"), + s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}" + ) + val req = PutObjectRequest.builder() + .bucket(s3.getBucket) + .key(keyInBucket(entry)) + .contentType(entry.getContentType) + .build() + Future { + logger.info("S3PUT {}", entry) + Using.resource(getInputStream(entry)) { istream => + s3.getClient.putObject(req, RequestBody.fromInputStream(istream, entry.getContentLength)) + } + logger.info("S3PUT end {}", entry) + } + } + + private def deleteObject(entry: CdnEntry): Future[Unit] = { + require( + !entry.getKey.startsWith(s"${rootInBucket}/"), + s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}" + ) + require(entry.isInstanceOf[S3CdnEntry], s"It is only possible to remove S3 entries, but was: $entry") + val req = DeleteObjectRequest.builder() + .bucket(s3.getBucket) + .key(keyInBucket(entry)) + .build() + Future { + logger.info("S3DEL {}", entry) + s3.getClient.deleteObject(req) + } + } + + private def keyInBucket(entry: CdnEntry): String = { + if (rootInBucket.isBlank) entry.getKey + else s"${rootInBucket}/${entry.getKey}" + } + + private def getInputStream(entry: CdnUploadEntry): InputStream = entry match { + case de: CdnUploadDataEntry => + new ByteArrayInputStream(de.getContent) + case ae: CdnUploadAnotherEntry => + ae.getItem match { + case l: LocalDiskEntry => Files.newInputStream(l.getPath) + } + } + +} diff --git a/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala new file mode 100644 index 0000000..76e6791 --- /dev/null +++ b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala @@ -0,0 +1,49 @@ +package com.virtuslab.shared_indexes.remote + +import com.intellij.indexing.shared.cdn.upload.{CdnUploadDataEntry, CdnUploadEntry} +import com.intellij.indexing.shared.cdn.{CdnUpdatePlan, S3, S3CdnEntry} +import org.scalamock.scalatest.MockFactory +import org.scalatest.funsuite.AnyFunSuite +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.{S3Object, PutObjectRequest, DeleteObjectRequest} + +import java.time.Instant +import scala.jdk.CollectionConverters._ + +class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory { + + private val s3Client = stub[S3Client] + private val s3 = new S3("http://example.s3", "test-bucket", s3Client, "/path/to/indexes", 1) + + test("update") { + // Given + val toUpload = + new CdnUploadDataEntry("shared-index-jdk.metadata.json", "application/json", () => Array.emptyByteArray) + val toRemove = new S3CdnEntry( + s3, + S3Object.builder().key("deleteme").size(56L).lastModified(Instant.parse("2024-11-06T10:15:30Z")).build() + ) + val newEntries = Seq[CdnUploadEntry](toUpload).asJava + val removeEntries = Seq(toRemove).asJava + val updatePlan = new CdnUpdatePlan(newEntries, removeEntries) + val s3Upload = new S3Upload(s3) + + // When + s3Upload.updateS3Indexes(updatePlan) + + // Then + (s3Client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where { (req: PutObjectRequest, _: RequestBody) => + req.bucket() == "test-bucket" && + req.contentType() == "application/json" && + req.key() == "/path/to/indexes/shared-index-jdk.metadata.json" + }) + + (s3Client.deleteObject(_: DeleteObjectRequest)).verify(where { req: DeleteObjectRequest => + req.bucket() == "test-bucket" && + req.key() == "/path/to/indexes/deleteme" + }) + + } + +} From accd91681d0ecbb527c97d1a29d509d6f1a26a7c Mon Sep 17 00:00:00 2001 From: odisseus Date: Thu, 7 Nov 2024 14:17:42 +0100 Subject: [PATCH 2/3] log debug --- .../scala/com/virtuslab/shared_indexes/remote/S3Upload.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala index fc02af9..5bd1c8b 100644 --- a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala +++ b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala @@ -91,7 +91,7 @@ class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, " Using.resource(getInputStream(entry)) { istream => s3.getClient.putObject(req, RequestBody.fromInputStream(istream, entry.getContentLength)) } - logger.info("S3PUT end {}", entry) + logger.debug("S3PUT end {}", entry) } } @@ -108,6 +108,7 @@ class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, " Future { logger.info("S3DEL {}", entry) s3.getClient.deleteObject(req) + logger.debug("S3DEL end {}", entry) } } From 5b678878d5e36350e601578fb0c9e6f7e3ee9a9f Mon Sep 17 00:00:00 2001 From: odisseus Date: Fri, 8 Nov 2024 15:36:46 +0100 Subject: [PATCH 3/3] test --- .../shared_indexes/remote/S3Upload.scala | 8 ++-- .../remote/JdkIndexesS3OperationsTest.scala | 43 ++++++++++++++++++- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala index 5bd1c8b..68e28ec 100644 --- a/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala +++ b/src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala @@ -29,15 +29,15 @@ class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, " val failedUploads = uploadFiles(plan.getNewEntries.asScala) val failureSummary = new StringBuilder() if (failedRemoves.nonEmpty) { - failureSummary.append(s"Failed to remove ${failedRemoves.size} entries: \n") + failureSummary.append(s"Failed to remove ${failedRemoves.size} entries:\n") failedRemoves.foreach { case (entry, e) => - failureSummary.append(s" $entry: \n ${e.getMessage}\n") + failureSummary.append(s" $entry:\n ${e.getMessage}\n") } } if (failedUploads.nonEmpty) { - failureSummary.append(s"Failed to upload ${failedUploads.size} entries: \n") + failureSummary.append(s"Failed to upload ${failedUploads.size} entries:\n") failedUploads.foreach { case (entry, e) => - failureSummary.append(s" $entry: \n ${e.getMessage}\n") + failureSummary.append(s" $entry:\n ${e.getMessage}\n") } } if (failureSummary.nonEmpty) { diff --git a/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala index 76e6791..f7825c8 100644 --- a/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala +++ b/src/test/scala/com/virtuslab/shared_indexes/remote/JdkIndexesS3OperationsTest.scala @@ -6,10 +6,11 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.funsuite.AnyFunSuite import software.amazon.awssdk.core.sync.RequestBody import software.amazon.awssdk.services.s3.S3Client -import software.amazon.awssdk.services.s3.model.{S3Object, PutObjectRequest, DeleteObjectRequest} +import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest, S3Object} import java.time.Instant import scala.jdk.CollectionConverters._ +import scala.util.Try class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory { @@ -46,4 +47,44 @@ class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory { } + test("single server error") { + // Given + val newEntries = Seq( + "shared-index-jdk.metadata.json" -> "application/json", + "shared-index-jdk.ijx.xz" -> "application/xz", + "shared-index-jdk.sha256" -> "application/octet-stream" + ).map { case (key, contentType) => + new CdnUploadDataEntry(key, contentType, () => Array.emptyByteArray) + }.asJava + val removeEntries = Seq().asJava + val updatePlan = new CdnUpdatePlan(newEntries, removeEntries) + val s3Upload = new S3Upload(s3) + (s3Client.putObject(_: PutObjectRequest, _: RequestBody)).when(*, *).throws(new Exception("Server error")).once() + + // When + val exception = Try { + s3Upload.updateS3Indexes(updatePlan) + }.failed.get + + // Then + Seq( + "shared-index-jdk.metadata.json" -> "application/json", + "shared-index-jdk.ijx.xz" -> "application/xz", + "shared-index-jdk.sha256" -> "application/octet-stream" + ).foreach { case (key, contentType) => + (s3Client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where { + (req: PutObjectRequest, _: RequestBody) => + req.contentType() == contentType && + req.key() == "/path/to/indexes/" + key + }) + } + val expectedErrorMessage = + """Failed to upload 1 entries: + | UploadDataEntry(key=shared-index-jdk.metadata.json, type=application/json): + | Server error + |""".stripMargin + + assert(exception.getMessage == expectedErrorMessage) + } + }