Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/main/kotlin/com/exactpro/th2/lwdataprovider/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.cradle.messages.StoredMessageIdUtils
import com.exactpro.th2.dataprovider.lw.grpc.BookId
import com.exactpro.th2.lwdataprovider.entities.requests.GetEventRequest
import com.exactpro.th2.lwdataprovider.entities.responses.MessageIdWithGroup
import com.google.gson.Gson
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*

private val gson = Gson()

Expand All @@ -39,9 +37,19 @@ fun failureReason(batchId: String? = null, id: String? = null, error: String): S

fun StoredMessageId.toReportId() =
"${bookId.name}:$sessionAlias:${direction.label}:${StoredMessageIdUtils.timestampToString(timestamp)}:$sequence"

fun MessageIdWithGroup.toReportId() = with(messageId) {
"${bookId.name}:$group:$sessionAlias:${direction.label}:${StoredMessageIdUtils.timestampToString(timestamp)}:$sequence"
}
fun GetEventRequest.failureReason(error: String): String = failureReason(batchId, eventId, error)
fun StoredMessageId.failureReason(error: String): String = failureReason(
null,
toReportId(),
error
)

fun MessageIdWithGroup.failureReason(error: String): String = failureReason(
null,
toReportId(),
error
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.exactpro.th2.lwdataprovider.entities.responses

import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.cradle.utils.EscapeUtils.escape
import com.exactpro.cradle.utils.TimeUtils
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
Expand All @@ -25,9 +24,6 @@ import com.exactpro.th2.lwdataprovider.entities.responses.ser.numberOfDigits
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.time.Instant
import kotlin.math.ceil
import kotlin.math.log10
import kotlin.math.max
import kotlin.text.Charsets.UTF_8

private val COMMA = ",".toByteArray(UTF_8).first().toInt()
Expand Down Expand Up @@ -80,28 +76,34 @@ fun ProviderMessage53Transport.toJSONByteArray(): ByteArray =
write(CLOSING_CURLY_BRACE)
}.toByteArray()

private fun OutputStream.writeMessageId(messageId: StoredMessageId) {
private fun OutputStream.writeMessageId(idWithGroup: MessageIdWithGroup) {
write(MESSAGE_ID_FILED)
write(COLON)
write(DOUBLE_QUOTE)
with(messageId) {
write(escape(bookId.toString()).toByteArray(UTF_8))
write(COLON)
write(escape(sessionAlias).toByteArray(UTF_8))
write(COLON)
write(direction.label.toByteArray(UTF_8))
write(COLON)
TimeUtils.toLocalTimestamp(timestamp).apply {
writeNumber(year, 4)
writeTwoDigits(monthValue)
writeTwoDigits(dayOfMonth)
writeTwoDigits(hour)
writeTwoDigits(minute)
writeTwoDigits(second)
writeNumber(nano, 9)
with(idWithGroup) {
with(messageId) {
write(escape(bookId.toString()).toByteArray(UTF_8))
write(COLON)
if (group != null) {
write(escape(group).toByteArray(UTF_8))
write(COLON)
}
write(escape(sessionAlias).toByteArray(UTF_8))
write(COLON)
write(direction.label.toByteArray(UTF_8))
write(COLON)
TimeUtils.toLocalTimestamp(timestamp).apply {
writeNumber(year, 4)
writeTwoDigits(monthValue)
writeTwoDigits(dayOfMonth)
writeTwoDigits(hour)
writeTwoDigits(minute)
writeTwoDigits(second)
writeNumber(nano, 9)
}
write(COLON)
write(sequence.toString().toByteArray(UTF_8))
}
write(COLON)
write(sequence.toString().toByteArray(UTF_8))
}
write(DOUBLE_QUOTE)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.lwdataprovider.entities.responses

import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.cradle.messages.StoredMessageIdUtils
import com.exactpro.cradle.utils.EscapeUtils

class MessageIdWithGroup private constructor(
val group: String?,
val messageId: StoredMessageId,
) {

companion object {
@JvmStatic
fun create(messageId: StoredMessageId): MessageIdWithGroup {
return MessageIdWithGroup(null, messageId)
}
@JvmStatic
fun create(group: String?, messageId: StoredMessageId): MessageIdWithGroup {
return MessageIdWithGroup(group?.ifEmpty { messageId.sessionAlias }, messageId)
}
@JvmStatic
fun fromString(value: String): MessageIdWithGroup {
val parts = EscapeUtils.split(value).toMutableList()
require(parts.size == 5 || parts.size == 6) {
"invalid format for message ID with group: $value"
}
var group: String? = null
if (parts.size == 6) {
// book:<group>:alias:direction:timestamp:sequence
group = parts[1]?.takeUnless { it.isEmpty() }
parts.removeAt(1)
}
val seq = StoredMessageIdUtils.getSequence(parts)
val timestamp = StoredMessageIdUtils.getTimestamp(parts)
val direction = StoredMessageIdUtils.getDirection(parts)
val session = StoredMessageIdUtils.getSessionAlias(parts)
val book = StoredMessageIdUtils.getBookId(parts)
val id = StoredMessageId(book, session, direction, timestamp, seq)
return MessageIdWithGroup(group, id)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package com.exactpro.th2.lwdataprovider.entities.responses

import com.exactpro.cradle.Direction.FIRST
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
import com.exactpro.th2.lwdataprovider.entities.internal.Direction
import kotlinx.serialization.Serializable
Expand All @@ -40,8 +39,8 @@ data class ProviderMessage53Transport constructor(

val bodyBase64: String?,

@Serializable(with = StoredMessageIdSerializer::class)
val messageId: StoredMessageId,
@Serializable(with = MessageIDWithGroupSerializer::class)
val messageId: MessageIdWithGroup,
) : ResponseMessage {

constructor(
Expand All @@ -58,6 +57,6 @@ data class ProviderMessage53Transport constructor(
attachedEventIds = events,
body = body?.map { TransportMessageContainer(sessionGroup, it) },
bodyBase64 = base64Body,
messageId = rawStoredMessage.id
messageId = MessageIdWithGroup.create(sessionGroup, rawStoredMessage.id),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.contextual
import kotlinx.serialization.serializer
import java.time.Instant
import kotlin.math.ceil
import kotlin.math.log10

/**
* Marker interface to specify the message what can be sent in response to message request
Expand Down Expand Up @@ -105,63 +103,77 @@ object InstantSerializer : KSerializer<Instant> {
}
}

@OptIn(ExperimentalSerializationApi::class)
@Serializer(forClass = StoredMessageId::class)
object MessageIDWithGroupSerializer : KSerializer<MessageIdWithGroup> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("MessageIdWithGroup", PrimitiveKind.STRING)

override fun serialize(encoder: Encoder, value: MessageIdWithGroup) {
encoder.encodeString(
buildString {
idToString(value.messageId, group = value.group)
}
)
}

override fun deserialize(decoder: Decoder): MessageIdWithGroup =
MessageIdWithGroup.fromString(decoder.decodeString())
}

object StoredMessageIdSerializer : KSerializer<StoredMessageId> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("StoredMessageId", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: StoredMessageId) {
encoder.encodeString(idToString(value))
encoder.encodeString(buildString { idToString(value) })
}

internal fun idToString(value: StoredMessageId): String =
run {
// Here we try to avoid constant call for timestamp formatter that is used in StoredMessageId.toString()
// And build the ID ourselves
buildString {
with(value) {
append(EscapeUtils.escape(bookId.toString()))
append(EscapeUtils.DELIMITER)
append(EscapeUtils.escape(sessionAlias))
append(EscapeUtils.DELIMITER)
append(direction.label)
append(EscapeUtils.DELIMITER)
appendTimestamp(timestamp)
append(EscapeUtils.DELIMITER)
append(sequence)
}
}
}

override fun deserialize(decoder: Decoder): StoredMessageId = StoredMessageId.fromString(decoder.decodeString())
}

private fun StringBuilder.appendTimestamp(timestamp: Instant) {
TimeUtils.toLocalTimestamp(timestamp).apply {
appendNumber(year, 4)
appendTwoDigits(monthValue)
appendTwoDigits(dayOfMonth)
appendTwoDigits(hour)
appendTwoDigits(minute)
appendTwoDigits(second)
appendNumber(nano, 9)
internal fun StringBuilder.idToString(value: StoredMessageId, group: String? = null): String = apply {
// Here we try to avoid constant call for timestamp formatter that is used in StoredMessageId.toString()
// And build the ID ourselves
with(value) {
append(EscapeUtils.escape(bookId.toString()))
append(EscapeUtils.DELIMITER)
if (group != null) {
append(EscapeUtils.escape(group))
append(EscapeUtils.DELIMITER)
}
append(EscapeUtils.escape(sessionAlias))
append(EscapeUtils.DELIMITER)
append(direction.label)
append(EscapeUtils.DELIMITER)
appendTimestamp(timestamp)
append(EscapeUtils.DELIMITER)
append(sequence)
}
}.toString()

private fun StringBuilder.appendTimestamp(timestamp: Instant) {
TimeUtils.toLocalTimestamp(timestamp).apply {
appendNumber(year, 4)
appendTwoDigits(monthValue)
appendTwoDigits(dayOfMonth)
appendTwoDigits(hour)
appendTwoDigits(minute)
appendTwoDigits(second)
appendNumber(nano, 9)
}
}

private fun StringBuilder.appendTwoDigits(value: Int) {
if (value < 10) {
append(0)
}
append(value)
private fun StringBuilder.appendTwoDigits(value: Int) {
if (value < 10) {
append(0)
}
append(value)
}

private fun StringBuilder.appendNumber(value: Int, size: Int) {
val digits = numberOfDigits(value)
if (digits < size) {
repeat(size - digits) {
append(0)
}
private fun StringBuilder.appendNumber(value: Int, size: Int) {
val digits = numberOfDigits(value)
if (digits < size) {
repeat(size - digits) {
append(0)
}
append(value)
}
append(value)
}

object FieldSerializer : KSerializer<Any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class SearchMessagesHandler(
val rootSink = RootMessagesDataSink(
requestContext,
if (request.rawOnly) {
RawStoredMessageHandler(requestContext)
RawStoredMessageHandler(requestContext, markerAsGroup = true)
} else {
if (configuration.useTransportMode) {
TransportParsedStoredMessageHandler(
Expand Down Expand Up @@ -241,7 +241,7 @@ class SearchMessagesHandler(
val rootSink = RootMessagesDataSink(
requestContext,
if (request.responseFormats.hasRowOnly()) {
RawStoredMessageHandler(requestContext)
RawStoredMessageHandler(requestContext, markerAsGroup = true)
} else {
if (configuration.useTransportMode) {
TransportParsedStoredMessageHandler(
Expand Down Expand Up @@ -581,6 +581,7 @@ private abstract class MessagesDataSink(

private class RawStoredMessageHandler(
private val handler: MessageResponseHandler,
private val markerAsGroup: Boolean = false,
) : MarkedResponseHandler<String, StoredMessage> {
override val isAlive: Boolean
get() = handler.isAlive
Expand All @@ -597,7 +598,7 @@ private class RawStoredMessageHandler(
}

override fun handleNext(marker: String, data: StoredMessage) {
handler.handleNext(RequestedMessageDetails(data))
handler.handleNext(RequestedMessageDetails(data, sessionGroup = if (markerAsGroup) marker else null))
}

}
Loading