Skip to content

Commit cc34439

Browse files
committed
Fix catchup with limit, optionally support via config
1 parent 23381d4 commit cc34439

File tree

6 files changed

+186
-156
lines changed

6 files changed

+186
-156
lines changed

src/main/scala/com/comcast/xfinity/sirius/api/SiriusConfiguration.scala

+10
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,16 @@ object SiriusConfiguration {
298298
*/
299299
final val CATCHUP_MAX_WINDOW_SIZE = "sirius.catchup.max-window-size"
300300

301+
/**
302+
* Use log limit for congestion avoidance for catchup. Default is false.
303+
*/
304+
final val CATCHUP_USE_LIMIT = "sirius.catchup.use-limit"
305+
306+
/**
307+
* Maximum catchup limit size, in number of events. Default is 1000.
308+
*/
309+
final val CATCHUP_MAX_LIMIT_SIZE = "sirius.catchup.max-limit-size"
310+
301311
/**
302312
* Starting ssthresh, which is the point where catchup transitions from Slow Start to
303313
* Congestion Avoidance. Default is 500.

src/main/scala/com/comcast/xfinity/sirius/api/impl/bridge/CatchupSupervisor.scala

+26-16
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ object CatchupSupervisor {
3838

3939
trait CatchupSupervisorInfoMBean {
4040
def getSSThresh: Int
41-
def getWindow: Int
41+
def getLimit: Int
4242
}
4343

4444
/**
@@ -52,9 +52,14 @@ object CatchupSupervisor {
5252
val timeoutCoeff = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_INCREASE_PER_EVENT, .01)
5353
val timeoutConst = config.getDouble(SiriusConfiguration.CATCHUP_TIMEOUT_BASE, 1.0)
5454
val maxWindowSize = config.getInt(SiriusConfiguration.CATCHUP_MAX_WINDOW_SIZE, 1000)
55+
val maxLimitSize = if (config.getProp(SiriusConfiguration.CATCHUP_USE_LIMIT, false)) {
56+
config.getProp[Int](SiriusConfiguration.CATCHUP_MAX_LIMIT_SIZE)
57+
} else {
58+
None
59+
}
5560
// must ensure ssthresh <= maxWindowSize
56-
val startingSSThresh = Math.min(maxWindowSize, config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
57-
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, startingSSThresh, config))
61+
val startingSSThresh = Math.min(maxLimitSize.getOrElse(maxWindowSize), config.getInt(SiriusConfiguration.CATCHUP_DEFAULT_SSTHRESH, 500))
62+
Props(new CatchupSupervisor(membershipHelper, timeoutCoeff, timeoutConst, maxWindowSize, maxLimitSize, startingSSThresh, config))
5863
}
5964
}
6065

@@ -78,51 +83,56 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
7883
timeoutCoeff: Double,
7984
timeoutConst: Double,
8085
maxWindowSize: Int,
86+
maxLimitSize: Option[Int],
8187
var ssthresh: Int,
8288
config: SiriusConfiguration) extends Actor with MonitoringHooks {
8389

84-
var window = 1
85-
def timeout() = (timeoutConst + (window * timeoutCoeff)).seconds
90+
var limit = 1
91+
def timeout() = (timeoutConst + (limit * timeoutCoeff)).seconds
8692

8793
implicit val executionContext = context.dispatcher
8894

8995
def receive = {
9096
case InitiateCatchup(fromSeq) =>
9197
membershipHelper.getRandomMember.map(remote => {
92-
requestSubrange(fromSeq, window, remote)
98+
requestSubrange(fromSeq, limit, remote)
9399
context.become(catchup(remote))
94100
})
95101
}
96102

97103
def catchup(source: ActorRef): Receive = {
98104
case CatchupRequestSucceeded(logSubrange: CompleteSubrange) =>
99-
if (window >= ssthresh) { // we're in Congestion Avoidance phase
100-
window = Math.min(window + 2, maxWindowSize)
105+
if (limit >= ssthresh) { // we're in Congestion Avoidance phase
106+
limit = Math.min(limit + 2, maxLimitSize.getOrElse(maxWindowSize))
101107
} else { // we're in Slow Start phase
102-
window = Math.min(window * 2, ssthresh)
108+
limit = Math.min(limit * 2, ssthresh)
103109
}
104110
context.parent ! logSubrange
105111

106112
case CatchupRequestSucceeded(logSubrange) =>
107113
context.parent ! logSubrange
108114

109115
case CatchupRequestFailed =>
110-
if (window != 1) {
116+
if (limit != 1) {
111117
// adjust ssthresh, revert to Slow Start phase
112-
ssthresh = Math.max(window / 2, 1)
113-
window = 1
118+
ssthresh = Math.max(limit / 2, 1)
119+
limit = 1
114120
}
115121
context.unbecome()
116122

117123
case ContinueCatchup(fromSeq: Long) =>
118-
requestSubrange(fromSeq, window, source)
124+
requestSubrange(fromSeq, limit, source)
119125

120126
case StopCatchup =>
121127
context.unbecome()
122128
}
123129

124-
def requestSubrange(fromSeq: Long, window: Int, source: ActorRef): Unit = {
125-
source.ask(GetLogSubrange(fromSeq, fromSeq + window))(timeout()).onComplete {
130+
def requestSubrange(fromSeq: Long, limit: Int, source: ActorRef): Unit = {
131+
val message = maxLimitSize match {
132+
case Some(_) => GetLogSubrangeWithLimit(fromSeq, fromSeq + maxWindowSize, limit)
133+
case None => GetLogSubrange(fromSeq, fromSeq + limit)
134+
}
135+
source.ask(message)(timeout()).onComplete {
126136
case Success(logSubrange: LogSubrange) => self ! CatchupRequestSucceeded(logSubrange)
127137
case _ => self ! CatchupRequestFailed
128138
}
@@ -137,6 +147,6 @@ private[bridge] class CatchupSupervisor(membershipHelper: MembershipHelper,
137147

138148
class CatchupSupervisorInfo extends CatchupSupervisorInfoMBean {
139149
def getSSThresh = ssthresh
140-
def getWindow = window
150+
def getLimit = limit
141151
}
142152
}

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala

+36-88
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ object SiriusPersistenceActor {
3030
* that is, the state of persisted data
3131
*/
3232
sealed trait LogQuery
33+
sealed trait LogQuerySubrange extends LogQuery {
34+
def begin: Long
35+
def end: Long
36+
}
3337

3438
case object GetLogSize extends LogQuery
3539
/**
@@ -42,7 +46,7 @@ object SiriusPersistenceActor {
4246
* @param begin first sequence number of the range
4347
* @param end last sequence number of the range, inclusive
4448
*/
45-
case class GetLogSubrange(begin: Long, end: Long) extends LogQuery
49+
case class GetLogSubrange(begin: Long, end: Long) extends LogQuerySubrange
4650
/**
4751
* Message for directly requesting a chunk of the log from a node.
4852
*
@@ -54,7 +58,7 @@ object SiriusPersistenceActor {
5458
* @param end last sequence number of the range, inclusive
5559
* @param limit the maximum number of events
5660
*/
57-
case class GetLogSubrangeWithLimit(begin: Long, end: Option[Long], limit: Long) extends LogQuery
61+
case class GetLogSubrangeWithLimit(begin: Long, end: Long, limit: Long) extends LogQuerySubrange
5862

5963
trait LogSubrange
6064
trait PopulatedSubrange extends LogSubrange {
@@ -143,14 +147,11 @@ class SiriusPersistenceActor(stateActor: ActorRef,
143147

144148
lastWriteTime = thisWriteTime
145149

146-
case GetLogSubrangeWithLimit(start, Some(end), limit) =>
147-
sender ! queryLimitedSubrange(start, end, limit)
148-
149-
case GetLogSubrangeWithLimit(start, None, limit) =>
150-
sender ! queryLimited(start, limit)
151-
152150
case GetLogSubrange(start, end) =>
153-
sender ! querySubrange(start, end)
151+
sender ! querySubrange(start, end, Long.MaxValue)
152+
153+
case GetLogSubrangeWithLimit(start, end, limit) =>
154+
sender ! querySubrange(start, end, limit)
154155

155156
case GetNextLogSeq =>
156157
sender ! siriusLog.getNextSeq
@@ -163,99 +164,46 @@ class SiriusPersistenceActor(stateActor: ActorRef,
163164
case _: SiriusResult =>
164165
}
165166

166-
private def queryLimitedSubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange =
167-
if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0) {
168-
// invalid query subrange or limit, we can't send anything useful back
167+
private def querySubrange(rangeStart: Long, rangeEnd: Long, limit: Long): LogSubrange = {
168+
val nextSeq = siriusLog.getNextSeq
169+
val lastSeq = nextSeq - 1
170+
if (limit <= 0 || rangeEnd < rangeStart || rangeEnd <= 0 || rangeStart > lastSeq) {
171+
// parameters are out of range, can't return anything useful
169172
EmptySubrange
170-
} else if (limit > (rangeEnd - rangeStart)) {
171-
// limit is larger than the subrange window, no need to enforce limit
172-
querySubrange(rangeStart, rangeEnd)
173173
} else {
174-
val nextSeq = siriusLog.getNextSeq
175-
if (rangeStart >= nextSeq) {
176-
// query is out of range, we can't send anything useful back
177-
EmptySubrange
178-
} else if (rangeEnd >= nextSeq) {
179-
// we can only answer partially
180-
val lastSeq = nextSeq - 1
181-
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
182-
// continue folding events as long as the buffer is smaller than the limit
183-
buffer => buffer.size < limit
184-
)(
185-
(acc, event) => acc += event
186-
)
187-
if (buffer.size < limit) {
188-
PartialSubrange(rangeStart, lastSeq, buffer.toList)
189-
} else {
190-
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
191-
}
192-
} else {
193-
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
194-
// continue folding events as long as the buffer is smaller than the limit
195-
buffer => buffer.size < limit
196-
)(
197-
(acc, event) => acc += event
198-
)
199-
if (buffer.size < limit) {
200-
CompleteSubrange(rangeStart, rangeEnd, buffer.toList)
174+
val endSeq = if (rangeEnd > lastSeq) lastSeq else rangeEnd
175+
if (limit > (endSeq - rangeStart)) {
176+
// the limit is larger than the subrange window, so do not enforce
177+
val events = siriusLog.foldLeftRange(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
178+
(acc, evt) => acc += evt
179+
).toList
180+
if (endSeq < rangeEnd) {
181+
// the end of the range extends beyond the end of the log, so can only partially answer
182+
PartialSubrange(rangeStart, endSeq, events)
201183
} else {
202-
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
184+
// the range is entirely within the log, so can fully answer
185+
CompleteSubrange(rangeStart, endSeq, events)
203186
}
204-
}
205-
}
206-
207-
private def queryLimited(rangeStart: Long, limit: Long): LogSubrange = {
208-
if (limit <= 0) {
209-
// invalid query subrange or limit, we can't send anything useful back
210-
EmptySubrange
211-
} else {
212-
val nextSeq = siriusLog.getNextSeq
213-
if (rangeStart >= nextSeq) {
214-
// query is out of range, we can't send anything useful back
215-
EmptySubrange
216187
} else {
217-
val nextSeq = siriusLog.getNextSeq
218-
val lastSeq = nextSeq - 1
219-
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
220-
// continue folding events as long as the buffer is smaller than the limit
188+
// the limit is smaller than the subrange window
189+
val buffer = siriusLog.foldLeftRangeWhile(rangeStart, endSeq)(ListBuffer.empty[OrderedEvent])(
221190
buffer => buffer.size < limit
222191
)(
223-
(acc, event) => acc += event
192+
(acc, evt) => acc += evt
224193
)
225-
if (buffer.size < limit) {
226-
CompleteSubrange(rangeStart, lastSeq, buffer.toList)
194+
if (buffer.size < limit && endSeq < rangeEnd) {
195+
// the end of the subrange extended part the end of the log
196+
// and the buffer was not filled to the limit, so we can only partially respond
197+
PartialSubrange(rangeStart, endSeq, buffer.toList)
227198
} else {
228-
PartialSubrange(rangeStart, buffer.last.sequence, buffer.toList)
199+
// the buffer was filled to the limit, so completely respond using the sequence of the
200+
// last event as the end of the range
201+
CompleteSubrange(rangeStart, buffer.last.sequence, buffer.toList)
229202
}
230203
}
231204
}
232205
}
233206

234-
private def querySubrange(rangeStart: Long, rangeEnd: Long): LogSubrange =
235-
if (rangeEnd < rangeStart || rangeEnd <= 0) {
236-
// invalid query subrange or limit, we can't send anything useful back
237-
EmptySubrange
238-
} else {
239-
val nextSeq = siriusLog.getNextSeq
240-
if (rangeStart >= nextSeq) {
241-
// query is out of range, we can't send anything useful back
242-
EmptySubrange
243-
} else if (rangeEnd >= nextSeq) {
244-
// we can answer partially
245-
val lastSeq = nextSeq - 1
246-
val buffer = siriusLog.foldLeftRange(rangeStart, lastSeq)(ListBuffer.empty[OrderedEvent])(
247-
(acc, event) => acc += event
248-
)
249-
PartialSubrange(rangeStart, lastSeq, buffer.toList)
250-
} else {
251-
// we can answer fully
252-
val buffer = siriusLog.foldLeftRange(rangeStart, rangeEnd)(ListBuffer.empty[OrderedEvent])(
253-
(acc, event) => acc += event
254-
)
255-
CompleteSubrange(rangeStart, rangeEnd, buffer.toList)
256-
}
257-
}
258-
259207
/**
260208
* Monitoring hooks
261209
*/

0 commit comments

Comments
 (0)