Skip to content

Commit b69ca49

Browse files
authored
Add GetLogSubrangeWithLimit message to query log ranges by window and maximum events (#166)
* Add GetLogRangeLimit message to read log ranges by limit of events
* fix test
* add tests, fix segment empty range issue
* Update CatchupSupervisor
* Refactor to make limited subrange include end sequence
* nix refactor of CatchupSupervisor, for now
* Make end sequence optional
* Fix catchup with limit, optionally support via config
1 parent fd98efa commit b69ca49

18 files changed

+475
-94
lines changed

src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala

+10
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,16 @@ object SiriusConfiguration {
298298
*/
299299
final val CATCHUP_MAX_WINDOW_SIZE = "sirius.catchup.max-window-size"
300300

301+
/**
302+
* Use log limit for congestion avoidance for catchup. Default is false.
303+
*/
304+
final val CATCHUP_USE_LIMIT = "sirius.catchup.use-limit"
305+
306+
/**
307+
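* Maximum catchup limit size, in number of events. No default appears to be applied by CatchupSupervisor: when unset, catchup falls back to window-based requests even if sirius.catchup.use-limit is true.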
308+
*/
309+
final val CATCHUP_MAX_LIMIT_SIZE = "sirius.catchup.max-limit-size"
310+
301311
/**
302312
* Starting ssthresh, which is the point where catchup transitions from Slow Start to
303313
* Congestion Avoidance. Default is 500.

src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala

+30-18
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
*/
1616
package com.comcast.xfinity.sirius.api.impl.bridge
1717

18-
import akka.actor.{ActorRef, Props, Actor}
18+
import akka.actor.{Actor, ActorRef, Props}
1919
import akka.pattern.ask
20+
2021
import scala.concurrent.duration._
2122
import com.comcast.xfinity.sirius.api.impl.membership.MembershipHelper
2223
import com.comcast.xfinity.sirius.api.SiriusConfiguration
2324
import com.comcast.xfinity.sirius.api.impl.bridge.CatchupSupervisor.CatchupSupervisorInfoMBean
24-
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{GetLogSubrange, LogSubrange, CompleteSubrange}
25+
import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor.{CompleteSubrange, GetLogSubrange, GetLogSubrangeWithLimit, LogSubrange}
2526
import com.comcast.xfinity.sirius.admin.MonitoringHooks
27+
2628
import scala.util.Success
2729

2830
case class InitiateCatchup(fromSeq: Long)
@@ -36,7 +38,7 @@ object CatchupSupervisor {
3638

3739
trait CatchupSupervisorInfoMBean {
3840
def getSSThresh: Int
39-
def getWindow: Int
41+
def getLimit: Int
4042
}
4143

4244
/**
@@ -50,9 +52,14 @@ object CatchupSupervisor {
5052
val timeoutCoeff = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_INCREASE_PER_EVENT, .01)
5153
val timeoutConst = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_BASE, 1.0)
5254
val maxWindowSize = config.getInt(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, 1000)
55+
val maxLimitSize = if (config.getProp(SiriusConfiguration.CATCHUP_USE_LIMIT, false)) {
56+
config.getProp[Int](SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE)
57+
} else {
58+
None
59+
}
5360
// must ensure ssthresh <= maxWindowSize
54-
val startingSSThresh = Math.min(maxWindowSize, config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
55-
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, startingSSThresh, config))
61+
val startingSSThresh = Math.min(maxLimitSize.getOrElse(maxWindowSize), config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
62+
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, maxLimitSize, startingSSThresh, config))
5663
}
5764
}
5865

@@ -76,51 +83,56 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
7683
timeoutCoeff: Double,
7784
timeoutConst: Double,
7885
maxWindowSize: Int,
86+
maxLimitSize: Option[Int],
7987
var ssthresh: Int,
8088
config: SiriusConfiguration) extends Actor with MonitoringHooks {
8189

82-
var window = 1
83-
def timeout() = (timeoutConst + (window * timeoutCoeff)).seconds
90+
var limit = 1
91+
def timeout() = (timeoutConst + (limit * timeoutCoeff)).seconds
8492

8593
implicit val executionContext = context.dispatcher
8694

8795
def receive = {
8896
case InitiateCatchup(fromSeq) =>
8997
membershipHelper.getRandomMember.map(remote => {
90-
requestSubrange(fromSeq, window, remote)
98+
requestSubrange(fromSeq, limit, remote)
9199
context.become(catchup(remote))
92100
})
93101
}
94102

95103
def catchup(source: ActorRef): Receive = {
96104
case CatchupRequestSucceeded(logSubrange: CompleteSubrange) =>
97-
if (window >= ssthresh) { // we're in Congestion Avoidance phase
98-
window = Math.min(window + 2, maxWindowSize)
105+
if (limit >= ssthresh) { // we're in Congestion Avoidance phase
106+
limit = Math.min(limit + 2, maxLimitSize.getOrElse(maxWindowSize))
99107
} else { // we're in Slow Start phase
100-
window = Math.min(window * 2, ssthresh)
108+
limit = Math.min(limit * 2, ssthresh)
101109
}
102110
context.parent ! logSubrange
103111

104112
case CatchupRequestSucceeded(logSubrange) =>
105113
context.parent ! logSubrange
106114

107115
case CatchupRequestFailed =>
108-
if (window != 1) {
116+
if (limit != 1) {
109117
// adjust ssthresh, revert to Slow Start phase
110-
ssthresh = Math.max(window / 2, 1)
111-
window = 1
118+
ssthresh = Math.max(limit / 2, 1)
119+
limit = 1
112120
}
113121
context.unbecome()
114122

115123
case ContinueCatchup(fromSeq: Long) =>
116-
requestSubrange(fromSeq, window, source)
124+
requestSubrange(fromSeq, limit, source)
117125

118126
case StopCatchup =>
119127
context.unbecome()
120128
}
121129

122-
def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = {
123-
source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete {
130+
def requestSubrange(fromSeq: Long, limit: Int, source: ActorRef): Unit = {
131+
val message = maxLimitSize match {
132+
case Some(_) => GetLogSubrangeWithLimit(fromSeq, fromSeq + maxWindowSize, limit)
133+
case None => GetLogSubrange(fromSeq, fromSeq + limit)
134+
}
135+
source.ask(message)(timeout()).onComplete {
124136
case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange)
125137
case _ => self ! CatchupRequestFailed
126138
}
@@ -135,6 +147,6 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
135147

136148
class CatchupSupervisorInfo extends CatchupSupervisorInfoMBean {
137149
def getSSThresh = ssthresh
138-
def getWindow = window
150+
def getLimit = limit
139151
}
140152
}

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala

+64-16
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,25 @@
1515
*/
1616
package com.comcast.xfinity.sirius.api.impl.state
1717

18-
import akka.actor.{Props, Actor, ActorRef}
18+
import akka.actor.{Actor, ActorRef, Props}
1919
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
2020
import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult}
2121
import com.comcast.xfinity.sirius.api.impl._
2222
import com.comcast.xfinity.sirius.admin.MonitoringHooks
2323

24+
import scala.collection.mutable.ListBuffer
25+
2426
object SiriusPersistenceActor {
2527

2628
/**
2729
* Trait encapsulating _queries_ into Sirius's log's state,
2830
* that is, the state of persisted data
2931
*/
3032
sealed trait LogQuery
33+
sealed trait LogQuerySubrange extends LogQuery {
34+
def begin: Long
35+
def end: Long
36+
}
3137

3238
case object GetLogSize extends LogQuery
3339
/**
@@ -40,7 +46,19 @@ object SiriusPersistenceActor {
4046
* @param begin first sequence number of the range
4147
* @param end last sequence number of the range, inclusive
4248
*/
43-
case class GetLogSubrange(begin: Long, end: Long) extends LogQuery
49+
case class GetLogSubrange(begin: Long, end: Long) extends LogQuerySubrange
50+
/**
51+
* Message for directly requesting a chunk of the log from a node, capped at a maximum number of events.
52+
*
53+
* SiriusPersistenceActor is expected to reply with a LogSubrange
54+
* when receiving this message. This range should be as complete
55+
* as possible.
56+
*
57+
* @param begin first sequence number of the range
58+
* @param end last sequence number of the range, inclusive
59+
* @param limit the maximum number of events
60+
*/
61+
case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuerySubrange
4462

4563
trait LogSubrange
4664
trait PopulatedSubrange extends LogSubrange {
@@ -129,21 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef,
129147

130148
lastWriteTime = thisWriteTime
131149

132-
case GetLogSubrange(rangeStart, rangeEnd) if rangeEnd < siriusLog.getNextSeq => // we can answer fully
133-
val events = siriusLog.foldLeftRange(rangeStart, rangeEnd)(List[OrderedEvent]())(
134-
(acc, event) => event :: acc
135-
).reverse
136-
sender ! CompleteSubrange(rangeStart, rangeEnd, events)
150+
case GetLogSubrange(start, end) =>
151+
sender ! querySubrange(start, end, Long.MaxValue)
137152

138-
case GetLogSubrange(rangeStart, rangeEnd) if siriusLog.getNextSeq <= rangeStart => // we can't send anything useful back
139-
sender ! EmptySubrange
140-
141-
case GetLogSubrange(rangeStart, rangeEnd) => // we can respond partially
142-
val lastSeq = siriusLog.getNextSeq - 1
143-
val events = siriusLog.foldLeftRange(rangeStart, lastSeq)(List[OrderedEvent]())(
144-
(acc, event) => event :: acc
145-
).reverse
146-
sender ! PartialSubrange(rangeStart, lastSeq, events)
153+
case GetLogSubrangeWithLimit(start, end, limit) =>
154+
sender ! querySubrange(start, end, limit)
147155

148156
case GetNextLogSeq =>
149157
sender ! siriusLog.getNextSeq
@@ -156,6 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef,
156164
case _: SiriusResult =>
157165
}
158166

167+
private def querySubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = {
168+
val nextSeq = siriusLog.getNextSeq
169+
val lastSeq = nextSeq - 1
170+
if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) {
171+
// parameters are out of range, can't return anything useful
172+
EmptySubrange
173+
} else {
174+
val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd
175+
if (limit > (endSeq - rangeStart)) {
176+
// the limit is larger than the subrange window, so do not enforce
177+
val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
178+
(acc, evt) => acc += evt
179+
).toList
180+
if (endSeq < rangeEnd) {
181+
// the end of the range extends beyond the end of the log, so can only partially answer
182+
PartialSubrange(rangeStart, endSeq, events)
183+
} else {
184+
// the range is entirely within the log, so can fully answer
185+
CompleteSubrange(rangeStart, endSeq, events)
186+
}
187+
} else {
188+
// the limit is smaller than the subrange window
189+
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
190+
buffer => buffer.size < limit
191+
)(
192+
(acc, evt) => acc += evt
193+
)
194+
if (buffer.size < limit && endSeq < rangeEnd) {
195+
// the end of the subrange extended past the end of the log
196+
// and the buffer was not filled to the limit, so we can only partially respond
197+
PartialSubrange(rangeStart, endSeq, buffer.toList)
198+
} else {
199+
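// the buffer was filled to the limit, or the range lies entirely within the log; respond completely using the sequence of the
200+
// last event as the end of the range (NOTE: buffer.last throws if no events were read -- TODO confirm the range can never be empty here)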
201+
CompleteSubrange(rangeStart, buffer.last.sequence, buffer.toList)
202+
}
203+
}
204+
}
205+
}
206+
159207
/**
160208
* Monitoring hooks
161209
*/

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberPair.scala

+5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ class UberPair(dataFile: UberDataFile, index: SeqIndex) {
107107
)
108108
}
109109

110+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111+
val (startOffset, endOffset) = index.getOffsetRange(startSeq, endSeq)
112+
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(foldFun)
113+
}
114+
110115
/**
111116
* Close underlying file handles or connections. This UberStoreFilePair should not be used after
112117
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/UberStore.scala

+7
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ class UberStore private[uberstore] (baseDir: String, uberpair: UberPair) extends
8282
def foldLeftRange[T](startSeq: Long, endSeq: Long)(acc0: T)(foldFun: (T, OrderedEvent) => T): T =
8383
uberpair.foldLeftRange(startSeq, endSeq)(acc0)(foldFun)
8484

85+
/**
86+
* @inheritdoc
87+
*/
88+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
89+
uberpair.foldLeftRangeWhile(startSeq, endSeq)(acc0)(pred)(foldFun)
90+
}
91+
8592
/**
8693
* Close underlying file handles or connections. This UberStore should not be used after
8794
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/data/UberDataFile.scala

+27-3
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,24 @@ private[uberstore] class UberDataFile(dataFileName: String,
101101
def foldLeftRange[T](baseOff: Long, endOff: Long)(acc0: T)(foldFun: (T, Long, OrderedEvent) => T): T = {
102102
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
103103
try {
104-
foldLeftUntil(readHandle, endOff, acc0, foldFun)
104+
foldLeftUntilOffset(readHandle, endOff, acc0, foldFun)
105+
} finally {
106+
readHandle.close()
107+
}
108+
}
109+
110+
def foldLeftRangeWhile[T](baseOff:Long, endOff: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
111+
val readHandle = fileHandleFactory.createReadHandle(dataFileName, baseOff)
112+
try {
113+
foldLeftRangeWhile(readHandle, endOff, acc0, pred, foldFun)
105114
} finally {
106115
readHandle.close()
107116
}
108117
}
109118

110119
// private low low low level fold left
111120
@tailrec
112-
private def foldLeftUntil[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
121+
private def foldLeftUntilOffset[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, foldFun: (T, Long, OrderedEvent) => T): T = {
113122
val offset = readHandle.offset()
114123
if (offset > maxOffset) {
115124
acc
@@ -118,7 +127,22 @@ private[uberstore] class UberDataFile(dataFileName: String,
118127
case None => acc
119128
case Some(bytes) =>
120129
val accNew = foldFun(acc, offset, codec.deserialize(bytes))
121-
foldLeftUntil(readHandle, maxOffset, accNew, foldFun)
130+
foldLeftUntilOffset(readHandle, maxOffset, accNew, foldFun)
131+
}
132+
}
133+
}
134+
135+
@tailrec
136+
private def foldLeftRangeWhile[T](readHandle: UberDataFileReadHandle, maxOffset: Long, acc: T, pred: T => Boolean, foldFun: (T, OrderedEvent) => T): T = {
137+
val offset = readHandle.offset()
138+
if (offset > maxOffset || !pred(acc)) {
139+
acc
140+
} else {
141+
fileOps.readNext(readHandle) match {
142+
case None => acc
143+
case Some(bytes) =>
144+
val accNew = foldFun(acc, codec.deserialize(bytes))
145+
foldLeftRangeWhile(readHandle, maxOffset, accNew, pred, foldFun)
122146
}
123147
}
124148
}

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/Segment.scala

+17-2
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class Segment private[uberstore](val location: File,
107107
* Get the number of entries written to the Segment
108108
* @return number of entries in the Segment
109109
*/
110-
def size = index.size
110+
def size: Long = index.size
111111

112112
/**
113113
* Write OrderedEvent event into this dir. Will fail if closed or sequence
@@ -136,10 +136,15 @@ class Segment private[uberstore](val location: File,
136136
(acc, event) => acc + event.request.key
137137
)
138138

139+
/**
140+
* Get the first sequence number in this dir.
141+
*/
142+
def getFirstSeq: Option[Long] = index.getMinSeq
143+
139144
/**
140145
* Get the next possible sequence number in this dir.
141146
*/
142-
def getNextSeq = index.getMaxSeq match {
147+
def getNextSeq: Long = index.getMaxSeq match {
143148
case None => 1L
144149
case Some(seq) => seq + 1
145150
}
@@ -167,6 +172,16 @@ class Segment private[uberstore](val location: File,
167172
)
168173
}
169174

175+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T =
176+
index.getOffsetRange(startSeq, endSeq) match {
177+
case (_, endOffset) if endOffset == -1 => // indicates an empty range
178+
acc0
179+
case (startOffset, endOffset) =>
180+
dataFile.foldLeftRangeWhile(startOffset, endOffset)(acc0)(pred)(
181+
(acc, evt) => foldFun(acc, evt)
182+
)
183+
}
184+
170185
/**
171186
* Close underlying file handles or connections. This Segment should not be used after
172187
* close is called.

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala

+9
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,15 @@ class SegmentedUberStore private[segmented] (base: JFile,
151151
liveDir.foldLeftRange(startSeq, endSeq)(res0)(foldFun)
152152
}
153153

154+
/**
155+
* @inheritdoc
156+
*/
157+
def foldLeftRangeWhile[T](startSeq: Long, endSeq: Long)(acc0: T)(pred: T => Boolean)(foldFun: (T, OrderedEvent) => T): T = {
158+
val res0 = readOnlyDirs.foldLeft(acc0)(
159+
(acc, dir) => dir.foldLeftRangeWhile(startSeq, endSeq)(acc)(pred)(foldFun)
160+
)
161+
liveDir.foldLeftRangeWhile(startSeq, endSeq)(res0)(pred)(foldFun)
162+
}
154163

155164
/**
156165
* Close underlying file handles or connections. This SegmentedUberStore should not be used after

0 commit comments

Comments
 (0)