Commit cd637a3
Custom logic for uploading indexes to S3
odisseus committed Nov 6, 2024
1 parent 0a1567e commit cd637a3
Showing 3 changed files with 179 additions and 1 deletion.
@@ -6,6 +6,7 @@ import com.intellij.indexing.shared.cdn.S3_uploadKt
import com.intellij.indexing.shared.cdn.S3Kt
import com.intellij.indexing.shared.local.Local_uploadKt
import com.virtuslab.shared_indexes.config.MainConfig
+import com.virtuslab.shared_indexes.remote.S3Upload

case class WorkPlan(
intelliJ: IntelliJ,
@@ -63,7 +64,7 @@ object WorkPlan {
val s3ApiUrl = indexStorageConfig.indexServerUrl
.getOrElse(throw new IllegalStateException("Must provide the server address for uploading to S3"))
val s3 = S3Kt.S3(s"$s3ApiUrl/$bucketName", bucketName, s3ApiUrl, "/", 10)
-        (Some(s3), S3_uploadKt.updateS3Indexes(s3, _))
+        (Some(s3), new S3Upload(s3).updateS3Indexes _)
case None =>
// updateLocalIndexes executes the update plan in given basePath, which is the location that
// the server should host
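The change above swaps the library helper for the new class: new S3Upload(s3).updateS3Indexes _ eta-expands the method into the same CdnUpdatePlan => Unit function shape that the partially applied S3_uploadKt.updateS3Indexes(s3, _) filled before. A minimal sketch of the equivalence (the uploadFn name is illustrative):

// Both expressions yield a CdnUpdatePlan => Unit function value.
val uploadFn: CdnUpdatePlan => Unit = new S3Upload(s3).updateS3Indexes _
// previously: val uploadFn: CdnUpdatePlan => Unit = S3_uploadKt.updateS3Indexes(s3, _)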
128 changes: 128 additions & 0 deletions src/main/scala/com/virtuslab/shared_indexes/remote/S3Upload.scala
@@ -0,0 +1,128 @@
package com.virtuslab.shared_indexes.remote

import com.intellij.indexing.shared.cdn.upload.{CdnUploadAnotherEntry, CdnUploadDataEntry, CdnUploadEntry}
import com.intellij.indexing.shared.cdn.{CdnEntry, CdnUpdatePlan, S3, S3CdnEntry}
import com.intellij.indexing.shared.local.LocalDiskEntry
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, PutObjectRequest}

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.Files
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try, Using}

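/** Executes a CdnUpdatePlan against an S3 bucket using the AWS SDK v2 client:
  * entries scheduled for removal are deleted, new entries are uploaded, and
  * all failures are aggregated into a single exception once the batch finishes.
  */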
class S3Upload(private val s3: S3, private val timeout: Duration = Duration(5, "min")) {
private val logger = LoggerFactory.getLogger(this.getClass)
private val rootInBucket: String = s3.getRootInBucket
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(s3.getDispatcher.getExecutor)

def updateS3Indexes(plan: CdnUpdatePlan): Unit = {
if (plan.getNewEntries.isEmpty && plan.getRemoveEntries.isEmpty) {
logger.warn("The update plan is empty, nothing to do!")
return
}

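    // Removals run before uploads; failures from both phases are collected
    // and reported together in a single exception at the end.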
val failedRemoves = removeFiles(plan.getRemoveEntries.asScala)
val failedUploads = uploadFiles(plan.getNewEntries.asScala)
val failureSummary = new StringBuilder()
if (failedRemoves.nonEmpty) {
failureSummary.append(s"Failed to remove ${failedRemoves.size} entries: \n")
failedRemoves.foreach { case (entry, e) =>
failureSummary.append(s" $entry: \n ${e.getMessage}\n")
}
}
if (failedUploads.nonEmpty) {
failureSummary.append(s"Failed to upload ${failedUploads.size} entries: \n")
failedUploads.foreach { case (entry, e) =>
failureSummary.append(s" $entry: \n ${e.getMessage}\n")
}
}
if (failureSummary.nonEmpty) {
throw new RuntimeException(failureSummary.toString())
}
}

private def uploadFiles(files: Iterable[CdnUploadEntry]): Map[CdnUploadEntry, Throwable] = {
logger.info("Uploading {} items...", files.size)
val results = awaitAll(putObject)(files)
collectFailures(files, results)
}

private def removeFiles(files: Iterable[CdnEntry]): Map[CdnEntry, Throwable] = {
logger.info("Removing {} items...", files.size)
val results = awaitAll(deleteObject)(files)
collectFailures(files, results)
}

  private def awaitAll[T, R](task: T => Future[R])(inputs: Iterable[T]): Iterable[Try[R]] = {
    val futures = inputs.map(task)
    // Convert every future into a successful unit future, so that Future.sequence
    // waits for all of them to finish instead of failing fast on the first error.
    val completions = futures.map(_.transform(_ => Success(())))
    val allCompleted = Future.sequence(completions)
    var timeoutException: Option[TimeoutException] = None
    try {
      Await.ready(allCompleted, timeout)
    } catch {
      case e: TimeoutException => timeoutException = Some(e)
    }
    // If Await.ready returned normally, every future has a value; otherwise the
    // still-pending ones are reported as failed with the TimeoutException.
    futures.map(_.value.getOrElse(Failure(timeoutException.get)))
  }

private def collectFailures[T, R](keys: Iterable[T], results: Iterable[Try[R]]): Map[T, Throwable] = {
(keys zip results).collect { case (file, Failure(e)) =>
(file, e)
}.toMap
}

private def putObject(entry: CdnUploadEntry): Future[Unit] = {
require(
!entry.getKey.startsWith(s"${rootInBucket}/"),
s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}"
)
val req = PutObjectRequest.builder()
.bucket(s3.getBucket)
.key(keyInBucket(entry))
.contentType(entry.getContentType)
.build()
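    // The blocking putObject call runs on the S3 dispatcher's executor,
    // via the implicit ExecutionContext declared above.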
Future {
logger.info("S3PUT {}", entry)
Using.resource(getInputStream(entry)) { istream =>
s3.getClient.putObject(req, RequestBody.fromInputStream(istream, entry.getContentLength))
}
logger.info("S3PUT end {}", entry)
}
}

private def deleteObject(entry: CdnEntry): Future[Unit] = {
require(
!entry.getKey.startsWith(s"${rootInBucket}/"),
s"Key must not start with S3 paths prefix (${rootInBucket}): ${entry.getKey}"
)
require(entry.isInstanceOf[S3CdnEntry], s"It is only possible to remove S3 entries, but was: $entry")
val req = DeleteObjectRequest.builder()
.bucket(s3.getBucket)
.key(keyInBucket(entry))
.build()
Future {
logger.info("S3DEL {}", entry)
s3.getClient.deleteObject(req)
}
}

private def keyInBucket(entry: CdnEntry): String = {
if (rootInBucket.isBlank) entry.getKey
else s"${rootInBucket}/${entry.getKey}"
}

  private def getInputStream(entry: CdnUploadEntry): InputStream = entry match {
    case de: CdnUploadDataEntry =>
      new ByteArrayInputStream(de.getContent)
    case ae: CdnUploadAnotherEntry =>
      ae.getItem match {
        // Only entries backed by local files are expected here; any other
        // CdnEntry type fails the upload with a MatchError, which then
        // surfaces in the failure summary.
        case l: LocalDiskEntry => Files.newInputStream(l.getPath)
      }
  }

}
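The batch timeout is the constructor's second parameter and defaults to 5 minutes, as above. A minimal sketch of overriding it, assuming an S3 handle named s3 and a prepared CdnUpdatePlan named plan (both names illustrative):

import scala.concurrent.duration.Duration

val upload = new S3Upload(s3, timeout = Duration(30, "s"))
// Operations still pending at the deadline are reported as failures
// in the exception thrown by updateS3Indexes.
upload.updateS3Indexes(plan)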
@@ -0,0 +1,49 @@
package com.virtuslab.shared_indexes.remote

import com.intellij.indexing.shared.cdn.upload.{CdnUploadDataEntry, CdnUploadEntry}
import com.intellij.indexing.shared.cdn.{CdnUpdatePlan, S3, S3CdnEntry}
import org.scalamock.scalatest.MockFactory
import org.scalatest.funsuite.AnyFunSuite
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{S3Object, PutObjectRequest, DeleteObjectRequest}

import java.time.Instant
import scala.jdk.CollectionConverters._

class JdkIndexesS3OperationsTest extends AnyFunSuite with MockFactory {

private val s3Client = stub[S3Client]
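  // The fourth constructor argument here is the root path inside the bucket;
  // S3Upload prefixes every key with it, hence the "/path/to/indexes/..."
  // expectations below.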
private val s3 = new S3("http://example.s3", "test-bucket", s3Client, "/path/to/indexes", 1)

test("update") {
// Given
val toUpload =
new CdnUploadDataEntry("shared-index-jdk.metadata.json", "application/json", () => Array.emptyByteArray)
val toRemove = new S3CdnEntry(
s3,
S3Object.builder().key("deleteme").size(56L).lastModified(Instant.parse("2024-11-06T10:15:30Z")).build()
)
val newEntries = Seq[CdnUploadEntry](toUpload).asJava
val removeEntries = Seq(toRemove).asJava
val updatePlan = new CdnUpdatePlan(newEntries, removeEntries)
val s3Upload = new S3Upload(s3)

// When
s3Upload.updateS3Indexes(updatePlan)

// Then
(s3Client.putObject(_: PutObjectRequest, _: RequestBody)).verify(where { (req: PutObjectRequest, _: RequestBody) =>
req.bucket() == "test-bucket" &&
req.contentType() == "application/json" &&
req.key() == "/path/to/indexes/shared-index-jdk.metadata.json"
})

(s3Client.deleteObject(_: DeleteObjectRequest)).verify(where { req: DeleteObjectRequest =>
req.bucket() == "test-bucket" &&
req.key() == "/path/to/indexes/deleteme"
})

}

}
