Skip to content

Commit 23381d4

Browse files
committed
Make end sequence optional
1 parent e6dfb3b commit 23381d4

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala

+32-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ object SiriusPersistenceActor {
5454
* @param end last sequence number of the range, inclusive
5555
* @param limit the maximum number of events
5656
*/
57-
case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuery
57+
case class GetLogSubrangeWithLimit(begin: Long, end: Option[Long], limit: Long) extends LogQuery
5858

5959
trait LogSubrange
6060
trait PopulatedSubrange extends LogSubrange {
@@ -143,9 +143,12 @@ class SiriusPersistenceActor(stateActor: ActorRef,
143143

144144
lastWriteTime = thisWriteTime
145145

146-
case GetLogSubrangeWithLimit(start, end, limit) =>
146+
case GetLogSubrangeWithLimit(start, Some(end), limit) =>
147147
sender ! queryLimitedSubrange(start, end, limit)
148148

149+
case GetLogSubrangeWithLimit(start, None, limit) =>
150+
sender ! queryLimited(start, limit)
151+
149152
case GetLogSubrange(start, end) =>
150153
sender ! querySubrange(start, end)
151154

@@ -201,6 +204,33 @@ class SiriusPersistenceActor(stateActor: ActorRef,
201204
}
202205
}
203206

207+
private def queryLimited(rangeStart: Long, limit: Long): LogSubrange = {
208+
if (limit <= 0) {
209+
// invalid query subrange or limit, we can't send anything useful back
210+
EmptySubrange
211+
} else {
212+
val nextSeq = siriusLog.getNextSeq
213+
if (rangeStart >= nextSeq) {
214+
// query is out of range, we can't send anything useful back
215+
EmptySubrange
216+
} else {
217+
val nextSeq = siriusLog.getNextSeq
218+
val lastSeq = nextSeq - 1
219+
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
220+
// continue folding events as long as the buffer is smaller than the limit
221+
buffer => buffer.size < limit
222+
)(
223+
(acc, event) => acc += event
224+
)
225+
if (buffer.size < limit) {
226+
CompleteSubrange(rangeStart, lastSeq, buffer.toList)
227+
} else {
228+
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
229+
}
230+
}
231+
}
232+
}
233+
204234
private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange =
205235
if (rangeEnd < rangeStart || rangeEnd <= 0) {
206236
// invalid query subrange or limit, we can't send anything useful back

src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala

+12-12
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ class SiriusPersistenceActorTest extends NiceTest {
194194
val mockLog = makeMockLog(ListBuffer(event1, event2), 10L)
195195
val underTest = makePersistenceActor(siriusLog = mockLog)
196196

197-
senderProbe.send(underTest, GetLogSubrangeWithLimit(1, 2, 2))
197+
senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(2), 2))
198198

199199
verifyFoldLeftRanged(mockLog, 1, 2)
200200
senderProbe.expectMsg(CompleteSubrange(1, 2, List(event1, event2)))
@@ -205,33 +205,33 @@ class SiriusPersistenceActorTest extends NiceTest {
205205
val senderProbe = TestProbe()
206206

207207
val event1 = mock[OrderedEvent]
208-
doReturn(8L).when(event1).sequence
208+
doReturn(1L).when(event1).sequence
209209
val event2 = mock[OrderedEvent]
210-
doReturn(9L).when(event2).sequence
210+
doReturn(2L).when(event2).sequence
211211
val mockLog = makeMockLog(ListBuffer(event1, event2), 10L)
212212
val underTest = makePersistenceActor(siriusLog = mockLog)
213213

214-
senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 3))
214+
senderProbe.send(underTest, GetLogSubrangeWithLimit(1, None, 3))
215215

216-
verifyFoldLeftWhile(mockLog, 8, 9)
217-
senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2)))
216+
verifyFoldLeftWhile(mockLog, 1, 9)
217+
senderProbe.expectMsg(CompleteSubrange(1, 9, List(event1, event2)))
218218
}
219219
}
220220
describe("when we can partially reply due to limit") {
221221
it("should build the list of events and reply with it") {
222222
val senderProbe = TestProbe()
223223

224224
val event1 = mock[OrderedEvent]
225-
doReturn(8L).when(event1).sequence
225+
doReturn(1L).when(event1).sequence
226226
val event2 = mock[OrderedEvent]
227-
doReturn(9L).when(event2).sequence
227+
doReturn(2L).when(event2).sequence
228228
val mockLog = makeMockLog(ListBuffer(event1, event2), 11L)
229229
val underTest = makePersistenceActor(siriusLog = mockLog)
230230

231-
senderProbe.send(underTest, GetLogSubrangeWithLimit(8, 10, 2))
231+
senderProbe.send(underTest, GetLogSubrangeWithLimit(1, Some(10), 2))
232232

233-
verifyFoldLeftWhile(mockLog, 8, 10)
234-
senderProbe.expectMsg(PartialSubrange(8, 9, List(event1, event2)))
233+
verifyFoldLeftWhile(mockLog, 1, 10)
234+
senderProbe.expectMsg(PartialSubrange(1, 2, List(event1, event2)))
235235
}
236236
}
237237
describe("when we can't send anything useful at all") {
@@ -241,7 +241,7 @@ class SiriusPersistenceActorTest extends NiceTest {
241241
val mockLog = makeMockLog(ListBuffer(), 5L)
242242
val underTest = makePersistenceActor(siriusLog = mockLog)
243243

244-
senderProbe.send(underTest, GetLogSubrangeWithLimit(8, Long.MaxValue, 11))
244+
senderProbe.send(underTest, GetLogSubrangeWithLimit(8, None, 11))
245245

246246
senderProbe.expectMsg(EmptySubrange)
247247
}

0 commit comments

Comments
 (0)