Skip to content

Commit fa4f643

Browse files
authored
Adds parallel flag to onBootstrapStarted notification (#164)
* Add flag to indicate to the request handler that bootstrap is in parallel mode * Unit tests * nix error handling, allow to bubble up * Rename to writesEnabled * Fix writesEnabled, make parallel flag a non-breaking change * Bump minor version
1 parent 5d3dbed commit fa4f643

File tree

6 files changed

+63
-29
lines changed

6 files changed

+63
-29
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import java.lang.{Runtime => JRuntime}
1717
name := "sirius"
1818

1919
versionScheme := Some("semver-spec")
20-
version := "2.5.0"
20+
version := "2.5.1"
2121
ThisBuild / tlBaseVersion := "2.5"
2222

2323
scalaVersion := "2.13.6"

src/main/scala/com/comcast/xfinity/sirius/api/AbstractParallelBootstrapRequestHandler.scala

+14-11
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ import java.util.function.BiFunction
66
abstract class AbstractParallelBootstrapRequestHandler[K, M] extends RequestHandler {
77
private var sequences: Option[ConcurrentHashMap[K, Long]] = None
88

9-
final override def onBootstrapStarting(): Unit = {
10-
onBootstrapStartingImpl()
11-
sequences = Some(new ConcurrentHashMap[K, Long]())
9+
final override def onBootstrapStarting(parallel: Boolean): Unit = {
10+
onBootstrapStartingImpl(parallel)
11+
12+
// if in parallel bootstrap mode then create the map used to track sequence by key
13+
sequences = if (parallel)
14+
Some(new ConcurrentHashMap[K, Long]())
15+
else None
1216
}
1317

1418
final override def onBootstrapComplete(): Unit = {
@@ -17,19 +21,18 @@ abstract class AbstractParallelBootstrapRequestHandler[K, M] extends RequestHand
1721
}
1822

1923
final override def handleGet(key: String): SiriusResult =
20-
if (enabled()) handleGetImpl(createKey(key))
21-
else SiriusResult.none()
24+
handleGetImpl(createKey(key))
2225

2326
final override def handlePut(key: String, body: Array[Byte]): SiriusResult =
24-
if (enabled()) handlePutImpl(createKey(key), deserialize(body))
27+
if (writesEnabled()) handlePutImpl(createKey(key), deserialize(body))
2528
else SiriusResult.none()
2629

2730
final override def handleDelete(key: String): SiriusResult =
28-
if (enabled()) handleDeleteImpl(createKey(key))
31+
if (writesEnabled()) handleDeleteImpl(createKey(key))
2932
else SiriusResult.none()
3033

3134
final override def handlePut(sequence: Long, key: String, body: Array[Byte]): SiriusResult =
32-
if (enabled())
35+
if (writesEnabled())
3336
sequences match {
3437
case Some(map) =>
3538
val k = createKey(key)
@@ -74,11 +77,11 @@ abstract class AbstractParallelBootstrapRequestHandler[K, M] extends RequestHand
7477
case None => handleDeleteImpl(sequence, createKey(key))
7578
}
7679

77-
protected def enabled(): Boolean
80+
protected def writesEnabled(): Boolean = true
7881
protected def createKey(key: String): K
79-
protected def deserialize(body: Array[Byte]): M
82+
@throws[Exception] protected def deserialize(body: Array[Byte]): M
8083

81-
def onBootstrapStartingImpl(): Unit = { }
84+
def onBootstrapStartingImpl(parallel: Boolean): Unit = { }
8285
def onBootstrapCompletedImpl(): Unit = { }
8386
def handleGetImpl(key: K): SiriusResult
8487
def handlePutImpl(key: K, body: M): SiriusResult

src/main/scala/com/comcast/xfinity/sirius/api/ParallelBootstrapRequestHandler.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ object ParallelBootstrapRequestHandler {
66
}
77

88
class ParallelBootstrapRequestHandler(val requestHandler: RequestHandler) extends AbstractParallelBootstrapRequestHandler[String, Array[Byte]] {
9-
override protected def enabled(): Boolean = true
9+
override protected def writesEnabled(): Boolean = true
1010
override protected def createKey(key: String): String = key
1111
override protected def deserialize(body: Array[Byte]): Array[Byte] = body
1212
override def handleGetImpl(key: String): SiriusResult = requestHandler.handleGet(key)
@@ -15,6 +15,6 @@ class ParallelBootstrapRequestHandler(val requestHandler: RequestHandler) extend
1515
override def handlePutImpl(sequence: Long, key: String, body: Array[Byte]): SiriusResult = requestHandler.handlePut(sequence, key, body)
1616
override def handleDeleteImpl(sequence: Long, key: String): SiriusResult = requestHandler.handleDelete(sequence, key)
1717

18-
override def onBootstrapStartingImpl(): Unit = requestHandler.onBootstrapStarting()
18+
override def onBootstrapStartingImpl(parallel: Boolean): Unit = requestHandler.onBootstrapStarting(parallel)
1919
override def onBootstrapCompletedImpl(): Unit = requestHandler.onBootstrapComplete()
2020
}

src/main/scala/com/comcast/xfinity/sirius/api/RequestHandler.scala

+8
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,17 @@ trait RequestHandler {
9494

9595
/**
9696
* Indicates that the Sirius bootstrap from the Uberstore is starting
97+
*
9798
*/
9899
def onBootstrapStarting(): Unit = { }
99100

101+
/**
102+
* Indicates that the Sirius bootstrap from the Uberstore is starting
103+
*
104+
* @param parallel whether bootstrap is in parallel mode
105+
*/
106+
def onBootstrapStarting(parallel: Boolean): Unit = onBootstrapStarting()
107+
100108
/**
101109
* Indicates that the Sirius bootstrap from Uberstore has completed
102110
*/

src/main/scala/com/comcast/xfinity/sirius/api/impl/state/StateSup.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class StateSup(requestHandler: RequestHandler,
122122
val parallel = config.getProp(SiriusConfiguration.LOG_PARALLEL_ENABLED, default = false)
123123
val start = System.currentTimeMillis
124124
logger.info("Beginning SiriusLog replay at {}", start)
125-
requestHandler.onBootstrapStarting()
125+
requestHandler.onBootstrapStarting(parallel)
126126
if (parallel) {
127127
siriusLog.parallelForeach(bootstrapEvent)
128128
} else {

src/test/scala/com/comcast/xfinity/sirius/api/ParallelBootstrapRequestHandlerTest.scala

+37-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.comcast.xfinity.sirius.api
22

33
import com.comcast.xfinity.sirius.NiceTest
4-
import org.mockito.Mockito.{verify, verifyNoMoreInteractions, when}
4+
import org.mockito.Mockito
5+
import org.mockito.Mockito.{inOrder, verify, verifyNoMoreInteractions, when}
56

67
class ParallelBootstrapRequestHandlerTest extends NiceTest {
78

@@ -70,9 +71,9 @@ class ParallelBootstrapRequestHandlerTest extends NiceTest {
7071
val requestHandler = mock[RequestHandler]
7172

7273
val underTest = ParallelBootstrapRequestHandler(requestHandler)
73-
underTest.onBootstrapStarting()
74+
underTest.onBootstrapStarting(true)
7475

75-
verify(requestHandler).onBootstrapStarting()
76+
verify(requestHandler).onBootstrapStarting(true)
7677
verifyNoMoreInteractions(requestHandler)
7778
}
7879
it("onBootstrapComplete") {
@@ -86,36 +87,58 @@ class ParallelBootstrapRequestHandlerTest extends NiceTest {
8687
}
8788
}
8889

89-
describe("during bootstrap") {
90+
describe("during parallel bootstrap") {
9091
it ("drops out-of-order events for the same key") {
9192
val requestHandler = mock[RequestHandler]
9293
val underTest = ParallelBootstrapRequestHandler(requestHandler)
9394

94-
underTest.onBootstrapStarting()
95-
verify(requestHandler).onBootstrapStarting()
95+
underTest.onBootstrapStarting(true)
96+
verify(requestHandler).onBootstrapStarting(true)
9697

9798
underTest.handlePut(1L, "key1", Array.empty)
9899
underTest.handlePut(4L, "key1", Array.empty)
99100
underTest.handlePut(3L, "key1", Array.empty)
100101

101-
verify(requestHandler).handlePut(1L, "key1", Array.empty)
102-
verify(requestHandler).handlePut(4L, "key1", Array.empty)
103-
verifyNoMoreInteractions(requestHandler)
102+
val inOrder = Mockito.inOrder(requestHandler)
103+
inOrder.verify(requestHandler).handlePut(1L, "key1", Array.empty)
104+
inOrder.verify(requestHandler).handlePut(4L, "key1", Array.empty)
105+
inOrder.verifyNoMoreInteractions()
104106
}
105107
it ("allows out-of-order events for different keys") {
106108
val requestHandler = mock[RequestHandler]
107109
val underTest = ParallelBootstrapRequestHandler(requestHandler)
108110

109-
underTest.onBootstrapStarting()
110-
verify(requestHandler).onBootstrapStarting()
111+
underTest.onBootstrapStarting(true)
112+
verify(requestHandler).onBootstrapStarting(true)
111113

112114
underTest.handlePut(1L, "key1", Array.empty)
113115
underTest.handlePut(4L, "key1", Array.empty)
114116
underTest.handlePut(3L, "key2", Array.empty)
115117

116-
verify(requestHandler).handlePut(1L, "key1", Array.empty)
117-
verify(requestHandler).handlePut(4L, "key1", Array.empty)
118-
verify(requestHandler).handlePut(3L, "key2", Array.empty)
118+
val inOrder = Mockito.inOrder(requestHandler)
119+
inOrder.verify(requestHandler).handlePut(1L, "key1", Array.empty)
120+
inOrder.verify(requestHandler).handlePut(4L, "key1", Array.empty)
121+
inOrder.verify(requestHandler).handlePut(3L, "key2", Array.empty)
122+
inOrder.verifyNoMoreInteractions()
123+
}
124+
}
125+
126+
describe("during non-parallel bootstrap") {
127+
it("does not drop out-of-order events for the same key") {
128+
val requestHandler = mock[RequestHandler]
129+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
130+
131+
underTest.onBootstrapStarting(false)
132+
verify(requestHandler).onBootstrapStarting(false)
133+
134+
underTest.handlePut(1L, "key1", Array.empty)
135+
underTest.handlePut(4L, "key1", Array.empty)
136+
underTest.handlePut(3L, "key1", Array.empty)
137+
138+
val inOrder = Mockito.inOrder(requestHandler)
139+
inOrder.verify(requestHandler).handlePut(1L, "key1", Array.empty)
140+
inOrder.verify(requestHandler).handlePut(4L, "key1", Array.empty)
141+
inOrder.verify(requestHandler).handlePut(3L, "key1", Array.empty)
119142
verifyNoMoreInteractions(requestHandler)
120143
}
121144
}

0 commit comments

Comments
 (0)