Skip to content

Commit eaceaef

Browse files
authored
fix: move flush out of wasm resolver api (#288)
move flush out
1 parent 4d95d8a commit eaceaef

File tree

3 files changed

+18
-23
lines changed

3 files changed

+18
-23
lines changed

openfeature-provider-local/src/main/java/com/spotify/confidence/LocalResolverServiceFactory.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class LocalResolverServiceFactory implements ResolverServiceFactory {
4646
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
4747
private static final Duration RESOLVE_INFO_LOG_INTERVAL = Duration.ofMinutes(1);
4848
private final StickyResolveStrategy stickyResolveStrategy;
49+
private static final ScheduledExecutorService logPollExecutor =
50+
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
4951

5052
private static ManagedChannel createConfidenceChannel() {
5153
final String confidenceDomain =
@@ -111,14 +113,19 @@ private static FlagResolverService createFlagResolverService(
111113
sidecarFlagsAdminFetcher.accountId,
112114
stickyResolveStrategy);
113115
flagsFetcherExecutor.scheduleAtFixedRate(
116+
sidecarFlagsAdminFetcher::reload,
117+
pollIntervalSeconds,
118+
pollIntervalSeconds,
119+
TimeUnit.SECONDS);
120+
121+
logPollExecutor.scheduleAtFixedRate(
114122
() -> {
115-
sidecarFlagsAdminFetcher.reload();
116-
wasmResolverApi.updateState(
123+
wasmResolverApi.updateStateAndFlushLogs(
117124
sidecarFlagsAdminFetcher.rawStateHolder().get().toByteArray(),
118125
sidecarFlagsAdminFetcher.accountId);
119126
},
120-
pollIntervalSeconds,
121-
pollIntervalSeconds,
127+
10,
128+
10,
122129
TimeUnit.SECONDS);
123130

124131
return new WasmFlagResolverService(wasmResolverApi, stickyResolveStrategy);
@@ -176,7 +183,7 @@ private static FlagResolverService createFlagResolverService(
176183
flagLogger, resolverStateProtobuf, accountId, stickyResolveStrategy);
177184
flagsFetcherExecutor.scheduleAtFixedRate(
178185
() -> {
179-
wasmResolverApi.updateState(accountStateProvider.provide(), accountId);
186+
wasmResolverApi.updateStateAndFlushLogs(accountStateProvider.provide(), accountId);
180187
},
181188
pollIntervalSeconds,
182189
pollIntervalSeconds,
@@ -236,7 +243,7 @@ private static FlagResolverService createFlagResolverService(
236243
@VisibleForTesting
237244
public void setState(byte[] state, String accountId) {
238245
if (this.wasmResolveApi != null) {
239-
wasmResolveApi.updateState(state, accountId);
246+
wasmResolveApi.updateStateAndFlushLogs(state, accountId);
240247
}
241248
}
242249

openfeature-provider-local/src/main/java/com/spotify/confidence/SwapWasmResolverApi.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,20 @@ public SwapWasmResolverApi(
3434
this.wasmResolverApiRef.set(primaryWasmResolverApi);
3535
}
3636

37-
public void updateState(byte[] state, String accountId) {
37+
public void updateStateAndFlushLogs(byte[] state, String accountId) {
3838
if (isPrimary) {
3939
this.secondaryWasmResolverApi.setResolverState(state, accountId);
4040
this.wasmResolverApiRef.set(secondaryWasmResolverApi);
41+
this.primaryWasmResolverApi.flushLogs();
4142
} else {
4243
this.primaryWasmResolverApi.setResolverState(state, accountId);
4344
this.wasmResolverApiRef.set(primaryWasmResolverApi);
45+
this.secondaryWasmResolverApi.flushLogs();
4446
}
4547
isPrimary = !isPrimary;
4648
}
4749

48-
public void close() {
49-
primaryWasmResolverApi.close();
50-
secondaryWasmResolverApi.close();
51-
}
50+
public void close() {}
5251

5352
private final ReentrantLock logResolveLock = new ReentrantLock();
5453

openfeature-provider-local/src/main/java/com/spotify/confidence/WasmResolveApi.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.dylibso.chicory.wasm.WasmModule;
1111
import com.dylibso.chicory.wasm.types.FunctionType;
1212
import com.dylibso.chicory.wasm.types.ValType;
13-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
1413
import com.google.protobuf.ByteString;
1514
import com.google.protobuf.BytesValue;
1615
import com.google.protobuf.GeneratedMessageV3;
@@ -29,8 +28,6 @@
2928
import java.security.SecureRandom;
3029
import java.time.Instant;
3130
import java.util.List;
32-
import java.util.concurrent.Executors;
33-
import java.util.concurrent.ScheduledExecutorService;
3431
import java.util.function.Function;
3532
import javax.crypto.Cipher;
3633
import javax.crypto.spec.IvParameterSpec;
@@ -57,14 +54,10 @@ class WasmResolveApi {
5754
private final ExportFunction wasmMsgGuestSetResolverState;
5855
private final ExportFunction wasmMsgFlushLogs;
5956
private final ExportFunction wasmMsgGuestResolve;
60-
private static final ScheduledExecutorService logPollExecutor =
61-
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
6257
private final ExportFunction wasmMsgGuestResolveWithSticky;
6358

6459
public WasmResolveApi(WasmFlagLogger flagLogger) {
6560
this.writeFlagLogs = flagLogger;
66-
logPollExecutor.scheduleAtFixedRate(
67-
this::flushLogs, 10, 10, java.util.concurrent.TimeUnit.SECONDS);
6861
try (InputStream wasmStream =
6962
getClass().getClassLoader().getResourceAsStream("wasm/confidence_resolver.wasm")) {
7063
if (wasmStream == null) {
@@ -145,10 +138,6 @@ private Timestamp currentTime(Messages.Void unused) {
145138
return Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()).build();
146139
}
147140

148-
public void close() {
149-
logPollExecutor.shutdownNow();
150-
}
151-
152141
public void setResolverState(byte[] state, String accountId) {
153142
final var resolverStateRequest =
154143
Messages.SetResolverStateRequest.newBuilder()
@@ -165,7 +154,7 @@ public void setResolverState(byte[] state, String accountId) {
165154
consumeResponse(respPtr, Messages.Void::parseFrom);
166155
}
167156

168-
private void flushLogs() {
157+
public void flushLogs() {
169158
final var voidRequest = Messages.Void.getDefaultInstance();
170159
final var reqPtr = transferRequest(voidRequest);
171160
final var respPtr = (int) wasmMsgFlushLogs.apply(reqPtr)[0];

0 commit comments

Comments
 (0)