Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom logic for uploading indexes to S3 #12

Merged
merged 3 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package com.virtuslab.shared_indexes.core

import com.intellij.indexing.shared.cdn.CdnUpdatePlan
import com.intellij.indexing.shared.cdn.S3
import com.intellij.indexing.shared.cdn.S3_uploadKt
import com.intellij.indexing.shared.cdn.S3Kt
import com.intellij.indexing.shared.local.Local_uploadKt
import com.virtuslab.shared_indexes.config.MainConfig
import com.virtuslab.shared_indexes.remote.S3Upload

case class WorkPlan(
intelliJ: IntelliJ,
Expand Down Expand Up @@ -63,7 +63,7 @@ object WorkPlan {
val s3ApiUrl = indexStorageConfig.indexServerUrl
.getOrElse(throw new IllegalStateException("Must provide the server address for uploading to S3"))
val s3 = S3Kt.S3(s"$s3ApiUrl/$bucketName", bucketName, s3ApiUrl, "/", 10)
(Some(s3), S3_uploadKt.updateS3Indexes(s3, _))
(Some(s3), new S3Upload(s3).updateS3Indexes _)
case None =>
// updateLocalIndexes executes the update plan in given basePath, which is the location that
// the server should host
Expand Down
129 changes: 129 additions & 0 deletions src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.virtuslab.shared_indexes.remote

import com.intellij.indexing.shared.cdn.upload.{CdnUploadAnotherEntry, CdnUploadDataEntry, CdnUploadEntry}
import com.intellij.indexing.shared.cdn.{CdnEntry, CdnUpdatePlan, S3, S3CdnEntry}
import com.intellij.indexing.shared.local.LocalDiskEntry
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest}

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.Files
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try, Using}

/** Uploads and removes shared-index entries in an S3 bucket according to a [[CdnUpdatePlan]].
  *
  * Custom replacement for the default `S3_uploadKt.updateS3Indexes` logic: all puts and
  * deletes run concurrently on the executor configured on the [[S3]] handle, and every
  * failure is collected into a single summary error instead of failing fast.
  *
  * @param s3      the S3 connection descriptor (client, bucket, root prefix, dispatcher)
  * @param timeout maximum time to wait for the whole batch of S3 operations to complete
  */
class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, "min")) {
  private val logger = LoggerFactory.getLogger(this.getClass)
  // Key prefix inside the bucket under which all entries are stored; may be blank.
  private val rootInBucket: String = s3.getRootInBucket
  // Run all S3 calls on the executor already configured for this S3 handle.
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(s3.getDispatcher.getExecutor)

  /** Executes the update plan: removes obsolete entries, then uploads new ones.
    *
    * @param plan the entries to upload and to remove
    * @throws RuntimeException listing every entry that failed to upload or remove
    */
  def updateS3Indexes(plan: CdnUpdatePlan): Unit = {
    if (plan.getNewEntries.isEmpty && plan.getRemoveEntries.isEmpty) {
      logger.warn("The update plan is empty, nothing to do!")
    } else {
      val failedRemoves = removeFiles(plan.getRemoveEntries.asScala)
      val failedUploads = uploadFiles(plan.getNewEntries.asScala)
      val failureSummary = new StringBuilder()
      if (failedRemoves.nonEmpty) {
        failureSummary.append(s"Failed to remove ${failedRemoves.size} entries:\n")
        failedRemoves.foreach { case (entry, e) =>
          failureSummary.append(s" $entry:\n ${e.getMessage}\n")
        }
      }
      if (failedUploads.nonEmpty) {
        failureSummary.append(s"Failed to upload ${failedUploads.size} entries:\n")
        failedUploads.foreach { case (entry, e) =>
          failureSummary.append(s" $entry:\n ${e.getMessage}\n")
        }
      }
      if (failureSummary.nonEmpty) {
        throw new RuntimeException(failureSummary.toString())
      }
    }
  }

  /** Uploads all given entries concurrently; returns the entries that failed, with causes. */
  private def uploadFiles(files: Iterable[CdnUploadEntry]): Map[CdnUploadEntry, Throwable] = {
    logger.info("Uploading {} items...", files.size)
    val results = awaitAll(putObject)(files)
    collectFailures(files, results)
  }

  /** Deletes all given entries concurrently; returns the entries that failed, with causes. */
  private def removeFiles(files: Iterable[CdnEntry]): Map[CdnEntry, Throwable] = {
    logger.info("Removing {} items...", files.size)
    val results = awaitAll(deleteObject)(files)
    collectFailures(files, results)
  }

  /** Starts `task` for every input, waits (up to `timeout`) for all of them, and returns
    * each operation's individual outcome in input order.
    */
  private def awaitAll[T, R](task: T => Future[R])(inputs: Iterable[T]): Iterable[Try[R]] = {
    val futures = inputs.map(task)
    // Mirror each future into one that always succeeds, so Future.sequence waits for
    // every operation to finish instead of failing fast on the first error.
    val completions = futures.map(_.transform(_ => Success(())))
    val allCompleted = Future.sequence(completions)
    var timeoutException: Option[TimeoutException] = None
    try {
      Await.ready(allCompleted, timeout)
    } catch {
      case e: TimeoutException => timeoutException = Some(e)
    }
    // A future can only still be incomplete here if Await timed out; report that timeout
    // (or a synthetic one, defensively) as the failure of every unfinished operation.
    futures.map(_.value.getOrElse(Failure(
      timeoutException.getOrElse(new TimeoutException(s"Operation did not complete within $timeout"))
    )))
  }

  /** Pairs inputs with their results and keeps only the failed ones. */
  private def collectFailures[T, R](keys: Iterable[T], results: Iterable[Try[R]]): Map[T, Throwable] = {
    (keys zip results).collect { case (file, Failure(e)) =>
      (file, e)
    }.toMap
  }

  /** Asynchronously uploads one entry; the bucket-root prefix is added here, so the
    * entry key must not already contain it.
    */
  private def putObject(entry: CdnUploadEntry): Future[Unit] = {
    require(
      !entry.getKey.startsWith(s"${rootInBucket}/"),
      s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}"
    )
    val req = PutObjectRequest.builder()
      .bucket(s3.getBucket)
      .key(keyInBucket(entry))
      .contentType(entry.getContentType)
      .build()
    Future {
      logger.info("S3PUT {}", entry)
      Using.resource(getInputStream(entry)) { istream =>
        s3.getClient.putObject(req, RequestBody.fromInputStream(istream, entry.getContentLength))
      }
      logger.debug("S3PUT end {}", entry)
    }
  }

  /** Asynchronously deletes one existing S3 entry; only [[S3CdnEntry]] values can be removed. */
  private def deleteObject(entry: CdnEntry): Future[Unit] = {
    require(
      !entry.getKey.startsWith(s"${rootInBucket}/"),
      s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}"
    )
    require(entry.isInstanceOf[S3CdnEntry], s"It is only possible to remove S3 entries, but was: $entry")
    val req = DeleteObjectRequest.builder()
      .bucket(s3.getBucket)
      .key(keyInBucket(entry))
      .build()
    Future {
      logger.info("S3DEL {}", entry)
      s3.getClient.deleteObject(req)
      logger.debug("S3DEL end {}", entry)
    }
  }

  /** Full object key for an entry: the bucket-root prefix plus the entry key. */
  private def keyInBucket(entry: CdnEntry): String = {
    if (rootInBucket.isBlank) entry.getKey
    else s"${rootInBucket}/${entry.getKey}"
  }

  /** Opens the content stream for an upload entry; the caller is responsible for closing it. */
  private def getInputStream(entry: CdnUploadEntry): InputStream = entry match {
    case de: CdnUploadDataEntry =>
      new ByteArrayInputStream(de.getContent)
    case ae: CdnUploadAnotherEntry =>
      ae.getItem match {
        case l: LocalDiskEntry => Files.newInputStream(l.getPath)
        case other =>
          // Previously fell through to an opaque MatchError; fail with a descriptive message.
          throw new IllegalArgumentException(s"Unsupported source entry for upload: $other")
      }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.virtuslab.shared_indexes.remote

import com.intellij.indexing.shared.cdn.upload.{CdnUploadDataEntry, CdnUploadEntry}
import com.intellij.indexing.shared.cdn.{CdnUpdatePlan, S3, S3CdnEntry}
import org.scalamock.scalatest.MockFactory
import org.scalatest.funsuite.AnyFunSuite
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest, S3Object}

import java.time.Instant
import scala.jdk.CollectionConverters._
import scala.util.Try

/** Verifies that [[S3Upload]] turns a [[CdnUpdatePlan]] into the expected AWS SDK
  * put/delete requests, and that server-side failures surface in the summary error.
  */
class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory {

  private val client = stub[S3Client]
  private val storage = new S3("http://example.s3", "test-bucket", client, "/path/to/indexes", 1)

  // Shared fixture: key -> content type of every index file the plans refer to.
  private val indexFiles = Seq(
    "shared-index-jdk.metadata.json" -> "application/json",
    "shared-index-jdk.ijx.xz" -> "application/xz",
    "shared-index-jdk.sha256" -> "application/octet-stream"
  )

  test("update") {
    // Given: a plan with one entry to upload and one stale object to delete
    val uploadEntry =
      new CdnUploadDataEntry("shared-index-jdk.metadata.json", "application/json", () => Array.emptyByteArray)
    val staleObject = S3Object
      .builder()
      .key("deleteme")
      .size(56L)
      .lastModified(Instant.parse("2024-11-06T10:15:30Z"))
      .build()
    val removalEntry = new S3CdnEntry(storage, staleObject)
    val newEntries = Seq[CdnUploadEntry](uploadEntry).asJava
    val removeEntries = Seq(removalEntry).asJava
    val plan = new CdnUpdatePlan(newEntries, removeEntries)

    // When
    new S3Upload(storage).updateS3Indexes(plan)

    // Then: both requests target the bucket with the root prefix prepended to the keys
    (client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where { (req: PutObjectRequest, _: RequestBody) =>
      req.bucket() == "test-bucket" &&
      req.contentType() == "application/json" &&
      req.key() == "/path/to/indexes/shared-index-jdk.metadata.json"
    })

    (client.deleteObject(_: DeleteObjectRequest)).verify(where { req: DeleteObjectRequest =>
      req.bucket() == "test-bucket" &&
      req.key() == "/path/to/indexes/deleteme"
    })

  }

  test("single server error") {
    // Given: three entries to upload, with the first put rejected by the server
    val newEntries = indexFiles.map { case (key, contentType) =>
      new CdnUploadDataEntry(key, contentType, () => Array.emptyByteArray)
    }.asJava
    val removeEntries = Seq().asJava
    val plan = new CdnUpdatePlan(newEntries, removeEntries)
    val uploader = new S3Upload(storage)
    (client.putObject(_: PutObjectRequest, _: RequestBody)).when(*, *).throws(new Exception("Server error")).once()

    // When
    val exception = Try(uploader.updateS3Indexes(plan)).failed.get

    // Then: every entry was still attempted despite the first failure
    indexFiles.foreach { case (key, contentType) =>
      (client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where {
        (req: PutObjectRequest, _: RequestBody) =>
          req.contentType() == contentType &&
          req.key() == "/path/to/indexes/" + key
      })
    }
    // ...and only the single failed entry appears in the summary message
    val expectedErrorMessage =
      """Failed to upload 1 entries:
| UploadDataEntry(key=shared-index-jdk.metadata.json, type=application/json):
| Server error
|""".stripMargin

    assert(exception.getMessage == expectedErrorMessage)
  }

}