From 6494e86c08372db65aeab480a1dad1d241a3ec8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=ABl=20Ferrachat?= Date: Fri, 9 Jun 2023 14:02:31 +0200 Subject: [PATCH] File: Add zip compression level (#2980) * Add deflate compression level in zip flow #2766 * Avoid unnecessary ByteString operation in zip flow * Elements pushed to zip flow might be of any size. Decoding them completely using utf8 just to check a few bytes at the beginning can be a big loss. --- .../archive/FileByteStringSeparators.scala | 4 ++-- .../file/impl/archive/ZipArchiveFlow.scala | 10 +++++--- .../file/impl/archive/ZipArchiveManager.scala | 6 +++-- .../stream/alpakka/file/javadsl/Archive.scala | 14 +++++++++-- .../alpakka/file/scaladsl/Archive.scala | 10 +++++++- .../impl/archive/ZipArchiveFlowTest.scala | 23 +++++++++++++++++++ .../scala/docs/scaladsl/ArchiveSpec.scala | 14 +++++++++++ 7 files changed, 71 insertions(+), 10 deletions(-) diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala index ec9284eba9..f3cdbfdd2e 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala @@ -25,10 +25,10 @@ import akka.util.ByteString ByteString(endFileWord) def isStartingByteString(b: ByteString): Boolean = - b.utf8String.startsWith(startFileWord) + b.size >= 7 && b.slice(0, 7).utf8String == startFileWord def isEndingByteString(b: ByteString): Boolean = - b.utf8String == endFileWord + b.size == 5 && b.utf8String == endFileWord def getPathFromStartingByteString(b: ByteString): String = { val splitted = b.utf8String.split(separator) diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala index 75cdee5dae..df89fd234f 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala @@ -16,7 +16,8 @@ import akka.util.{ByteString, ByteStringBuilder} * INTERNAL API */ @InternalApi private[file] final class ZipArchiveFlowStage( - val shape: FlowShape[ByteString, ByteString] + val shape: FlowShape[ByteString, ByteString], + deflateCompression: Option[Int] = None ) extends GraphStageLogic(shape) { private def in = shape.in @@ -77,12 +78,15 @@ import akka.util.{ByteString, ByteStringBuilder} } ) + override def preStart(): Unit = + deflateCompression.foreach(l => zip.setLevel(l)) } /** * INTERNAL API */ -@InternalApi private[file] final class ZipArchiveFlow extends GraphStage[FlowShape[ByteString, ByteString]] { +@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None) + extends GraphStage[FlowShape[ByteString, ByteString]] { val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in") val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out") @@ -93,5 +97,5 @@ import akka.util.{ByteString, ByteStringBuilder} override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new ZipArchiveFlowStage(shape) + new ZipArchiveFlowStage(shape, deflateCompression) } diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala index 60165ea9a9..ac383bade0 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala @@ -15,8 +15,10 @@ import akka.util.ByteString */ @InternalApi private[file] object ZipArchiveManager { - def zipFlow(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { - val archiveZipFlow = new ZipArchiveFlow() + def zipFlow( + deflateCompression: Option[Int] = None + ): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { + val archiveZipFlow = new ZipArchiveFlow(deflateCompression) Flow[(ArchiveMetadata, Source[ByteString, Any])] .flatMapConcat { case (metadata, stream) => diff --git a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala index d7a7f86855..671a8e186a 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala @@ -22,12 +22,22 @@ object Archive { /** * Flow for compressing multiple files into one ZIP file. + * + * @param deflateCompression see [[java.util.zip.Deflater Deflater]] */ - def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = + def zip( + deflateCompression: Option[Int] + ): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = Flow .create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]() .map(func(pair => (pair.first, pair.second.asScala))) - .via(scaladsl.Archive.zip().asJava) + .via(scaladsl.Archive.zip(deflateCompression).asJava) + + /** + * Flow for compressing multiple files into one ZIP file. + */ + def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = + zip(None) /** * Flow for reading ZIP files. diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala index 85ba3a0992..2be2b0837b 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala @@ -18,11 +18,19 @@ import java.nio.charset.{Charset, StandardCharsets} */ object Archive { + /** + * Flow for compressing multiple files into one ZIP file. + * + * @param deflateCompression see [[java.util.zip.Deflater Deflater]] + */ + def zip(deflateCompression: Option[Int]): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = + ZipArchiveManager.zipFlow(deflateCompression) + /** * Flow for compressing multiple files into one ZIP file. */ def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = - ZipArchiveManager.zipFlow() + zip(None) /** * Flow for reading ZIP files. diff --git a/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala b/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala index 9eb1f4b1f5..2285da16c6 100644 --- a/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala +++ b/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala @@ -13,6 +13,8 @@ import akka.util.ByteString import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike +import java.util.zip.Deflater + class ZipArchiveFlowTest extends TestKit(ActorSystem("ziparchive")) with AnyWordSpecLike @@ -40,6 +42,27 @@ class ZipArchiveFlowTest downstream.expectComplete() } } + + "compression flag given and stream ends" should { + "emit element only when downstream requests" in { + val (upstream, downstream) = + TestSource[ByteString]() + .via(new ZipArchiveFlow(Some(Deflater.NO_COMPRESSION))) + .toMat(TestSink())(Keep.both) + .run() + + upstream.sendNext(FileByteStringSeparators.createStartingByteString("test")) + upstream.sendNext(ByteString(1)) + upstream.sendNext(FileByteStringSeparators.createEndingByteString()) + upstream.sendComplete() + + downstream.request(2) + downstream.expectNextN(2) + downstream.request(1) + downstream.expectNextN(1) + downstream.expectComplete() + } + } } override def afterAll(): Unit = { diff --git a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala index 2cceb69971..0bc3a11cc3 100644 --- a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala +++ b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala @@ -20,6 +20,7 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import java.util.zip.Deflater import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -113,6 +114,19 @@ class ArchiveSpec archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles } + "archive files with compression flag" in { + val inputFiles = generateInputFiles(5, 100) + val inputStream = filesToStream(inputFiles) + val zipFlow = Archive.zip(Some(Deflater.NO_COMPRESSION)) + + val akkaZipped: Future[ByteString] = + inputStream + .via(zipFlow) + .runWith(Sink.fold(ByteString.empty)(_ ++ _)) + + archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles + } + "unarchive files" in { val inputFiles = generateInputFiles(5, 100) val inputStream = filesToStream(inputFiles)