Skip to content

Commit

Permalink
File: Add zip compression level (#2980)
Browse files Browse the repository at this point in the history
* Add deflate compression level in zip flow #2766
* Avoid unnecessary ByteString operation in zip flow
* Elements pushed to the zip flow can be of any size, so decoding a whole element as UTF-8 just to check a few bytes at its beginning can be very wasteful.
  • Loading branch information
gael-ft committed Jun 9, 2023
1 parent ee6acac commit 6494e86
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import akka.util.ByteString
ByteString(endFileWord)

/**
 * Tells whether `b` opens a new archive entry, i.e. whether its first bytes are the start marker.
 *
 * Only a marker-sized prefix of `b` is inspected. Elements may be arbitrarily large, so decoding
 * the whole element as UTF-8 just to look at its beginning would be wasted work. The prefix
 * length is taken from the encoded marker instead of a literal, so the check stays correct if
 * `startFileWord` is ever changed.
 *
 * @param b element received by the zip flow
 * @return `true` when `b` is at least as long as the marker and starts with it
 */
def isStartingByteString(b: ByteString): Boolean = {
  // ByteString(String) encodes as UTF-8, so this is the marker exactly as it appears on the wire.
  val marker = ByteString(startFileWord)
  b.size >= marker.size && b.take(marker.size) == marker
}

/**
 * Tells whether `b` is exactly the end-of-entry marker.
 *
 * The sizes are compared first so that a large payload element is rejected without touching its
 * content. The expected size is taken from the encoded marker instead of a literal, so the
 * check stays correct if `endFileWord` is ever changed.
 *
 * @param b element received by the zip flow
 * @return `true` when `b` consists of the end marker and nothing else
 */
def isEndingByteString(b: ByteString): Boolean = {
  // ByteString(String) encodes as UTF-8, so this is the marker exactly as it appears on the wire.
  val marker = ByteString(endFileWord)
  b.size == marker.size && b == marker
}

def getPathFromStartingByteString(b: ByteString): String = {
val splitted = b.utf8String.split(separator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import akka.util.{ByteString, ByteStringBuilder}
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlowStage(
val shape: FlowShape[ByteString, ByteString]
val shape: FlowShape[ByteString, ByteString],
deflateCompression: Option[Int] = None
) extends GraphStageLogic(shape) {

private def in = shape.in
Expand Down Expand Up @@ -77,12 +78,15 @@ import akka.util.{ByteString, ByteStringBuilder}
}
)

/**
 * Applies the configured deflate level to `zip` when the stage starts.
 *
 * With `None` nothing is set, so `zip` keeps whatever level it already has.
 */
override def preStart(): Unit =
  deflateCompression match {
    case Some(level) => zip.setLevel(level)
    case None => ()
  }
}

/**
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlow extends GraphStage[FlowShape[ByteString, ByteString]] {
@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None)
extends GraphStage[FlowShape[ByteString, ByteString]] {

val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in")
val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out")
Expand All @@ -93,5 +97,5 @@ import akka.util.{ByteString, ByteStringBuilder}
override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

// Creates the logic for this stage; `inheritedAttributes` is not consulted here.
// The two constructor calls below are the before/after lines of this hunk: the second one
// additionally forwards the deflate level this stage was constructed with.
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new ZipArchiveFlowStage(shape)
new ZipArchiveFlowStage(shape, deflateCompression)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import akka.util.ByteString
*/
@InternalApi private[file] object ZipArchiveManager {

def zipFlow(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = {
val archiveZipFlow = new ZipArchiveFlow()
def zipFlow(
deflateCompression: Option[Int] = None
): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = {
val archiveZipFlow = new ZipArchiveFlow(deflateCompression)
Flow[(ArchiveMetadata, Source[ByteString, Any])]
.flatMapConcat {
case (metadata, stream) =>
Expand Down
14 changes: 12 additions & 2 deletions file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,22 @@ object Archive {

/**
* Flow for compressing multiple files into one ZIP file.
*
* Each incoming pair is turned into a Scala tuple (with the content source converted to its
* scaladsl counterpart) and the compression itself is delegated to the scaladsl `Archive.zip`.
*
* @param deflateCompression optional deflate level, forwarded unchanged to the scaladsl flow;
*                           see [[java.util.zip.Deflater Deflater]] for the level constants.
*                           NOTE(review): this is a `scala.Option`, which is awkward to build from
*                           Java callers - confirm whether `java.util.Optional` was intended.
*/
def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
def zip(
deflateCompression: Option[Int]
): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
Flow
.create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]()
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)
.via(scaladsl.Archive.zip(deflateCompression).asJava)

/**
 * Flow for compressing multiple files into one ZIP file, without requesting a specific
 * deflate compression level.
 */
def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
  zip(Option.empty[Int])

/**
* Flow for reading ZIP files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ import java.nio.charset.{Charset, StandardCharsets}
*/
object Archive {

/**
 * Flow that packs the incoming `(metadata, content)` pairs into one ZIP file.
 *
 * @param deflateCompression optional deflate level handed on to the underlying zip flow;
 *                           see [[java.util.zip.Deflater Deflater]] for the level constants
 */
def zip(deflateCompression: Option[Int]): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
  ZipArchiveManager.zipFlow(deflateCompression = deflateCompression)

/**
* Flow for compressing multiple files into one ZIP file, without requesting a specific
* deflate compression level.
*
* The two body lines below are the before/after lines of this hunk: the second one routes
* through the overload that takes a level, passing `None`.
*/
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()
zip(None)

/**
* Flow for reading ZIP files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import akka.util.ByteString
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.zip.Deflater

class ZipArchiveFlowTest
extends TestKit(ActorSystem("ziparchive"))
with AnyWordSpecLike
Expand Down Expand Up @@ -40,6 +42,27 @@ class ZipArchiveFlowTest
downstream.expectComplete()
}
}

// Runs the stage with an explicit deflate level (Deflater.NO_COMPRESSION) and checks that
// output is only observed in step with downstream demand.
"compression flag given and stream ends" should {
"emit element only when downstream requests" in {
// Probes on both ends: the test decides what is pushed upstream and how much is demanded downstream.
val (upstream, downstream) =
TestSource[ByteString]()
.via(new ZipArchiveFlow(Some(Deflater.NO_COMPRESSION)))
.toMat(TestSink())(Keep.both)
.run()

// One entry: start marker for "test", a single-byte payload, end marker, then upstream completes.
upstream.sendNext(FileByteStringSeparators.createStartingByteString("test"))
upstream.sendNext(ByteString(1))
upstream.sendNext(FileByteStringSeparators.createEndingByteString())
upstream.sendComplete()

// Demand is issued in two steps (2, then 1); completion is expected right after the third element.
downstream.request(2)
downstream.expectNextN(2)
downstream.request(1)
downstream.expectNextN(1)
downstream.expectComplete()
}
}
}

override def afterAll(): Unit = {
Expand Down
14 changes: 14 additions & 0 deletions file/src/test/scala/docs/scaladsl/ArchiveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.zip.Deflater
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -113,6 +114,19 @@ class ArchiveSpec
archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

// Round trip with an explicit deflate level: zip the generated inputs with
// Deflater.NO_COMPRESSION, unzip the result, and expect the original files back.
"archive files with compression flag" in {
// NOTE(review): presumably 5 files of size 100 - confirm against generateInputFiles.
val inputFiles = generateInputFiles(5, 100)
val inputStream = filesToStream(inputFiles)
val zipFlow = Archive.zip(Some(Deflater.NO_COMPRESSION))

// Concatenate every emitted chunk into a single ByteString holding the whole archive.
val akkaZipped: Future[ByteString] =
inputStream
.via(zipFlow)
.runWith(Sink.fold(ByteString.empty)(_ ++ _))

archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

"unarchive files" in {
val inputFiles = generateInputFiles(5, 100)
val inputStream = filesToStream(inputFiles)
Expand Down

0 comments on commit 6494e86

Please sign in to comment.