Skip to content

Commit 0aa0d30

Browse files
committed
Add GetLogRangeLimit message to read log ranges by limit of events
1 parent fd98efa commit 0aa0d30

File tree

13 files changed

+221
-27
lines changed

13 files changed

+221
-27
lines changed

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala

+27-11
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package com.comcast.xfinity.sirius.api.impl.state
1717

18-
import akka.actor.{Props, Actor, ActorRef}
18+
import akka.actor.{Actor, ActorRef, Props}
1919
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
2020
import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult}
2121
import com.comcast.xfinity.sirius.api.impl._
2222
import com.comcast.xfinity.sirius.admin.MonitoringHooks
2323

24+
import scala.collection.mutable.ListBuffer
25+
2426
object SiriusPersistenceActor {
2527

2628
/**
@@ -41,6 +43,7 @@ object SiriusPersistenceActor {
4143
* @param end last sequence number of the range, inclusive
4244
*/
4345
case class GetLogSubrange(begin: Long, end: Long) extends LogQuery
46+
case class GetLogRangeLimit(begin: Long, events: Long) extends LogQuery
4447

4548
trait LogSubrange
4649
trait PopulatedSubrange extends LogSubrange {
@@ -129,21 +132,34 @@ class SiriusPersistenceActor(stateActor: ActorRef,
129132

130133
lastWriteTime = thisWriteTime
131134

135+
case GetLogRangeLimit(start, limit) =>
136+
val buffer = siriusLog.foldLeftWhile(start)(ListBuffer.empty[OrderedEvent])(buffer => buffer.length < limit)(
137+
(acc, event) => acc += event
138+
)
139+
if (buffer.isEmpty) {
140+
sender ! EmptySubrange
141+
}
142+
if (buffer.length < limit) {
143+
sender ! PartialSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList)
144+
} else {
145+
sender ! CompleteSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList)
146+
}
147+
132148
case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully
133-
val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(List[OrderedEvent]())(
134-
(acc, event) => event :: acc
135-
).reverse
136-
sender ! CompleteSubrange(rangeStart, rangeEnd, events)
149+
val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
150+
(acc, event) => acc += event
151+
)
152+
sender ! CompleteSubrange(rangeStart, rangeEnd, events.toList)
137153

138-
case GetLogSubrange(rangeStart, rangeEnd) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back
154+
case GetLogSubrange(rangeStart, _) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back
139155
sender ! EmptySubrange
140156

141-
case GetLogSubrange(rangeStart, rangeEnd) => // we can respond partially
157+
case GetLogSubrange(rangeStart, _) => // we can respond partially
142158
val lastSeq = siriusLog.getNextSeq - 1
143-
val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(List[OrderedEvent]())(
144-
(acc, event) => event :: acc
145-
).reverse
146-
sender ! PartialSubrange(rangeStart, lastSeq, events)
159+
val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
160+
(acc, event) => acc += event
161+
)
162+
sender ! PartialSubrange(rangeStart, lastSeq, events.toList)
147163

148164
case GetNextLogSeq =>
149165
sender ! siriusLog.getNextSeq

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala

+5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) {
107107
)
108108
}
109109

110+
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111+
val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue)
112+
dataFile.foldLeftWhile(startOffset)(acc0)(pred)(foldFun)
113+
}
114+
110115
/**
111116
* Close underlying file handles or connections. This UberStoreFilePair should not be used after
112117
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala

+7
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends
8282
def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T =
8383
uberpair.foldLeftRange(startSeq, endSeq)(acc0)(foldFun)
8484

85+
/**
86+
* @inheritdoc
87+
*/
88+
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
89+
uberpair.foldLeftWhile(startSeq)(acc0)(pred)(foldFun)
90+
}
91+
8592
/**
8693
* Close underlying file handles or connections. This UberStore should not be used after
8794
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala

+26-3
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,24 @@ private[uberstore] class UberDataFile(dataFileName: String,
101101
def foldLeftRange[T](baseOff: Long, endOff: Long)(acc0: T)(foldFun: (T, Long, OrderedEvent) => T): T = {
102102
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
103103
try {
104-
foldLeftUntil(readHandle, endOff, acc0, foldFun)
104+
foldLeftUntilOffset(readHandle, endOff, acc0, foldFun)
105+
} finally {
106+
readHandle.close()
107+
}
108+
}
109+
110+
def foldLeftWhile[T](baseOff:Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111+
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
112+
try {
113+
foldLeftWhile(readHandle, acc0, pred, foldFun)
105114
} finally {
106115
readHandle.close()
107116
}
108117
}
109118

110119
// private low low low level fold left
111120
@tailrec
112-
private def foldLeftUntil[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
121+
private def foldLeftUntilOffset[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
113122
val offset = readHandle.offset()
114123
if (offset > maxOffset) {
115124
acc
@@ -118,7 +127,21 @@ private[uberstore] class UberDataFile(dataFileName: String,
118127
case None => acc
119128
case Some(bytes) =>
120129
val accNew = foldFun(acc, offset, codec.deserialize(bytes))
121-
foldLeftUntil(readHandle, maxOffset, accNew, foldFun)
130+
foldLeftUntilOffset(readHandle, maxOffset, accNew, foldFun)
131+
}
132+
}
133+
}
134+
135+
@tailrec
136+
private def foldLeftWhile[T](readHandle: UberDataFileReadHandle, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
137+
if (!pred(acc)) {
138+
acc
139+
} else {
140+
fileOps.readNext(readHandle) match {
141+
case None => acc
142+
case Some(bytes) =>
143+
val accNew = foldFun(acc, codec.deserialize(bytes))
144+
foldLeftWhile(readHandle, accNew, pred, foldFun)
122145
}
123146
}
124147
}

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala

+14-2
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class Segment private[uberstore](val location: File,
107107
* Get the number of entries written to the Segment
108108
* @return number of entries in the Segment
109109
*/
110-
def size = index.size
110+
def size: Long = index.size
111111

112112
/**
113113
* Write OrderedEvent event into this dir. Will fail if closed or sequence
@@ -136,10 +136,15 @@ class Segment private[uberstore](val location: File,
136136
(acc, event) => acc + event.request.key
137137
)
138138

139+
/**
140+
* Get the first sequence number in this dir.
141+
*/
142+
def getFirstSeq: Option[Long] = index.getMinSeq
143+
139144
/**
140145
* Get the next possible sequence number in this dir.
141146
*/
142-
def getNextSeq = index.getMaxSeq match {
147+
def getNextSeq: Long = index.getMaxSeq match {
143148
case None => 1L
144149
case Some(seq) => seq + 1
145150
}
@@ -167,6 +172,13 @@ class Segment private[uberstore](val location: File,
167172
)
168173
}
169174

175+
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
176+
val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue)
177+
dataFile.foldLeftWhile(startOffset)(acc0)(pred)(
178+
(acc, evt) => foldFun(acc, evt)
179+
)
180+
}
181+
170182
/**
171183
* Close underlying file handles or connections. This Segment should not be used after
172184
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala

+9
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,15 @@ class SegmentedUberStore private[segmented] (base: JFile,
151151
liveDir.foldLeftRange(startSeq, endSeq)(res0)(foldFun)
152152
}
153153

154+
/**
155+
* @inheritdoc
156+
*/
157+
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
158+
val res0 = readOnlyDirs.foldLeft(acc0)(
159+
(acc, dir) => dir.foldLeftWhile(startSeq)(acc)(pred)(foldFun)
160+
)
161+
liveDir.foldLeftWhile(startSeq)(res0)(pred)(foldFun)
162+
}
154163

155164
/**
156165
* Close underlying file handles or connections. This SegmentedUberStore should not be used after

src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/DiskOnlySeqIndex.scala

+19-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,17 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile,
5151
var isClosed = false
5252
var size: Long = handle.length() / 24
5353

54-
var maxSeq = {
54+
private var minSeq = {
55+
if (handle.length == 0)
56+
None
57+
else {
58+
handle.seek(0)
59+
val (seq, _) = fileOps.readEntry(handle)
60+
Some(seq)
61+
}
62+
}
63+
64+
private var maxSeq = {
5565
if (handle.length == 0)
5666
None
5767
else {
@@ -87,6 +97,11 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile,
8797
getOffsetForAux(0, handle.length)
8898
}
8999

100+
/**
101+
* {@inheritdoc}
102+
*/
103+
def getMinSeq: Option[Long] = minSeq
104+
90105
/**
91106
* {@inheritdoc}
92107
*/
@@ -98,6 +113,9 @@ class DiskOnlySeqIndex private(handle: RandomAccessFile,
98113
def put(seq: Long, offset: Long): Unit = synchronized {
99114
handle.seek(handle.length)
100115
fileOps.put(handle, seq, offset)
116+
if (minSeq.isEmpty) {
117+
minSeq = Some(seq)
118+
}
101119
maxSeq = Some(seq)
102120
size += 1
103121
}

src/main/scala/com/comcast/xfinity/sirius/uberstore/seqindex/SeqIndex.scala

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ trait SeqIndex {
2525
*/
2626
def getOffsetFor(seq: Long): Option[Long]
2727

28+
/**
29+
* Get the minimum sequence number stored, if such exists
30+
*
31+
* @return Some(sequence) or None if none such exists
32+
*/
33+
def getMinSeq: Option[Long]
34+
2835
/**
2936
* Get the maximum sequence number stored, if such exists
3037
*

src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala

+3
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog {
108108
.values.asScala.foldLeft(acc0)(foldFun)
109109
}
110110

111+
override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
112+
log.foldLeftWhile(startSeq)(acc0)(pred)(foldFun)
113+
111114
def getNextSeq = log.getNextSeq
112115

113116
def compact(): Unit = {

src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala

+9
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ trait SiriusLog {
5959
*/
6060
def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T
6161

62+
/**
63+
* Fold left across log entries while a condition is met
64+
* @param startSeq sequence number to start with, inclusive
65+
* @param acc0 initial accumulator value
66+
* @param pred condition to continue accumulating log entries
67+
* @param foldFun function to apply to the log entry, the result being the new accumulator
68+
*/
69+
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T
70+
6271
/**
6372
* retrieves the next sequence number to be written
6473
*/

0 commit comments

Comments
 (0)