Skip to content

Commit 50f58e2

Browse files
author
Oleg
committed
[TH2-4971] Add group into message ID in transport response
1 parent b5bf77c commit 50f58e2

File tree

11 files changed

+263
-89
lines changed

11 files changed

+263
-89
lines changed

src/main/kotlin/com/exactpro/th2/lwdataprovider/Utils.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ import com.exactpro.cradle.messages.StoredMessageId
2020
import com.exactpro.cradle.messages.StoredMessageIdUtils
2121
import com.exactpro.th2.dataprovider.lw.grpc.BookId
2222
import com.exactpro.th2.lwdataprovider.entities.requests.GetEventRequest
23+
import com.exactpro.th2.lwdataprovider.entities.responses.MessageIdWithGroup
2324
import com.google.gson.Gson
24-
import java.time.ZoneOffset
25-
import java.time.format.DateTimeFormatter
26-
import java.util.*
2725

2826
private val gson = Gson()
2927

@@ -39,9 +37,19 @@ fun failureReason(batchId: String? = null, id: String? = null, error: String): S
3937

4038
fun StoredMessageId.toReportId() =
4139
"${bookId.name}:$sessionAlias:${direction.label}:${StoredMessageIdUtils.timestampToString(timestamp)}:$sequence"
40+
41+
fun MessageIdWithGroup.toReportId() = with(messageId) {
42+
"${bookId.name}:$group:$sessionAlias:${direction.label}:${StoredMessageIdUtils.timestampToString(timestamp)}:$sequence"
43+
}
4244
fun GetEventRequest.failureReason(error: String): String = failureReason(batchId, eventId, error)
4345
fun StoredMessageId.failureReason(error: String): String = failureReason(
4446
null,
4547
toReportId(),
4648
error
49+
)
50+
51+
fun MessageIdWithGroup.failureReason(error: String): String = failureReason(
52+
null,
53+
toReportId(),
54+
error
4755
)

src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/responses/CustomSerilizer.kt

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.exactpro.th2.lwdataprovider.entities.responses
1818

19-
import com.exactpro.cradle.messages.StoredMessageId
2019
import com.exactpro.cradle.utils.EscapeUtils.escape
2120
import com.exactpro.cradle.utils.TimeUtils
2221
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
@@ -25,9 +24,6 @@ import com.exactpro.th2.lwdataprovider.entities.responses.ser.numberOfDigits
2524
import java.io.ByteArrayOutputStream
2625
import java.io.OutputStream
2726
import java.time.Instant
28-
import kotlin.math.ceil
29-
import kotlin.math.log10
30-
import kotlin.math.max
3127
import kotlin.text.Charsets.UTF_8
3228

3329
private val COMMA = ",".toByteArray(UTF_8).first().toInt()
@@ -80,28 +76,34 @@ fun ProviderMessage53Transport.toJSONByteArray(): ByteArray =
8076
write(CLOSING_CURLY_BRACE)
8177
}.toByteArray()
8278

83-
private fun OutputStream.writeMessageId(messageId: StoredMessageId) {
79+
private fun OutputStream.writeMessageId(idWithGroup: MessageIdWithGroup) {
8480
write(MESSAGE_ID_FILED)
8581
write(COLON)
8682
write(DOUBLE_QUOTE)
87-
with(messageId) {
88-
write(escape(bookId.toString()).toByteArray(UTF_8))
89-
write(COLON)
90-
write(escape(sessionAlias).toByteArray(UTF_8))
91-
write(COLON)
92-
write(direction.label.toByteArray(UTF_8))
93-
write(COLON)
94-
TimeUtils.toLocalTimestamp(timestamp).apply {
95-
writeNumber(year, 4)
96-
writeTwoDigits(monthValue)
97-
writeTwoDigits(dayOfMonth)
98-
writeTwoDigits(hour)
99-
writeTwoDigits(minute)
100-
writeTwoDigits(second)
101-
writeNumber(nano, 9)
83+
with(idWithGroup) {
84+
with(messageId) {
85+
write(escape(bookId.toString()).toByteArray(UTF_8))
86+
write(COLON)
87+
if (group != null) {
88+
write(escape(group).toByteArray(UTF_8))
89+
write(COLON)
90+
}
91+
write(escape(sessionAlias).toByteArray(UTF_8))
92+
write(COLON)
93+
write(direction.label.toByteArray(UTF_8))
94+
write(COLON)
95+
TimeUtils.toLocalTimestamp(timestamp).apply {
96+
writeNumber(year, 4)
97+
writeTwoDigits(monthValue)
98+
writeTwoDigits(dayOfMonth)
99+
writeTwoDigits(hour)
100+
writeTwoDigits(minute)
101+
writeTwoDigits(second)
102+
writeNumber(nano, 9)
103+
}
104+
write(COLON)
105+
write(sequence.toString().toByteArray(UTF_8))
102106
}
103-
write(COLON)
104-
write(sequence.toString().toByteArray(UTF_8))
105107
}
106108
write(DOUBLE_QUOTE)
107109
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2023 Exactpro (Exactpro Systems Limited)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.exactpro.th2.lwdataprovider.entities.responses
18+
19+
import com.exactpro.cradle.messages.StoredMessageId
20+
import com.exactpro.cradle.messages.StoredMessageIdUtils
21+
import com.exactpro.cradle.utils.EscapeUtils
22+
23+
class MessageIdWithGroup private constructor(
24+
val group: String?,
25+
val messageId: StoredMessageId,
26+
) {
27+
28+
companion object {
29+
@JvmStatic
30+
fun create(messageId: StoredMessageId): MessageIdWithGroup {
31+
return MessageIdWithGroup(null, messageId)
32+
}
33+
@JvmStatic
34+
fun create(group: String?, messageId: StoredMessageId): MessageIdWithGroup {
35+
return MessageIdWithGroup(group?.ifEmpty { messageId.sessionAlias }, messageId)
36+
}
37+
@JvmStatic
38+
fun fromString(value: String): MessageIdWithGroup {
39+
val parts = EscapeUtils.split(value).toMutableList()
40+
require(parts.size == 5 || parts.size == 6) {
41+
"invalid format for message ID with group: $value"
42+
}
43+
var group: String? = null
44+
if (parts.size == 6) {
45+
// book:<group>:alias:direction:timestamp:sequence
46+
group = parts[1]?.takeUnless { it.isEmpty() }
47+
parts.removeAt(1)
48+
}
49+
val seq = StoredMessageIdUtils.getSequence(parts)
50+
val timestamp = StoredMessageIdUtils.getTimestamp(parts)
51+
val direction = StoredMessageIdUtils.getDirection(parts)
52+
val session = StoredMessageIdUtils.getSessionAlias(parts)
53+
val book = StoredMessageIdUtils.getBookId(parts)
54+
val id = StoredMessageId(book, session, direction, timestamp, seq)
55+
return MessageIdWithGroup(group, id)
56+
}
57+
}
58+
}

src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/responses/ProviderMessage53Transport.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package com.exactpro.th2.lwdataprovider.entities.responses
1818

1919
import com.exactpro.cradle.Direction.FIRST
2020
import com.exactpro.cradle.messages.StoredMessage
21-
import com.exactpro.cradle.messages.StoredMessageId
2221
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
2322
import com.exactpro.th2.lwdataprovider.entities.internal.Direction
2423
import kotlinx.serialization.Serializable
@@ -40,8 +39,8 @@ data class ProviderMessage53Transport constructor(
4039

4140
val bodyBase64: String?,
4241

43-
@Serializable(with = StoredMessageIdSerializer::class)
44-
val messageId: StoredMessageId,
42+
@Serializable(with = MessageIDWithGroupSerializer::class)
43+
val messageId: MessageIdWithGroup,
4544
) : ResponseMessage {
4645

4746
constructor(
@@ -58,6 +57,6 @@ data class ProviderMessage53Transport constructor(
5857
attachedEventIds = events,
5958
body = body?.map { TransportMessageContainer(sessionGroup, it) },
6059
bodyBase64 = base64Body,
61-
messageId = rawStoredMessage.id
60+
messageId = MessageIdWithGroup.create(sessionGroup, rawStoredMessage.id),
6261
)
6362
}

src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/responses/ResponseMessage.kt

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ import kotlinx.serialization.modules.SerializersModule
4646
import kotlinx.serialization.modules.contextual
4747
import kotlinx.serialization.serializer
4848
import java.time.Instant
49-
import kotlin.math.ceil
50-
import kotlin.math.log10
5149

5250
/**
5351
* Marker interface to specify the message what can be sent in response to message request
@@ -105,63 +103,77 @@ object InstantSerializer : KSerializer<Instant> {
105103
}
106104
}
107105

108-
@OptIn(ExperimentalSerializationApi::class)
109-
@Serializer(forClass = StoredMessageId::class)
106+
object MessageIDWithGroupSerializer : KSerializer<MessageIdWithGroup> {
107+
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("MessageIdWithGroup", PrimitiveKind.STRING)
108+
109+
override fun serialize(encoder: Encoder, value: MessageIdWithGroup) {
110+
encoder.encodeString(
111+
buildString {
112+
idToString(value.messageId, group = value.group)
113+
}
114+
)
115+
}
116+
117+
override fun deserialize(decoder: Decoder): MessageIdWithGroup =
118+
MessageIdWithGroup.fromString(decoder.decodeString())
119+
}
120+
110121
object StoredMessageIdSerializer : KSerializer<StoredMessageId> {
111122
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("StoredMessageId", PrimitiveKind.STRING)
112123
override fun serialize(encoder: Encoder, value: StoredMessageId) {
113-
encoder.encodeString(idToString(value))
124+
encoder.encodeString(buildString { idToString(value) })
114125
}
115126

116-
internal fun idToString(value: StoredMessageId): String =
117-
run {
118-
// Here we try to avoid constant call for timestamp formatter that is used in StoredMessageId.toString()
119-
// And build the ID ourselves
120-
buildString {
121-
with(value) {
122-
append(EscapeUtils.escape(bookId.toString()))
123-
append(EscapeUtils.DELIMITER)
124-
append(EscapeUtils.escape(sessionAlias))
125-
append(EscapeUtils.DELIMITER)
126-
append(direction.label)
127-
append(EscapeUtils.DELIMITER)
128-
appendTimestamp(timestamp)
129-
append(EscapeUtils.DELIMITER)
130-
append(sequence)
131-
}
132-
}
133-
}
134-
135127
override fun deserialize(decoder: Decoder): StoredMessageId = StoredMessageId.fromString(decoder.decodeString())
128+
}
136129

137-
private fun StringBuilder.appendTimestamp(timestamp: Instant) {
138-
TimeUtils.toLocalTimestamp(timestamp).apply {
139-
appendNumber(year, 4)
140-
appendTwoDigits(monthValue)
141-
appendTwoDigits(dayOfMonth)
142-
appendTwoDigits(hour)
143-
appendTwoDigits(minute)
144-
appendTwoDigits(second)
145-
appendNumber(nano, 9)
130+
internal fun StringBuilder.idToString(value: StoredMessageId, group: String? = null): String = apply {
131+
// Here we try to avoid constant call for timestamp formatter that is used in StoredMessageId.toString()
132+
// And build the ID ourselves
133+
with(value) {
134+
append(EscapeUtils.escape(bookId.toString()))
135+
append(EscapeUtils.DELIMITER)
136+
if (group != null) {
137+
append(EscapeUtils.escape(group))
138+
append(EscapeUtils.DELIMITER)
146139
}
140+
append(EscapeUtils.escape(sessionAlias))
141+
append(EscapeUtils.DELIMITER)
142+
append(direction.label)
143+
append(EscapeUtils.DELIMITER)
144+
appendTimestamp(timestamp)
145+
append(EscapeUtils.DELIMITER)
146+
append(sequence)
147147
}
148+
}.toString()
149+
150+
private fun StringBuilder.appendTimestamp(timestamp: Instant) {
151+
TimeUtils.toLocalTimestamp(timestamp).apply {
152+
appendNumber(year, 4)
153+
appendTwoDigits(monthValue)
154+
appendTwoDigits(dayOfMonth)
155+
appendTwoDigits(hour)
156+
appendTwoDigits(minute)
157+
appendTwoDigits(second)
158+
appendNumber(nano, 9)
159+
}
160+
}
148161

149-
private fun StringBuilder.appendTwoDigits(value: Int) {
150-
if (value < 10) {
151-
append(0)
152-
}
153-
append(value)
162+
private fun StringBuilder.appendTwoDigits(value: Int) {
163+
if (value < 10) {
164+
append(0)
154165
}
166+
append(value)
167+
}
155168

156-
private fun StringBuilder.appendNumber(value: Int, size: Int) {
157-
val digits = numberOfDigits(value)
158-
if (digits < size) {
159-
repeat(size - digits) {
160-
append(0)
161-
}
169+
private fun StringBuilder.appendNumber(value: Int, size: Int) {
170+
val digits = numberOfDigits(value)
171+
if (digits < size) {
172+
repeat(size - digits) {
173+
append(0)
162174
}
163-
append(value)
164175
}
176+
append(value)
165177
}
166178

167179
object FieldSerializer : KSerializer<Any> {

src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ class SearchMessagesHandler(
197197
val rootSink = RootMessagesDataSink(
198198
requestContext,
199199
if (request.rawOnly) {
200-
RawStoredMessageHandler(requestContext)
200+
RawStoredMessageHandler(requestContext, markerAsGroup = true)
201201
} else {
202202
if (configuration.useTransportMode) {
203203
TransportParsedStoredMessageHandler(
@@ -241,7 +241,7 @@ class SearchMessagesHandler(
241241
val rootSink = RootMessagesDataSink(
242242
requestContext,
243243
if (request.responseFormats.hasRowOnly()) {
244-
RawStoredMessageHandler(requestContext)
244+
RawStoredMessageHandler(requestContext, markerAsGroup = true)
245245
} else {
246246
if (configuration.useTransportMode) {
247247
TransportParsedStoredMessageHandler(
@@ -581,6 +581,7 @@ private abstract class MessagesDataSink(
581581

582582
private class RawStoredMessageHandler(
583583
private val handler: MessageResponseHandler,
584+
private val markerAsGroup: Boolean = false,
584585
) : MarkedResponseHandler<String, StoredMessage> {
585586
override val isAlive: Boolean
586587
get() = handler.isAlive
@@ -597,7 +598,7 @@ private class RawStoredMessageHandler(
597598
}
598599

599600
override fun handleNext(marker: String, data: StoredMessage) {
600-
handler.handleNext(RequestedMessageDetails(data))
601+
handler.handleNext(RequestedMessageDetails(data, sessionGroup = if (markerAsGroup) marker else null))
601602
}
602603

603604
}

0 commit comments

Comments
 (0)