Skip to content

Commit 7546a61

Browse files
gangb-techhezean
andauthored
✨ feat: support realtime synchronizer model (#36)
* feat: support realtime toggle update via socket.io * cleanup code * test: add it for socketio * Update build.yml --------- Co-authored-by: hezean <[email protected]>
1 parent c98df26 commit 7546a61

File tree

9 files changed

+226
-6
lines changed

9 files changed

+226
-6
lines changed

.github/workflows/build.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ jobs:
2424
run: mvn spotless:check
2525

2626
- name: Build with Maven
27-
run: mvn -B package --file pom.xml
27+
run: |
28+
mvn -B package --file pom.xml
29+
mvn failsafe:integration-test
2830
2931
- name: Upload coverage reports to Codecov with GitHub Action
30-
uses: codecov/codecov-action@v2
32+
uses: codecov/codecov-action@v2

pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<version.jackson>2.14.0</version.jackson>
2020
<version.commons>3.11</version.commons>
2121
<version.maven-artifact>3.8.5</version.maven-artifact>
22+
<version.socketio>2.1.0</version.socketio>
2223
<groovy.version>3.0.9</groovy.version>
2324
</properties>
2425

@@ -45,6 +46,11 @@
4546
<artifactId>okhttp</artifactId>
4647
<version>${version.okhttp}</version>
4748
</dependency>
49+
<dependency>
50+
<groupId>io.socket</groupId>
51+
<artifactId>socket.io-client</artifactId>
52+
<version>${version.socketio}</version>
53+
</dependency>
4854
<dependency>
4955
<groupId>org.apache.commons</groupId>
5056
<artifactId>commons-lang3</artifactId>
@@ -216,6 +222,19 @@
216222
</markdown>
217223
</configuration>
218224
</plugin>
225+
<plugin>
226+
<groupId>org.apache.maven.plugins</groupId>
227+
<artifactId>maven-failsafe-plugin</artifactId>
228+
<version>3.0.0-M4</version>
229+
<executions>
230+
<execution>
231+
<goals>
232+
<goal>integration-test</goal>
233+
<goal>verify</goal>
234+
</goals>
235+
</execution>
236+
</executions>
237+
</plugin>
219238
<plugin>
220239
<groupId>org.jacoco</groupId>
221240
<artifactId>jacoco-maven-plugin</artifactId>

src/main/java/com/featureprobe/sdk/server/FPConfig.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public final class FPConfig {
2828

2929
static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(5);
3030

31+
static final Duration DEFAULT_REALTIME_INTERVAL = Duration.ofSeconds(10);
32+
3133
static final Long DEFAULT_START_WAIT = TimeUnit.SECONDS.toNanos(5);
3234

3335
protected static final FPConfig DEFAULT = new Builder().build();
@@ -42,6 +44,8 @@ public final class FPConfig {
4244

4345
URL eventUrl;
4446

47+
URI realtimeUri;
48+
4549
final String location;
4650

4751
final SynchronizerFactory synchronizerFactory;
@@ -57,14 +61,15 @@ protected FPConfig(Builder builder) {
5761
this.remoteUri = builder.remoteUri;
5862
this.location = builder.location;
5963
this.synchronizerFactory =
60-
builder.synchronizer == null ? new PollingSynchronizerFactory() : builder.synchronizer;
64+
builder.synchronizer == null ? new StreamingSynchronizerFactory() : builder.synchronizer;
6165
this.dataRepositoryFactory =
6266
builder.dataRepository == null ? new MemoryDataRepositoryFactory() : builder.dataRepository;
6367
this.eventProcessorFactory = new DefaultEventProcessorFactory();
6468
this.httpConfiguration =
6569
builder.httpConfiguration == null ? HttpConfiguration.DEFAULT : builder.httpConfiguration;
6670
this.synchronizerUrl = builder.synchronizerUrl;
6771
this.eventUrl = builder.eventUrl;
72+
this.realtimeUri = builder.realtimeUri;
6873
this.startWait = builder.startWait == null ? DEFAULT_START_WAIT : builder.startWait;
6974
}
7075

@@ -90,6 +95,8 @@ public static class Builder {
9095

9196
private URL eventUrl;
9297

98+
private URI realtimeUri;
99+
93100
private Long startWait;
94101

95102
public Builder() {
@@ -105,6 +112,18 @@ public Builder pollingMode() {
105112
return this;
106113
}
107114

115+
public Builder streamingMode() {
116+
this.refreshInterval = DEFAULT_REALTIME_INTERVAL;
117+
this.synchronizer = new StreamingSynchronizerFactory();
118+
return this;
119+
}
120+
121+
public Builder streamingMode(Duration refreshInterval) {
122+
this.refreshInterval = refreshInterval;
123+
this.synchronizer = new StreamingSynchronizerFactory();
124+
return this;
125+
}
126+
108127
public Builder pollingMode(Duration refreshInterval) {
109128
this.refreshInterval = refreshInterval;
110129
this.synchronizer = new PollingSynchronizerFactory();
@@ -143,6 +162,16 @@ public Builder eventUrl(URL eventUrl) {
143162
return this;
144163
}
145164

165+
public Builder realtimeUri(URI realtimeUri) {
166+
this.realtimeUri = realtimeUri;
167+
return this;
168+
}
169+
170+
public Builder realtimeUri(String realtimeUri) {
171+
this.realtimeUri = URI.create(realtimeUri);
172+
return this;
173+
}
174+
146175
public Builder startWait(Long startWaitTime, TimeUnit unit) {
147176
this.startWait = unit.toNanos(startWaitTime);
148177
return this;

src/main/java/com/featureprobe/sdk/server/FPContext.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import java.io.InputStream;
2424
import java.net.MalformedURLException;
25+
import java.net.URI;
26+
import java.net.URISyntaxException;
2527
import java.net.URL;
2628
import java.time.Duration;
2729
import java.util.Objects;
@@ -43,8 +45,12 @@ final class FPContext {
4345

4446
private static final String POST_EVENTS_DATA_API = "/api/events";
4547

48+
private static final String REALTIME_URI_PATH = "/realtime";
49+
4650
private URL synchronizerUrl;
4751

52+
private URI realtimeUri;
53+
4854
private URL eventUrl;
4955

5056
private final String serverSdkKey;
@@ -69,8 +75,15 @@ final class FPContext {
6975
} else {
7076
this.eventUrl = config.eventUrl;
7177
}
78+
if (Objects.isNull(config.realtimeUri)) {
79+
this.realtimeUri = new URI(config.remoteUri.toString() + REALTIME_URI_PATH);
80+
} else {
81+
this.realtimeUri = config.realtimeUri;
82+
}
7283
} catch (MalformedURLException e) {
7384
logger.error("construction context error", e);
85+
} catch (URISyntaxException e) {
86+
logger.error("construction context error", e);
7487
}
7588
this.serverSdkKey = serverSdkKey;
7689
this.refreshInterval = config.refreshInterval;
@@ -119,7 +132,7 @@ private synchronized String getVersion() {
119132
.getResourceAsStream("/META-INF/maven/com.featureprobe/server-sdk-java/pom.properties");
120133
if (is != null) {
121134
p.load(is);
122-
version = p.getProperty("version", "");
135+
version = p.getProperty("version", DEFAULT_SDK_VERSION);
123136
}
124137
} catch (Exception e) {
125138

@@ -134,8 +147,12 @@ private synchronized String getVersion() {
134147
}
135148
}
136149
if (version == null) {
137-
version = "";
150+
version = DEFAULT_SDK_VERSION;
138151
}
139152
return version;
140153
}
154+
155+
public URI getRealtimeUri() {
156+
return realtimeUri;
157+
}
141158
}

src/main/java/com/featureprobe/sdk/server/FeatureProbe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public final class FeatureProbe {
5252
@VisibleForTesting
5353
final DataRepository dataRepository;
5454

55+
@VisibleForTesting
5556
Synchronizer synchronizer;
5657

5758
@VisibleForTesting

src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void close() throws IOException {
9797
}
9898
}
9999

100-
private void poll() {
100+
public void poll() {
101101
Request request = new Request.Builder()
102102
.url(apiUrl.toString())
103103
.headers(headers)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.featureprobe.sdk.server;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
import io.socket.client.IO;
22+
import io.socket.client.Socket;
23+
import io.socket.engineio.client.transports.WebSocket;
24+
import org.slf4j.Logger;
25+
26+
import java.io.IOException;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
import java.util.concurrent.Future;
30+
31+
public class StreamingSynchronizer implements Synchronizer {
32+
33+
private static final Logger logger = Loggers.SYNCHRONIZER;
34+
35+
private PollingSynchronizer pollingSynchronizer;
36+
37+
@VisibleForTesting
38+
Socket socket;
39+
40+
StreamingSynchronizer(FPContext context, DataRepository dataRepository) {
41+
pollingSynchronizer = new PollingSynchronizer(context, dataRepository);
42+
this.socket = connectSocket(context);
43+
}
44+
45+
@Override
46+
public Future<Void> sync() {
47+
return pollingSynchronizer.sync();
48+
}
49+
50+
@Override
51+
public void close() throws IOException {
52+
synchronized (this) {
53+
if (socket != null) {
54+
socket.close();
55+
socket = null;
56+
}
57+
}
58+
}
59+
60+
private Socket connectSocket(FPContext context) {
61+
IO.Options sioOptions = IO.Options.builder()
62+
.setTransports(new String[]{WebSocket.NAME})
63+
.setPath(context.getRealtimeUri().getPath())
64+
.build();
65+
Socket sio = IO.socket(context.getRealtimeUri(), sioOptions);
66+
67+
sio.on("connect", objects -> {
68+
logger.info("connect socket-io success");
69+
Map<String, String> credentials = new HashMap<>(1);
70+
credentials.put("key", context.getServerSdkKey());
71+
sio.emit("register", credentials);
72+
});
73+
74+
sio.on("update", objects -> {
75+
logger.info("socket-io recv update event");
76+
pollingSynchronizer.poll();
77+
});
78+
79+
sio.on("disconnect", objects -> logger.info("socket-io disconnected"));
80+
81+
sio.on("connect_error", objects -> logger.error("socket-io error: {}", objects));
82+
83+
return sio.connect();
84+
}
85+
86+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.featureprobe.sdk.server;
19+
20+
public final class StreamingSynchronizerFactory implements SynchronizerFactory {
21+
22+
@Override
23+
public Synchronizer createSynchronizer(FPContext context, DataRepository dataRepository) {
24+
return new StreamingSynchronizer(context, dataRepository);
25+
}
26+
27+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.featureprobe.sdk.server
2+
3+
import ch.qos.logback.classic.Level
4+
import ch.qos.logback.classic.Logger
5+
import org.slf4j.LoggerFactory
6+
import spock.lang.Specification
7+
8+
class PollingSynchronizerIT extends Specification {
9+
10+
def "Socketio realtime toggle update"() {
11+
12+
(LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger).setLevel(Level.DEBUG)
13+
14+
given:
15+
def config = FPConfig.builder()
16+
.streamingMode()
17+
.remoteUri("https://featureprobe.io/server")
18+
.realtimeUri("https://featureprobe.io/server/realtime")
19+
.useMemoryRepository()
20+
.build()
21+
def featureProbe = new FeatureProbe("server-61db54ecea79824cae3ac38d73f1961d698d0477", config)
22+
def repository = featureProbe.dataRepository
23+
def socket = (featureProbe.synchronizer as StreamingSynchronizer).socket
24+
def updateCnt = 0
25+
socket.on("update", objects -> updateCnt++)
26+
27+
sleep(5000)
28+
29+
featureProbe.close()
30+
31+
sleep(5000)
32+
33+
expect:
34+
repository.initialized()
35+
!socket.connected()
36+
updateCnt > 0
37+
}
38+
}
39+

0 commit comments

Comments
 (0)