Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse Event from a ByteBuffer #131

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import shapeless._
import shapeless.ops.record._
import shapeless.ops.hlist._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}

private[scalasdk] trait Parser[A] extends TSVParser[A] {
Expand All @@ -42,10 +45,37 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
decoded.map(decodedValue => generic.from(decodedValue))
}
}

/** Parses a TSV row held in a `ByteBuffer` into an `A`.
  *
  * The buffer is split on tabs into per-field views (no copying), validated
  * against the expected field count, then decoded field by field.
  *
  * @param row buffer positioned at the start of the TSV line
  * @return `Invalid(NotTSV)` when no tab is present, `Invalid(FieldNumberMismatch)`
  *         on a wrong field count, otherwise the decoded value or accumulated
  *         row-decoding errors
  */
def parseBytes(row: ByteBuffer): DecodeResult[A] = {
  val fields = Parser.splitBuffer(row)
  val fieldCount = fields.length
  if (fieldCount == 1)
    Validated.Invalid(NotTSV) // a single segment means no tab was found
  else if (fieldCount != expectedNumFields)
    Validated.Invalid(FieldNumberMismatch(fieldCount))
  else
    decoder
      .decodeBytes(fields.result())
      .leftMap(RowDecodingError(_))
      .map(generic.from)
}
}

object Parser {

// TSV field separator.
private val tab: Byte = '\t'.toByte

// Splits `row` into one ByteBuffer view per tab-separated field, without
// copying any bytes: every element is a duplicate() sharing the backing data.
// Setting `current.limit(i)` after the element was already appended works
// because the ListBuffer stores the reference, not a copy.
// Always yields at least one element, even for an empty or tab-free row.
// NOTE(review): `row.duplicate.position(i + 1)` type-checks as ByteBuffer only
// on JDK 9+ (covariant override of Buffer.position(int)) — confirm the
// build's target JDK.
private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
  var current = row.duplicate
  val builder = ListBuffer(current)
  (row.position() until row.limit()).foreach { i =>
    if (row.get(i) === tab) { // `===` is cats Eq syntax (cats.implicits in scope)
      current.limit(i)
      current = row.duplicate.position(i + 1)
      builder += current
    }
  }
  builder
}

private[scalasdk] sealed trait DeriveParser[A] {

def knownKeys[R <: HList, K <: HList, L <: HList](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import shapeless._
import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import java.nio.ByteBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError

private[scalasdk] trait RowDecoderCompanion {
Expand Down Expand Up @@ -44,6 +45,21 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

/** Decodes the head `ByteBuffer` field with the implicit `ValueDecoder` and
  * delegates the remaining fields to `tailDecoder`, accumulating errors from
  * both sides via `Validated`.
  *
  * @param key field name used in error reporting
  * @param tailDecoder decoder for the rest of the row
  * @param maxLength optional truncation limit passed to the field decoder
  * @param row remaining raw fields of the TSV line
  */
private def parseBytes[H: ValueDecoder, T <: HList](
  key: Key,
  tailDecoder: RowDecoder[T],
  maxLength: Option[Int],
  row: List[ByteBuffer]
): RowDecodeResult[H :: T] =
  row match {
    case Nil =>
      UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
    case head :: rest =>
      val decodedHead: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, head, maxLength).toValidatedNel
      val decodedRest: RowDecodeResult[T] = tailDecoder.decodeBytes(rest)
      (decodedHead, decodedRest).mapN(_ :: _)
  }

implicit def hnilFromRow: DeriveRowDecoder[HNil] =
new DeriveRowDecoder[HNil] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[HNil] =
Expand All @@ -55,6 +71,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

// Terminal case of the inductive decoder: succeeds only once every field
// has been consumed.
// NOTE(review): the error text ("Not enough values") is reused for the
// leftover-fields case too — presumably unreachable because Parser checks
// the field count first; confirm.
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[HNil] =
  if (row.isEmpty) HNil.validNel
  else UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}

Expand All @@ -67,6 +91,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H :: T] {
def apply(row: List[String]): RowDecodeResult[H :: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H :: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}
import scala.deriving._
import scala.compiletime._
Expand All @@ -30,10 +33,32 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
else decoder(values.toList).leftMap(e => RowDecodingError(e))
}

/** ByteBuffer variant of `parse`: splits the row on tabs into zero-copy field
  * views, validates the field count, then decodes.
  *
  * @return `Invalid(NotTSV)` when no tab is present, `Invalid(FieldNumberMismatch)`
  *         on a wrong field count, otherwise the decode result
  */
def parseBytes(row: ByteBuffer): DecodeResult[A] = {
  val fields = Parser.splitBuffer(row)
  val fieldCount = fields.length
  if (fieldCount == 1) Validated.Invalid(NotTSV) // a single segment means no tab was found
  else if (fieldCount != expectedNumFields) Validated.Invalid(FieldNumberMismatch(fieldCount))
  else decoder.decodeBytes(fields.result()).leftMap(RowDecodingError(_))
}
}

object Parser {

// TSV field separator.
private val tab: Byte = '\t'.toByte

// Splits `row` into one ByteBuffer view per tab-separated field, without
// copying any bytes: every element is a duplicate() sharing the backing data.
// Setting `current.limit(i)` after the element was already appended works
// because the ListBuffer stores the reference, not a copy.
// Always yields at least one element, even for an empty or tab-free row.
// NOTE(review): `row.duplicate.position(i + 1)` type-checks as ByteBuffer only
// on JDK 9+ (covariant override of Buffer.position(int)) — confirm the
// build's target JDK.
private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
  var current = row.duplicate
  val builder = ListBuffer(current)
  (row.position() until row.limit()).foreach { i =>
    if (row.get(i) === tab) { // `===` is cats Eq syntax (cats.implicits in scope)
      current.limit(i)
      current = row.duplicate.position(i + 1)
      builder += current
    }
  }
  builder
}

private[scalasdk] sealed trait DeriveParser[A] {
inline def knownKeys(implicit mirror: Mirror.ProductOf[A]): List[String] =
constValueTuple[mirror.MirroredElemLabels].toArray.map(_.toString).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError
import java.nio.ByteBuffer
import scala.deriving._
import scala.compiletime._

Expand Down Expand Up @@ -52,6 +53,20 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

/** Decodes the head `ByteBuffer` field with the implicit `ValueDecoder` and
  * delegates the remaining fields to `tailDecoder`, accumulating errors from
  * both sides via `Validated`.
  *
  * @param key field name used in error reporting
  * @param tailDecoder decoder for the rest of the row
  * @param maxLength optional truncation limit passed to the field decoder
  * @param row remaining raw fields of the TSV line
  */
private def parseBytes[H: ValueDecoder, T <: Tuple](
  key: Key,
  tailDecoder: RowDecoder[T],
  maxLength: Option[Int],
  row: List[ByteBuffer]
): RowDecodeResult[H *: T] =
  row match {
    case Nil =>
      UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
    case head :: rest =>
      val decodedHead: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, head, maxLength).toValidatedNel
      val decodedRest: RowDecodeResult[T] = tailDecoder.decodeBytes(rest)
      (decodedHead, decodedRest).mapN(_ *: _)
  }

implicit def hnilFromRow: DeriveRowDecoder[EmptyTuple] =
new DeriveRowDecoder[EmptyTuple] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[EmptyTuple] =
Expand All @@ -63,6 +78,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

// Terminal case of the inductive decoder: succeeds only once every field
// has been consumed.
// NOTE(review): the error text ("Not enough values") is reused for the
// leftover-fields case too — presumably unreachable because Parser checks
// the field count first; confirm.
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[EmptyTuple] =
  if (row.isEmpty) EmptyTuple.validNel
  else UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}

Expand All @@ -75,6 +98,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H *: T] {
def apply(row: List[String]): RowDecodeResult[H *: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H *: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk
import java.time.Instant
import java.util.UUID
import java.time.format.DateTimeFormatter
import java.nio.ByteBuffer

// circe
import io.circe.{Decoder, Encoder, Json, JsonObject}
Expand Down Expand Up @@ -280,6 +281,9 @@ object Event {
def parse(line: String): DecodeResult[Event] =
stdParser.parse(line)

/** Parses a TSV-encoded enriched event held in a `ByteBuffer`.
  * Delegates to the standard derived parser, mirroring `parse(line: String)`.
  */
def parseBytes(bytes: ByteBuffer): DecodeResult[Event] =
  stdParser.parseBytes(bytes)

private lazy val fieldNames: List[String] =
Parser.deriveFor[Event].knownKeys

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
package com.snowplowanalytics.snowplow.analytics.scalasdk
package decode

import java.nio.ByteBuffer

/** Decodes one pre-split TSV row (one element per field) into `L`.
  *
  * `apply` consumes fields already decoded to `String`s; `decodeBytes`
  * consumes raw `ByteBuffer` views of the same fields.
  */
private[scalasdk] trait RowDecoder[L] extends Serializable { self =>
  def apply(row: List[String]): RowDecodeResult[L]
  def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[L]
  // Transforms the decoded value while reusing this decoder's row handling
  // for both the String and ByteBuffer entry points.
  def map[B](f: L => B): RowDecoder[B] =
    new RowDecoder[B] {
      def apply(row: List[String]): RowDecodeResult[B] = self.apply(row).map(f)
      def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[B] = self.decodeBytes(row).map(f)
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import java.nio.ByteBuffer

/** Parser for a TSV-encoded event row, accepting either a `String` line or a
  * raw `ByteBuffer` holding the same bytes.
  */
trait TSVParser[A] extends Serializable {
  def parseBytes(bytes: ByteBuffer): DecodeResult[A]
  def parse(row: String): DecodeResult[A]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.validate.FIELD_SIZES
import java.time.Instant
import java.time.format.DateTimeParseException
import java.util.UUID
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// cats
import cats.syntax.either._
Expand All @@ -30,8 +32,8 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

// circe
import io.circe.parser.{parse => parseJson}
import io.circe.{Error, Json}
import io.circe.jawn.JawnParser
import io.circe.{Error, Json, ParsingFailure}

// This library
import com.snowplowanalytics.snowplow.analytics.scalasdk.Common.{ContextsCriterion, UnstructEventCriterion}
Expand All @@ -45,9 +47,19 @@ private[decode] trait ValueDecoder[A] {
value: String,
maxLength: Option[Int]
): DecodedValue[A]

/** Default byte-level decoding: decode the buffer as UTF-8 text and reuse the
  * string-based `parse`. Concrete instances may override with a zero-copy path.
  *
  * NOTE(review): `CharsetDecoder.decode` advances the buffer's position, so
  * `value` cannot be re-read afterwards — presumably each field buffer is
  * consumed exactly once; confirm against callers.
  */
def parseBytes(
  key: Key,
  value: ByteBuffer,
  maxLength: Option[Int]
): DecodedValue[A] = {
  val asText = StandardCharsets.UTF_8.decode(value).toString
  parse(key, asText, maxLength)
}
}

private[decode] object ValueDecoder {

// Single shared jawn-backed JSON parser reused by the decoder instances below.
// NOTE(review): assumes JawnParser is stateless and safe to share across
// threads — confirm against circe-jawn documentation.
private val parser: JawnParser = new JawnParser

/** Summoner for an implicit `ValueDecoder[A]` instance. */
def apply[A](implicit readA: ValueDecoder[A]): ValueDecoder[A] = readA

def fromFunc[A](f: ((Key, String, Option[Int])) => DecodedValue[A]): ValueDecoder[A] =
Expand Down Expand Up @@ -159,41 +171,85 @@ private[decode] object ValueDecoder {
}
}

/** Decoder for the unstruct_event column.
  *
  * Defect fixed: this span was diff-paste residue — the pre-change
  * `fromFunc`-based header and old `parseJson` body were interleaved with the
  * new implementation, leaving the definition unparseable. Reconstructed as
  * the post-change version only.
  */
implicit final val unstructuredJson: ValueDecoder[UnstructEvent] = {
  // Shared result handling for both the String and ByteBuffer entry points.
  // `originalValue` is by-name so the raw text for error messages is only
  // materialized on the failure path.
  def fromJsonParseResult(
    result: Either[ParsingFailure, Json],
    key: Key,
    originalValue: => String
  ): DecodedValue[UnstructEvent] = {
    def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
    result
      .flatMap(_.as[SelfDescribingData[Json]])
      .leftMap(asLeft) match {
      case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
        data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
      case Right(SelfDescribingData(schema, _)) =>
        InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
      case Left(error) => error.asLeft[UnstructEvent]
    }
  }
  new ValueDecoder[UnstructEvent] {
    def parse(
      key: Key,
      value: String,
      maxLength: Option[Int]
    ): DecodedValue[UnstructEvent] =
      if (value.isEmpty)
        UnstructEvent(None).asRight[RowDecodingErrorInfo]
      else
        fromJsonParseResult(parser.parse(value), key, value)

    override def parseBytes(
      key: Key,
      value: ByteBuffer,
      maxLength: Option[Int]
    ): DecodedValue[UnstructEvent] =
      if (!value.hasRemaining())
        UnstructEvent(None).asRight[RowDecodingErrorInfo]
      else
        // NOTE(review): if parseByteBuffer advances `value`'s position, the
        // by-name UTF-8 decode on the failure path would see an exhausted
        // buffer and report an empty original value — confirm jawn's
        // ByteBufferParser leaves the position untouched.
        fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
  }
}

/** Decoder for the contexts / derived_contexts columns.
  *
  * Defect fixed: this span was diff-paste residue — the pre-change
  * `fromFunc`-based header, the old `Contexts(List())` line, and the old
  * `parseJson` body were interleaved with the new implementation, leaving the
  * definition unparseable. Reconstructed as the post-change version only.
  */
implicit final val contexts: ValueDecoder[Contexts] = {
  // Shared result handling for both the String and ByteBuffer entry points.
  // `originalValue` is by-name so the raw text for error messages is only
  // materialized on the failure path.
  def fromJsonParseResult(
    result: Either[ParsingFailure, Json],
    key: Key,
    originalValue: => String
  ): DecodedValue[Contexts] = {
    def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
    result
      .flatMap(_.as[SelfDescribingData[Json]])
      .leftMap(asLeft) match {
      case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
        data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
      case Right(SelfDescribingData(schema, _)) =>
        InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
      case Left(error) => error.asLeft[Contexts]
    }
  }
  new ValueDecoder[Contexts] {
    def parse(
      key: Key,
      value: String,
      maxLength: Option[Int]
    ): DecodedValue[Contexts] =
      if (value.isEmpty)
        Contexts(List.empty).asRight[RowDecodingErrorInfo]
      else
        fromJsonParseResult(parser.parse(value), key, value)

    override def parseBytes(
      key: Key,
      value: ByteBuffer,
      maxLength: Option[Int]
    ): DecodedValue[Contexts] =
      if (!value.hasRemaining())
        Contexts(List.empty).asRight[RowDecodingErrorInfo]
      else
        // NOTE(review): if parseByteBuffer advances `value`'s position, the
        // by-name UTF-8 decode on the failure path would see an exhausted
        // buffer — confirm jawn's ByteBufferParser leaves it untouched.
        fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
  }
}

/**
* Converts a timestamp to an ISO-8601 format usable by Instant.parse()
Expand Down
Loading
Loading