Skip to content

Commit fd98efa

Browse files
authored
Fix contract of AbstractParallelBootstrapRequestHandler to Java protected methods (#165)
* Fix contract of AbstractParallelBootstrapRequestHandler to Java protected methods Reimplement AbstractParallelBootstrapRequestHandler in Java as Scala is not capable of emitting a class with JVM protected methods * Bump version
1 parent fa4f643 commit fd98efa

4 files changed

+159
-93
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import java.lang.{Runtime => JRuntime}
1717
name := "sirius"
1818

1919
versionScheme := Some("semver-spec")
20-
version := "2.5.1"
20+
version := "2.5.2"
2121
ThisBuild / tlBaseVersion := "2.5"
2222

2323
scalaVersion := "2.13.6"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package com.comcast.xfinity.sirius.api;
2+
3+
import java.util.concurrent.ConcurrentHashMap;
4+
import java.util.concurrent.ConcurrentMap;
5+
import java.util.function.BiFunction;
6+
7+
/**
8+
* An implementation of {@link RequestHandler} that supports parallelized bootstrapping by dropping events that have an
9+
* older sequence number for a given key than events that have already been processed.
10+
*
11+
* @param <K> the key value
12+
* @param <M> the deserialized message
13+
*/
14+
public abstract class AbstractParallelBootstrapRequestHandler<K, M> implements RequestHandler {
15+
private ConcurrentMap<K, Long> latestSequencePerKey = null;
16+
17+
@Override
18+
public final void onBootstrapStarting() {
19+
onBootstrapStarting(false);
20+
}
21+
22+
@Override
23+
public final void onBootstrapStarting(boolean parallel) {
24+
if (parallel) {
25+
latestSequencePerKey = new ConcurrentHashMap<>();
26+
}
27+
onBootstrapStartingImpl(parallel);
28+
}
29+
30+
@Override
31+
public final void onBootstrapComplete() {
32+
latestSequencePerKey = null;
33+
onBootstrapCompletedImpl();
34+
}
35+
36+
@Override
37+
public SiriusResult handleGet(String key) {
38+
return readsEnabled() ? handleGetImpl(createKey(key)) : SiriusResult.none();
39+
}
40+
41+
@Override
42+
public SiriusResult handlePut(String key, byte[] body) {
43+
return writesEnabled() ? handlePutImpl(createKey(key), deserialize(body)) : SiriusResult.none();
44+
}
45+
46+
@Override
47+
public SiriusResult handlePut(long sequence, String key, byte[] body) {
48+
if (!writesEnabled()) {
49+
return SiriusResult.none();
50+
}
51+
K k = createKey(key);
52+
ConcurrentMap<K, Long> latestSequencePerKey = this.latestSequencePerKey;
53+
if (latestSequencePerKey == null) {
54+
return handlePutImpl(sequence, k, deserialize(body));
55+
}
56+
Long existing = latestSequencePerKey.get(k);
57+
if (existing != null && existing > sequence) {
58+
// bail early
59+
return SiriusResult.none();
60+
}
61+
62+
PutUpdateFunction f = new PutUpdateFunction(sequence, deserialize(body));
63+
latestSequencePerKey.compute(k, f);
64+
return f.result();
65+
}
66+
67+
@Override
68+
public SiriusResult handleDelete(String key) {
69+
return writesEnabled() ? handleDeleteImpl(createKey(key)) : SiriusResult.none();
70+
}
71+
72+
@Override
73+
public SiriusResult handleDelete(long sequence, String key) {
74+
if (!writesEnabled()) {
75+
return SiriusResult.none();
76+
}
77+
K k = createKey(key);
78+
ConcurrentMap<K, Long> latestSequencePerKey = this.latestSequencePerKey;
79+
if (latestSequencePerKey == null) {
80+
return handleDeleteImpl(sequence, k);
81+
}
82+
83+
DeleteUpdateFunction f = new DeleteUpdateFunction(sequence);
84+
latestSequencePerKey.compute(k, f);
85+
return f.result();
86+
}
87+
88+
protected boolean readsEnabled() {
89+
return true;
90+
}
91+
92+
protected boolean writesEnabled() {
93+
return true;
94+
}
95+
96+
protected abstract K createKey(String key);
97+
protected abstract M deserialize(byte[] body);
98+
99+
protected void onBootstrapStartingImpl(boolean parallel) { }
100+
protected void onBootstrapCompletedImpl() { }
101+
protected abstract SiriusResult handleGetImpl(K key);
102+
protected abstract SiriusResult handlePutImpl(K key, M body);
103+
protected abstract SiriusResult handleDeleteImpl(K key);
104+
105+
protected SiriusResult handlePutImpl(long sequence, K key, M body) {
106+
return handlePutImpl(key, body);
107+
}
108+
109+
protected SiriusResult handleDeleteImpl(long sequence, K key) {
110+
return handleDeleteImpl(key);
111+
}
112+
113+
private final class PutUpdateFunction implements BiFunction<K, Long, Long> {
114+
private final long sequence;
115+
private final M body;
116+
private SiriusResult result = SiriusResult.none();
117+
118+
public PutUpdateFunction(long sequence, M body) {
119+
this.sequence = sequence;
120+
this.body = body;
121+
}
122+
123+
public SiriusResult result() {
124+
return result;
125+
}
126+
127+
@Override
128+
public Long apply(K key, Long existing) {
129+
if (existing != null && existing > sequence) {
130+
return existing;
131+
}
132+
result = handlePutImpl(sequence, key, body);
133+
return sequence;
134+
}
135+
}
136+
137+
private final class DeleteUpdateFunction implements BiFunction<K, Long, Long> {
138+
private final long sequence;
139+
private SiriusResult result = SiriusResult.none();
140+
141+
public DeleteUpdateFunction(long sequence) {
142+
this.sequence = sequence;
143+
}
144+
145+
public SiriusResult result() {
146+
return result;
147+
}
148+
149+
@Override
150+
public Long apply(K key, Long existing) {
151+
if (existing != null && existing > sequence) {
152+
return existing;
153+
}
154+
result = handleDeleteImpl(sequence, key);
155+
return sequence;
156+
}
157+
}
158+
}

src/main/scala/com/comcast/xfinity/sirius/api/AbstractParallelBootstrapRequestHandler.scala

-91
This file was deleted.

src/main/scala/com/comcast/xfinity/sirius/api/ParallelBootstrapRequestHandler.scala

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ object ParallelBootstrapRequestHandler {
66
}
77

88
class ParallelBootstrapRequestHandler(val requestHandler: RequestHandler) extends AbstractParallelBootstrapRequestHandler[String, Array[Byte]] {
9-
override protected def writesEnabled(): Boolean = true
109
override protected def createKey(key: String): String = key
1110
override protected def deserialize(body: Array[Byte]): Array[Byte] = body
1211
override def handleGetImpl(key: String): SiriusResult = requestHandler.handleGet(key)

0 commit comments

Comments
 (0)