diff --git a/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala b/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
index ec24002..903ecc9 100644
--- a/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
+++ b/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
@@ -12,10 +12,13 @@
  */
 package com.snowplowanalytics.snowplow.analytics.scalasdk.decode
 
+import cats.implicits._
 import shapeless._
 import shapeless.ops.record._
 import shapeless.ops.hlist._
 import cats.data.{NonEmptyList, Validated}
+import java.nio.ByteBuffer
+import scala.collection.mutable.ListBuffer
 import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}
 
 private[scalasdk] trait Parser[A] extends TSVParser[A] {
@@ -42,10 +45,37 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
       decoded.map(decodedValue => generic.from(decodedValue))
     }
   }
+
+  def parseBytes(row: ByteBuffer): DecodeResult[A] = {
+    val values = Parser.splitBuffer(row)
+    if (values.length == 1)
+      Validated.Invalid(NotTSV)
+    else if (values.length != expectedNumFields)
+      Validated.Invalid(FieldNumberMismatch(values.length))
+    else {
+      val decoded = decoder.decodeBytes(values.result()).leftMap(e => RowDecodingError(e))
+      decoded.map(decodedValue => generic.from(decodedValue))
+    }
+  }
 }
 
 object Parser {
 
+  private val tab: Byte = '\t'.toByte
+
+  private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
+    var current = row.duplicate
+    val builder = ListBuffer(current)
+    (row.position() until row.limit()).foreach { i =>
+      if (row.get(i) === tab) {
+        current.limit(i)
+        current = row.duplicate.position(i + 1)
+        builder += current
+      }
+    }
+    builder
+  }
+
   private[scalasdk] sealed trait DeriveParser[A] {
 
     def knownKeys[R <: HList, K <: HList, L <: HList](
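The splitBuffer helper above is the zero-copy core of the change: duplicate() creates views that share the row's underlying bytes, and limit/position shrink each view to a single field, so nothing is copied until a field's decoder actually consumes it. A minimal standalone sketch of that windowing trick (the values here are illustrative, not taken from the SDK):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    val row = ByteBuffer.wrap("foo\tbar".getBytes(StandardCharsets.UTF_8))
    val first = row.duplicate()  // shares row's bytes, no copy
    first.limit(3)               // shrink the window to "foo" (the tab sits at index 3)
    val second = row.duplicate()
    second.position(4)           // start the window just past the tab: "bar"

    assert(StandardCharsets.UTF_8.decode(first).toString == "foo")
    assert(StandardCharsets.UTF_8.decode(second).toString == "bar")

Note that row.get(i) is an absolute read, so scanning for tabs never disturbs the buffer's position.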
diff --git a/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala b/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
index 54e3a9d..7234694 100644
--- a/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
+++ b/src/main/scala-2/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
@@ -16,6 +16,7 @@ import shapeless._
 import cats.syntax.validated._
 import cats.syntax.either._
 import cats.syntax.apply._
+import java.nio.ByteBuffer
 import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError
 
 private[scalasdk] trait RowDecoderCompanion {
@@ -44,6 +45,21 @@ private[scalasdk] trait RowDecoderCompanion {
       case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
     }
 
+  /** Parse TSV row into HList */
+  private def parseBytes[H: ValueDecoder, T <: HList](
+    key: Key,
+    tailDecoder: RowDecoder[T],
+    maxLength: Option[Int],
+    row: List[ByteBuffer]
+  ): RowDecodeResult[H :: T] =
+    row match {
+      case h :: t =>
+        val hv: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, h, maxLength).toValidatedNel
+        val tv: RowDecodeResult[T] = tailDecoder.decodeBytes(t)
+        (hv, tv).mapN(_ :: _)
+      case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
+    }
+
   implicit def hnilFromRow: DeriveRowDecoder[HNil] =
     new DeriveRowDecoder[HNil] {
       def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[HNil] =
@@ -55,6 +71,14 @@ private[scalasdk] trait RowDecoderCompanion {
             case _ =>
               UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
           }
+
+          def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[HNil] =
+            row match {
+              case Nil =>
+                HNil.validNel
+              case _ =>
+                UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
+            }
         }
     }
 
@@ -67,6 +91,7 @@ private[scalasdk] trait RowDecoderCompanion {
           val maxLength = maxLengths.get(key.name)
           new RowDecoder[H :: T] {
             def apply(row: List[String]): RowDecodeResult[H :: T] = parse(key, tailDecoder, maxLength, row)
+            def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H :: T] = parseBytes(key, tailDecoder, maxLength, row)
           }
         case Nil =>
           // Shapeless type checking makes this impossible
diff --git a/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala b/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
index 2f2108f..c64e58c 100644
--- a/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
+++ b/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/Parser.scala
@@ -12,7 +12,10 @@
  */
 package com.snowplowanalytics.snowplow.analytics.scalasdk.decode
 
+import cats.implicits._
 import cats.data.{NonEmptyList, Validated}
+import java.nio.ByteBuffer
+import scala.collection.mutable.ListBuffer
 import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}
 import scala.deriving._
 import scala.compiletime._
@@ -30,10 +33,32 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
     else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
     else decoder(values.toList).leftMap(e => RowDecodingError(e))
   }
+
+  def parseBytes(row: ByteBuffer): DecodeResult[A] = {
+    val values = Parser.splitBuffer(row)
+    if (values.length == 1) Validated.Invalid(NotTSV)
+    else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
+    else decoder.decodeBytes(values.result()).leftMap(e => RowDecodingError(e))
+  }
 }
 
 object Parser {
 
+  private val tab: Byte = '\t'.toByte
+
+  private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
+    var current = row.duplicate
+    val builder = ListBuffer(current)
+    (row.position() until row.limit()).foreach { i =>
+      if (row.get(i) === tab) {
+        current.limit(i)
+        current = row.duplicate.position(i + 1)
+        builder += current
+      }
+    }
+    builder
+  }
+
   private[scalasdk] sealed trait DeriveParser[A] {
     inline def knownKeys(implicit mirror: Mirror.ProductOf[A]): List[String] =
       constValueTuple[mirror.MirroredElemLabels].toArray.map(_.toString).toList
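The parseBytes helper in the two RowDecoderCompanion files (Scala 2 above, Scala 3 below) decodes the head cell and the tail recursively, then merges the two results with mapN, so errors from every bad field accumulate instead of short-circuiting at the first failure. A cats-only sketch of that semantics, with illustrative error strings:

    import cats.data.{NonEmptyList, Validated, ValidatedNel}
    import cats.syntax.apply._
    import cats.syntax.validated._

    val hv: ValidatedNel[String, Int] = "head cell is not an Int".invalidNel
    val tv: ValidatedNel[String, Boolean] = "tail cell is not a Boolean".invalidNel

    // mapN combines the two Validated values, keeping both errors:
    val combined: ValidatedNel[String, (Int, Boolean)] = (hv, tv).mapN((h, t) => (h, t))
    assert(combined == Validated.Invalid(
      NonEmptyList.of("head cell is not an Int", "tail cell is not a Boolean")))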
diff --git a/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala b/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
index 6817c10..edd31b3 100644
--- a/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
+++ b/src/main/scala-3/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoderCompanion.scala
@@ -16,6 +16,7 @@ import cats.syntax.validated._
 import cats.syntax.either._
 import cats.syntax.apply._
 import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError
+import java.nio.ByteBuffer
 import scala.deriving._
 import scala.compiletime._
 
@@ -52,6 +53,20 @@ private[scalasdk] trait RowDecoderCompanion {
       case Nil =>
        UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
     }
 
+  private def parseBytes[H: ValueDecoder, T <: Tuple](
+    key: Key,
+    tailDecoder: RowDecoder[T],
+    maxLength: Option[Int],
+    row: List[ByteBuffer]
+  ): RowDecodeResult[H *: T] =
+    row match {
+      case h :: t =>
+        val hv: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, h, maxLength).toValidatedNel
+        val tv: RowDecodeResult[T] = tailDecoder.decodeBytes(t)
+        (hv, tv).mapN(_ *: _)
+      case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
+    }
+
   implicit def hnilFromRow: DeriveRowDecoder[EmptyTuple] =
     new DeriveRowDecoder[EmptyTuple] {
       def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[EmptyTuple] =
@@ -63,6 +78,14 @@ private[scalasdk] trait RowDecoderCompanion {
             case _ =>
               UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
           }
+
+          def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[EmptyTuple] =
+            row match {
+              case Nil =>
+                EmptyTuple.validNel
+              case _ =>
+                UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
+            }
         }
     }
 
@@ -75,6 +98,7 @@ private[scalasdk] trait RowDecoderCompanion {
           val maxLength = maxLengths.get(key.name)
           new RowDecoder[H *: T] {
             def apply(row: List[String]): RowDecodeResult[H *: T] = parse(key, tailDecoder, maxLength, row)
+            def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H *: T] = parseBytes(key, tailDecoder, maxLength, row)
           }
         case Nil =>
           // Shapeless type checking makes this impossible
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/Event.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/Event.scala
index e503d3f..0d23fb0 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/Event.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/Event.scala
@@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk
 import java.time.Instant
 import java.util.UUID
 import java.time.format.DateTimeFormatter
+import java.nio.ByteBuffer
 
 // circe
 import io.circe.{Decoder, Encoder, Json, JsonObject}
@@ -280,6 +281,9 @@ object Event {
   def parse(line: String): DecodeResult[Event] =
     stdParser.parse(line)
 
+  def parseBytes(bytes: ByteBuffer): DecodeResult[Event] =
+    stdParser.parseBytes(bytes)
+
   private lazy val fieldNames: List[String] =
     Parser.deriveFor[Event].knownKeys
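Event.parseBytes above is the public entry point the rest of this patch funnels into. A hedged usage sketch (the surrounding names are illustrative, not part of the SDK):

    import java.nio.ByteBuffer
    import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}

    // `record` stands in for one enriched event's raw bytes, e.g. a Kinesis or
    // Kafka record payload; parseBytes skips the bytes-to-String round trip.
    def decode(record: Array[Byte]): Either[ParsingError, Event] =
      Event.parseBytes(ByteBuffer.wrap(record)).toEither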
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoder.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoder.scala
index 88494ee..799a2ff 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoder.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/RowDecoder.scala
@@ -13,11 +13,15 @@
 package com.snowplowanalytics.snowplow.analytics.scalasdk
 package decode
 
+import java.nio.ByteBuffer
+
 private[scalasdk] trait RowDecoder[L] extends Serializable { self =>
   def apply(row: List[String]): RowDecodeResult[L]
+  def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[L]
   def map[B](f: L => B): RowDecoder[B] =
     new RowDecoder[B] {
       def apply(row: List[String]): RowDecodeResult[B] = self.apply(row).map(f)
+      def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[B] = self.decodeBytes(row).map(f)
     }
 }
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/TSVParser.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/TSVParser.scala
index 61f3c56..e550bb0 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/TSVParser.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/TSVParser.scala
@@ -12,7 +12,10 @@
  */
 package com.snowplowanalytics.snowplow.analytics.scalasdk.decode
 
+import java.nio.ByteBuffer
+
 /** Parser for a TSV-encoded string */
 trait TSVParser[A] extends Serializable {
+  def parseBytes(bytes: ByteBuffer): DecodeResult[A]
   def parse(row: String): DecodeResult[A]
 }
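The ValueDecoder changes below lean on circe-jawn's ability to parse JSON straight from a ByteBuffer, which is what lets the unstruct_event and contexts columns avoid an intermediate String on the happy path. A small sketch of that API, assuming circe-jawn is on the classpath (the JSON literal is illustrative):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets
    import io.circe.jawn.JawnParser

    val parser = new JawnParser
    val buf = ByteBuffer.wrap("""{"a":1}""".getBytes(StandardCharsets.UTF_8))

    // parseByteBuffer: ByteBuffer => Either[ParsingFailure, Json], no String step
    val json = parser.parseByteBuffer(buf)
    assert(json.isRight)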
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueDecoder.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueDecoder.scala
index b8332bf..2018fea 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueDecoder.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueDecoder.scala
@@ -19,6 +19,8 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.validate.FIELD_SIZES
 import java.time.Instant
 import java.time.format.DateTimeParseException
 import java.util.UUID
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 
 // cats
 import cats.syntax.either._
@@ -30,8 +32,8 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
 import com.snowplowanalytics.iglu.core.circe.implicits._
 
 // circe
-import io.circe.parser.{parse => parseJson}
-import io.circe.{Error, Json}
+import io.circe.jawn.JawnParser
+import io.circe.{Error, Json, ParsingFailure}
 
 // This library
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Common.{ContextsCriterion, UnstructEventCriterion}
@@ -45,9 +47,19 @@ private[decode] trait ValueDecoder[A] {
     value: String,
     maxLength: Option[Int]
   ): DecodedValue[A]
+
+  def parseBytes(
+    key: Key,
+    value: ByteBuffer,
+    maxLength: Option[Int]
+  ): DecodedValue[A] =
+    parse(key, StandardCharsets.UTF_8.decode(value).toString, maxLength)
 }
 
 private[decode] object ValueDecoder {
+
+  private val parser: JawnParser = new JawnParser
+
   def apply[A](implicit readA: ValueDecoder[A]): ValueDecoder[A] = readA
 
   def fromFunc[A](f: ((Key, String, Option[Int])) => DecodedValue[A]): ValueDecoder[A] =
@@ -159,41 +171,85 @@ private[decode] object ValueDecoder {
       }
     }
 
-  implicit final val unstructuredJson: ValueDecoder[UnstructEvent] =
-    fromFunc[UnstructEvent] {
-      case (key, value, _) =>
-        def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
+  implicit final val unstructuredJson: ValueDecoder[UnstructEvent] = {
+    def fromJsonParseResult(
+      result: Either[ParsingFailure, Json],
+      key: Key,
+      originalValue: => String
+    ): DecodedValue[UnstructEvent] = {
+      def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
+      result
+        .flatMap(_.as[SelfDescribingData[Json]])
+        .leftMap(asLeft) match {
+        case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
+          data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
+        case Right(SelfDescribingData(schema, _)) =>
+          InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
+        case Left(error) => error.asLeft[UnstructEvent]
+      }
+    }
+    new ValueDecoder[UnstructEvent] {
+      def parse(
+        key: Key,
+        value: String,
+        maxLength: Option[Int]
+      ): DecodedValue[UnstructEvent] =
         if (value.isEmpty)
           UnstructEvent(None).asRight[RowDecodingErrorInfo]
         else
-          parseJson(value)
-            .flatMap(_.as[SelfDescribingData[Json]])
-            .leftMap(asLeft) match {
-            case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
-              data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
-            case Right(SelfDescribingData(schema, _)) =>
-              InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
-            case Left(error) => error.asLeft[UnstructEvent]
-          }
+          fromJsonParseResult(parser.parse(value), key, value)
+
+      override def parseBytes(
+        key: Key,
+        value: ByteBuffer,
+        maxLength: Option[Int]
+      ): DecodedValue[UnstructEvent] =
+        if (!value.hasRemaining())
+          UnstructEvent(None).asRight[RowDecodingErrorInfo]
+        else
+          fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
     }
+  }
 
-  implicit final val contexts: ValueDecoder[Contexts] =
-    fromFunc[Contexts] {
-      case (key, value, _) =>
-        def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
+  implicit final val contexts: ValueDecoder[Contexts] = {
+    def fromJsonParseResult(
+      result: Either[ParsingFailure, Json],
+      key: Key,
+      originalValue: => String
+    ): DecodedValue[Contexts] = {
+      def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
+      result
+        .flatMap(_.as[SelfDescribingData[Json]])
+        .leftMap(asLeft) match {
+        case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
+          data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
+        case Right(SelfDescribingData(schema, _)) =>
+          InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
+        case Left(error) => error.asLeft[Contexts]
+      }
+    }
+    new ValueDecoder[Contexts] {
+      def parse(
+        key: Key,
+        value: String,
+        maxLength: Option[Int]
+      ): DecodedValue[Contexts] =
         if (value.isEmpty)
-          Contexts(List()).asRight[RowDecodingErrorInfo]
+          Contexts(List.empty).asRight[RowDecodingErrorInfo]
         else
-          parseJson(value)
-            .flatMap(_.as[SelfDescribingData[Json]])
-            .leftMap(asLeft) match {
-            case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
-              data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
-            case Right(SelfDescribingData(schema, _)) =>
-              InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
-            case Left(error) => error.asLeft[Contexts]
-          }
+          fromJsonParseResult(parser.parse(value), key, value)
+
+      override def parseBytes(
+        key: Key,
+        value: ByteBuffer,
+        maxLength: Option[Int]
+      ): DecodedValue[Contexts] =
+        if (!value.hasRemaining())
+          Contexts(List.empty).asRight[RowDecodingErrorInfo]
+        else
+          fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
     }
+  }
 
 /**
  * Converts a timestamp to an ISO-8601 format usable by Instant.parse()
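Note the by-name originalValue: => String parameter in both fromJsonParseResult helpers above: the buffer is only re-decoded to a String when an error message actually needs it, so the success path never pays that cost. A toy illustration of the evaluation order (all names here are made up):

    var decoded = 0
    def expensiveDecode(): String = { decoded += 1; "raw field contents" }

    // Mirrors the `originalValue: => String` parameter shape:
    def describe(result: Either[String, Int], originalValue: => String): String =
      result.fold(err => s"$err: $originalValue", ok => ok.toString)

    describe(Right(42), expensiveDecode())        // success path: decoded stays 0
    describe(Left("bad json"), expensiveDecode()) // error path: decoded becomes 1
    assert(decoded == 1)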
diff --git a/src/test/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/EventSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/EventSpec.scala
index c3b24c9..37d126c 100644
--- a/src/test/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/EventSpec.scala
+++ b/src/test/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/EventSpec.scala
@@ -16,6 +16,8 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk
 // java
 import java.time.Instant
 import java.util.UUID
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 
 // cats
 import cats.data.Validated.{Invalid, Valid}
@@ -157,15 +159,22 @@ class EventSpec extends Specification with ScalaCheck {
     )
 
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
    val event1 = Event.parse(eventValues)
     val event2 = Event.parser().parse(eventValues)
     val event3 = Event.parser(FIELD_SIZES).parse(eventValues)
+    val event4 = Event.parseBytes(eventValuesBytes)
+    val event5 = Event.parser().parseBytes(eventValuesBytes)
+    val event6 = Event.parser(FIELD_SIZES).parseBytes(eventValuesBytes)
 
     // Case class must be processed as expected, for all varieties of the parser
     event1 mustEqual Valid(expected)
     event2 mustEqual Valid(expected)
     event3 mustEqual Valid(expected)
+    event4 mustEqual Valid(expected)
+    event5 mustEqual Valid(expected)
+    event6 mustEqual Valid(expected)
 
     val eventJson = event1.getOrElse(throw new RuntimeException("Failed to parse event")).toJson(true)
@@ -363,10 +372,13 @@ class EventSpec extends Specification with ScalaCheck {
     val expected = baseExpected
 
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
     val event = Event.parse(eventValues)
+    val event2 = Event.parseBytes(eventValuesBytes)
 
     // Case class must be processed as expected
     event mustEqual Valid(expected)
+    event2 mustEqual Valid(expected)
 
     val eventJson = event.getOrElse(throw new RuntimeException("Failed to parse event")).toJson(true)
@@ -604,10 +616,13 @@ class EventSpec extends Specification with ScalaCheck {
     )
 
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
     val event = Event.parse(eventValues)
+    val event2 = Event.parseBytes(eventValuesBytes)
 
     // Case class must be processed as expected
     event mustEqual Valid(expected)
+    event2 mustEqual Valid(expected)
 
     val eventJson = event.getOrElse(throw new RuntimeException("Failed to parse event")).toJson(true)
@@ -866,7 +881,9 @@ class EventSpec extends Specification with ScalaCheck {
     }
 
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
     val event = Event.parse(eventValues)
+    val event2 = Event.parseBytes(eventValuesBytes)
 
     // Case class must be correctly invalidated
     val res = RowDecodingError(
@@ -881,19 +898,27 @@ class EventSpec extends Specification with ScalaCheck {
       )
     )
     event mustEqual Invalid(res)
+    event2 mustEqual Invalid(res)
   }
 
   "fail if payload is not TSV" in {
-    val event = Event.parse("non tsv")
+    val str = "non tsv"
+    val bytes = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
+    val event = Event.parse(str)
+    val event2 = Event.parseBytes(bytes)
     event mustEqual Invalid(NotTSV)
+    event2 mustEqual Invalid(NotTSV)
   }
 
   "fail if there are more fields than expected" in {
     val input = baseInput :+ "additional_field" -> "mock_value"
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
     val event = Event.parse(eventValues)
+    val event2 = Event.parseBytes(eventValuesBytes)
     event mustEqual Invalid(FieldNumberMismatch(132))
+    event2 mustEqual Invalid(FieldNumberMismatch(132))
   }
 
   "fail if there are fewer fields than expected" in {
@@ -905,9 +930,12 @@ class EventSpec extends Specification with ScalaCheck {
     )
 
     val eventValues = input.unzip._2.mkString("\t")
+    val eventValuesBytes = ByteBuffer.wrap(eventValues.getBytes(StandardCharsets.UTF_8))
     val event = Event.parse(eventValues)
+    val event2 = Event.parseBytes(eventValuesBytes)
 
     event mustEqual Invalid(FieldNumberMismatch(4))
+    event2 mustEqual Invalid(FieldNumberMismatch(4))
   }
 
   "successfully decode encoded event which has no contexts or unstruct_event" in {
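The spec additions above all assert the same contract, which could be stated once as a property: parseBytes over a line's UTF-8 bytes must return exactly what parse returns over the String, for valid and invalid input alike (NotTSV and FieldNumberMismatch included). A condensed sketch of that invariant (the helper name is illustrative):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets
    import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

    // Byte-level parsing must agree with String parsing on every input.
    def parsersAgree(line: String): Boolean = {
      val bytes = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8))
      Event.parseBytes(bytes) == Event.parse(line)
    }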