Skip to content

Commit 4c436de

Browse files
committed
Refactor to make limited subrange include end sequence
1 parent 529eb78 commit 4c436de

File tree

14 files changed

+175
-85
lines changed

14 files changed

+175
-85
lines changed

src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
2222
import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper
2323
import com.comcast.xfinity.sirius.api.SiriusConfiguration
2424
import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean
25-
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeToLimit, LogSubrange}
25+
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrangeWithLimit, LogSubrange}
2626
import com.comcast.xfinity.sirius.admin.MonitoringHooks
2727

2828
import scala.util.Success
@@ -122,7 +122,7 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
122122
}
123123

124124
def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = {
125-
source.ask(GetLogSubrangeToLimit(fromSeq, window))(timeout()).onComplete {
125+
source.ask(GetLogSubrangeWithLimit(fromSeq, Long.MaxValue, window))(timeout()).onComplete {
126126
case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange)
127127
case _ => self ! CatchupRequestFailed
128128
}

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala

+82-28
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,18 @@ object SiriusPersistenceActor {
4343
* @param end last sequence number of the range, inclusive
4444
*/
4545
case class GetLogSubrange(begin: Long, end: Long) extends LogQuery
46-
case class GetLogSubrangeToLimit(begin: Long, events: Long) extends LogQuery
46+
/**
47+
* Message for directly requesting a chunk of the log from a node.
48+
*
49+
* SiriusPersistenceActor is expected to reply with a LogSubrange
50+
* when receiving this message. This range should be as complete
51+
* as possible.
52+
*
53+
* @param begin first sequence number of the range
54+
* @param end last sequence number of the range, inclusive
55+
* @param limit the maximum number of events
56+
*/
57+
case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuery
4758

4859
trait LogSubrange
4960
trait PopulatedSubrange extends LogSubrange {
@@ -132,34 +143,11 @@ class SiriusPersistenceActor(stateActor: ActorRef,
132143

133144
lastWriteTime = thisWriteTime
134145

135-
case GetLogSubrangeToLimit(start, limit) =>
136-
val buffer = siriusLog.foldLeftWhile(start)(ListBuffer.empty[OrderedEvent])(buffer => buffer.length < limit)(
137-
(acc, event) => acc += event
138-
)
139-
if (buffer.isEmpty) {
140-
sender ! EmptySubrange
141-
}
142-
if (buffer.length < limit) {
143-
sender ! PartialSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList)
144-
} else {
145-
sender ! CompleteSubrange(buffer.head.sequence, buffer.last.sequence, buffer.toList)
146-
}
147-
148-
case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully
149-
val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
150-
(acc, event) => acc += event
151-
)
152-
sender ! CompleteSubrange(rangeStart, rangeEnd, events.toList)
153-
154-
case GetLogSubrange(rangeStart, _) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back
155-
sender ! EmptySubrange
146+
case GetLogSubrangeWithLimit(start, end, limit) =>
147+
sender ! queryLimitedSubrange(start, end, limit)
156148

157-
case GetLogSubrange(rangeStart, _) => // we can respond partially
158-
val lastSeq = siriusLog.getNextSeq - 1
159-
val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
160-
(acc, event) => acc += event
161-
)
162-
sender ! PartialSubrange(rangeStart, lastSeq, events.toList)
149+
case GetLogSubrange(start, end) =>
150+
sender ! querySubrange(start, end)
163151

164152
case GetNextLogSeq =>
165153
sender ! siriusLog.getNextSeq
@@ -172,6 +160,72 @@ class SiriusPersistenceActor(stateActor: ActorRef,
172160
case _: SiriusResult =>
173161
}
174162

163+
private def queryLimitedSubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange =
164+
if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0) {
165+
// invalid query subrange or limit, we can't send anything useful back
166+
EmptySubrange
167+
} else if (limit > (rangeEnd - rangeStart)) {
168+
// limit is larger than the subrange window, no need to enforce limit
169+
querySubrange(rangeStart, rangeEnd)
170+
} else {
171+
val nextSeq = siriusLog.getNextSeq
172+
if (rangeStart >= nextSeq) {
173+
// query is out of range, we can't send anything useful back
174+
EmptySubrange
175+
} else if (rangeEnd >= nextSeq) {
176+
// we can only answer partially
177+
val lastSeq = nextSeq - 1
178+
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
179+
// continue folding events as long as the buffer is smaller than the limit
180+
buffer => buffer.size < limit
181+
)(
182+
(acc, event) => acc += event
183+
)
184+
if (buffer.size < limit) {
185+
PartialSubrange(rangeStart, lastSeq, buffer.toList)
186+
} else {
187+
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
188+
}
189+
} else {
190+
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
191+
// continue folding events as long as the buffer is smaller than the limit
192+
buffer => buffer.size < limit
193+
)(
194+
(acc, event) => acc += event
195+
)
196+
if (buffer.size < limit) {
197+
CompleteSubrange(rangeStart, rangeEnd, buffer.toList)
198+
} else {
199+
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
200+
}
201+
}
202+
}
203+
204+
private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange =
205+
if (rangeEnd < rangeStart || rangeEnd <= 0) {
206+
// invalid query subrange or limit, we can't send anything useful back
207+
EmptySubrange
208+
} else {
209+
val nextSeq = siriusLog.getNextSeq
210+
if (rangeStart >= nextSeq) {
211+
// query is out of range, we can't send anything useful back
212+
EmptySubrange
213+
} else if (rangeEnd >= nextSeq) {
214+
// we can answer partially
215+
val lastSeq = nextSeq - 1
216+
val buffer = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
217+
(acc, event) => acc += event
218+
)
219+
PartialSubrange(rangeStart, lastSeq, buffer.toList)
220+
} else {
221+
// we can answer fully
222+
val buffer = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
223+
(acc, event) => acc += event
224+
)
225+
CompleteSubrange(rangeStart, rangeEnd, buffer.toList)
226+
}
227+
}
228+
175229
/**
176230
* Monitoring hooks
177231
*/

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) {
107107
)
108108
}
109109

110-
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111-
val (startOffset, _) = index.getOffsetRange(startSeq, Long.MaxValue)
112-
dataFile.foldLeftWhile(startOffset)(acc0)(pred)(foldFun)
110+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111+
val (startOffset, endOffset) = index.getOffsetRange(startSeq, endSeq)
112+
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(foldFun)
113113
}
114114

115115
/**

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends
8585
/**
8686
* @inheritdoc
8787
*/
88-
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
89-
uberpair.foldLeftWhile(startSeq)(acc0)(pred)(foldFun)
88+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
89+
uberpair.foldLeftRangeWhile(startSeq, endSeq)(acc0)(pred)(foldFun)
9090
}
9191

9292
/**

src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala

+6-5
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ private[uberstore] class UberDataFile(dataFileName: String,
107107
}
108108
}
109109

110-
def foldLeftWhile[T](baseOff:Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
110+
def foldLeftRangeWhile[T](baseOff:Long, endOff: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111111
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
112112
try {
113-
foldLeftWhile(readHandle, acc0, pred, foldFun)
113+
foldLeftRangeWhile(readHandle, endOff, acc0, pred, foldFun)
114114
} finally {
115115
readHandle.close()
116116
}
@@ -133,15 +133,16 @@ private[uberstore] class UberDataFile(dataFileName: String,
133133
}
134134

135135
@tailrec
136-
private def foldLeftWhile[T](readHandle: UberDataFileReadHandle, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
137-
if (!pred(acc)) {
136+
private def foldLeftRangeWhile[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
137+
val offset = readHandle.offset()
138+
if (offset > maxOffset || !pred(acc)) {
138139
acc
139140
} else {
140141
fileOps.readNext(readHandle) match {
141142
case None => acc
142143
case Some(bytes) =>
143144
val accNew = foldFun(acc, codec.deserialize(bytes))
144-
foldLeftWhile(readHandle, accNew, pred, foldFun)
145+
foldLeftRangeWhile(readHandle, maxOffset, accNew, pred, foldFun)
145146
}
146147
}
147148
}

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,12 @@ class Segment private[uberstore](val location: File,
172172
)
173173
}
174174

175-
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
176-
index.getOffsetRange(startSeq, Long.MaxValue) match {
175+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
176+
index.getOffsetRange(startSeq, endSeq) match {
177177
case (_, endOffset) if endOffset == -1 => // indicates an empty range
178178
acc0
179-
case (startOffset, _) =>
180-
dataFile.foldLeftWhile(startOffset)(acc0)(pred)(
179+
case (startOffset, endOffset) =>
180+
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(
181181
(acc, evt) => foldFun(acc, evt)
182182
)
183183
}

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,11 @@ class SegmentedUberStore private[segmented] (base: JFile,
154154
/**
155155
* @inheritdoc
156156
*/
157-
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
157+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
158158
val res0 = readOnlyDirs.foldLeft(acc0)(
159-
(acc, dir) => dir.foldLeftWhile(startSeq)(acc)(pred)(foldFun)
159+
(acc, dir) => dir.foldLeftRangeWhile(startSeq, endSeq)(acc)(pred)(foldFun)
160160
)
161-
liveDir.foldLeftWhile(startSeq)(res0)(pred)(foldFun)
161+
liveDir.foldLeftRangeWhile(startSeq, endSeq)(res0)(pred)(foldFun)
162162
}
163163

164164
/**

src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/CachedSiriusLog.scala

+31-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package com.comcast.xfinity.sirius.writeaheadlog
1717

1818
import com.comcast.xfinity.sirius.api.impl.OrderedEvent
19+
1920
import scala.collection.JavaConverters._
2021
import java.util.{TreeMap => JTreeMap}
22+
import scala.annotation.tailrec
2123

2224
object CachedSiriusLog {
2325
/**
@@ -89,27 +91,51 @@ class CachedSiriusLog(log: SiriusLog, maxCacheSize: Int) extends SiriusLog {
8991
* @param foldFun function that should take (accumulator, event) and return a new accumulator
9092
* @return the final value of the accumulator
9193
*/
92-
override def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T = {
94+
override def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T =
9395
(startSeq, endSeq) match {
9496
case (start, end) if (firstSeq <= start && end <= lastSeq) =>
9597
foldLeftRangeCached(start, end)(acc0)(foldFun)
9698
case (start, end) =>
9799
log.foldLeftRange(start, end)(acc0)(foldFun)
98100
}
99-
}
101+
102+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
103+
(startSeq, endSeq) match {
104+
case (start, end) if (firstSeq <= start && end <= lastSeq) =>
105+
foldLeftRangeWhileCached(start, end)(acc0)(pred)(foldFun)
106+
case (start, end) =>
107+
log.foldLeftRangeWhile(start, end)(acc0)(pred)(foldFun)
108+
}
100109

101110
/**
102111
* Private inner version of fold left. This one hits the cache, and assumes that start/endSeqs are
103112
* contained in the cache. Synchronizes on writeCache so we can subMap with no fear.
104113
*/
105114
private def foldLeftRangeCached[T](startSeq: Long, endSeq: Long)
106-
(acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized {
115+
(acc0: T)(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized {
107116
writeCache.subMap(startSeq, true, endSeq, true)
108117
.values.asScala.foldLeft(acc0)(foldFun)
109118
}
110119

111-
override def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
112-
log.foldLeftWhile(startSeq)(acc0)(pred)(foldFun)
120+
private def foldLeftRangeWhileCached[T](startSeq: Long, endSeq: Long)
121+
(acc0: T)
122+
(pred: T => Boolean)
123+
(foldFun: (T, OrderedEvent) => T): T = writeCache.synchronized {
124+
val iterable = writeCache.subMap(startSeq, true, endSeq, true)
125+
.values.asScala
126+
foldLeftRangeWhileCached(iterable.iterator, acc0, pred, foldFun)
127+
}
128+
129+
@tailrec
130+
private def foldLeftRangeWhileCached[T](iter: Iterator[OrderedEvent], acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
131+
if (!(pred(acc) && iter.hasNext)) {
132+
acc
133+
} else {
134+
val evt = iter.next()
135+
val newAcc = foldFun(acc, evt)
136+
foldLeftRangeWhileCached(iter, newAcc, pred, foldFun)
137+
}
138+
}
113139

114140
def getNextSeq = log.getNextSeq
115141

src/main/scala/com/comcast/xfinity/sirius/writeaheadlog/SiriusLog.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ trait SiriusLog {
6666
* @param pred condition to continue accumulating log entries
6767
* @param foldFun function to apply to the log entry, the result being the new accumulator
6868
*/
69-
def foldLeftWhile[T](startSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T
69+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T
7070

7171
/**
7272
* retrieves the next sequence number to be written

src/test/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisorTest.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class CatchupSupervisorTest extends NiceTest {
5353

5454
underTest ! InitiateCatchup(1L)
5555

56-
remote.expectMsgClass(classOf[GetLogSubrangeToLimit])
56+
remote.expectMsgClass(classOf[GetLogSubrangeWithLimit])
5757
}
5858
describe("for successful requests with a complete subrange") {
5959
it("should forward the message on to the parent") {
@@ -176,7 +176,7 @@ class CatchupSupervisorTest extends NiceTest {
176176

177177
underTest ! ContinueCatchup(1L)
178178

179-
remote.expectMsg(GetLogSubrangeToLimit(1L, 1L))
179+
remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L))
180180
}
181181
it("should ignore InitiateCatchup requests if it's currently in catchup mode") {
182182
val remote = TestProbe()
@@ -197,7 +197,7 @@ class CatchupSupervisorTest extends NiceTest {
197197
underTest ! StopCatchup
198198
underTest ! InitiateCatchup(1L)
199199

200-
remote.expectMsg(GetLogSubrangeToLimit(1L, 1L))
200+
remote.expectMsg(GetLogSubrangeWithLimit(1L, Long.MaxValue, 1L))
201201
}
202202
}
203203
}

0 commit comments

Comments
 (0)