Skip to content

Commit 60807bf

Browse files
authored
fix: retry all streaming errors (#687)
1 parent 098551c commit 60807bf

File tree

3 files changed

+52
-17
lines changed

3 files changed

+52
-17
lines changed

src/repository/index.ts

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ export default class Repository extends EventEmitter implements EventEmitter {
9595

9696
private eventSource: EventSource | undefined;
9797

98+
private initialEventSourceConnected: boolean = false;
99+
98100
constructor({
99101
url,
100102
appName,
@@ -130,26 +132,37 @@ export default class Repository extends EventEmitter implements EventEmitter {
130132
this.segments = new Map();
131133
this.eventSource = eventSource;
132134
if (this.eventSource) {
133-
this.eventSource.addEventListener('unleash-updated', (event: { data: string }) => {
134-
try {
135-
const data: ClientFeaturesResponse & { meta: { etag: string } } = JSON.parse(event.data);
136-
const etag = data.meta.etag;
137-
if (etag !== null) {
138-
this.etag = etag;
139-
} else {
140-
this.etag = undefined;
141-
}
142-
this.save(data, true);
143-
} catch (err) {
144-
this.emit(UnleashEvents.Error, err);
135+
// On re-connect it guarantees catching up with the latest state.
136+
this.eventSource.addEventListener('unleash-connected', (event: { data: string }) => {
137+
// reconnect
138+
if (this.initialEventSourceConnected) {
139+
this.handleFlagsFromStream(event);
140+
} else {
141+
this.initialEventSourceConnected = true;
145142
}
146143
});
144+
this.eventSource.addEventListener('unleash-updated', this.handleFlagsFromStream.bind(this));
147145
this.eventSource.addEventListener('error', (error: unknown) => {
148146
this.emit(UnleashEvents.Warn, error);
149147
});
150148
}
151149
}
152150

151+
private handleFlagsFromStream(event: { data: string }) {
152+
try {
153+
const data: ClientFeaturesResponse & { meta: { etag: string } } = JSON.parse(event.data);
154+
const etag = data.meta.etag;
155+
if (etag !== null) {
156+
this.etag = etag;
157+
} else {
158+
this.etag = undefined;
159+
}
160+
this.save(data, true);
161+
} catch (err) {
162+
this.emit(UnleashEvents.Error, err);
163+
}
164+
}
165+
153166
timedFetch(interval: number) {
154167
if (interval > 0 && !this.eventSource) {
155168
this.timer = setTimeout(() => this.fetch(), interval);
@@ -180,6 +193,7 @@ export default class Repository extends EventEmitter implements EventEmitter {
180193
}
181194

182195
async start(): Promise<void> {
196+
// the first fetch is used as a fallback even when streaming is enabled
183197
await Promise.all([this.fetch(), this.loadBackup(), this.loadBootstrap()]);
184198
}
185199

src/test/repository.test.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,8 +1363,8 @@ test('Stopping repository should stop storage provider updates', async (t) => {
13631363
});
13641364

13651365
test('Streaming', async (t) => {
1366-
t.plan(6);
1367-
const url = 'irrelevant';
1366+
t.plan(7);
1367+
const url = 'http://unleash-test-streaming.app';
13681368
const feature = {
13691369
name: 'feature',
13701370
enabled: true,
@@ -1374,6 +1374,7 @@ test('Streaming', async (t) => {
13741374
},
13751375
],
13761376
};
1377+
setup(url, [{ ...feature, name: 'initialFetch' }]);
13771378
const storageProvider: StorageProvider<ClientFeaturesResponse> = new InMemStorageProvider();
13781379
const eventSource = {
13791380
eventEmitter: new EventEmitter(),
@@ -1402,16 +1403,24 @@ test('Streaming', async (t) => {
14021403
eventSource,
14031404
});
14041405

1406+
await repo.start();
1407+
1408+
// first connection is ignored, since we do regular fetch
1409+
eventSource.emit('unleash-connected', {
1410+
type: 'unleash-connected',
1411+
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'intialConnectedIgnored' }] }),
1412+
});
1413+
14051414
const before = repo.getToggles();
1406-
t.deepEqual(before, []);
1415+
t.deepEqual(before, [{ ...feature, name: 'initialFetch' }]);
14071416

14081417
// update with feature
14091418
eventSource.emit('unleash-updated', {
14101419
type: 'unleash-updated',
1411-
data: JSON.stringify({ meta: {}, features: [feature] }),
1420+
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'firstUpdate' }] }),
14121421
});
14131422
const firstUpdate = repo.getToggles();
1414-
t.deepEqual(firstUpdate, [feature]);
1423+
t.deepEqual(firstUpdate, [{ ...feature, name: 'firstUpdate' }]);
14151424
// @ts-expect-error
14161425
t.is(repo.etag, undefined);
14171426

@@ -1430,4 +1439,12 @@ test('Streaming', async (t) => {
14301439
t.is(msg, 'some error');
14311440
});
14321441
eventSource.emit('error', 'some error');
1442+
1443+
// re-connect simulation
1444+
eventSource.emit('unleash-connected', {
1445+
type: 'unleash-connected',
1446+
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'reconnectUpdate' }] }),
1447+
});
1448+
const reconnectUpdate = repo.getToggles();
1449+
t.deepEqual(reconnectUpdate, [{ ...feature, name: 'reconnectUpdate' }]);
14331450
});

src/unleash.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ export class Unleash extends EventEmitter {
143143
maxBackoffMillis: 30000,
144144
retryResetIntervalMillis: 60000,
145145
jitterRatio: 0.5,
146+
errorFilter: function () {
147+
// retry all errors
148+
return true;
149+
},
146150
})
147151
: undefined,
148152
storageProvider: storageProvider || new FileStorageProvider(backupPath),

0 commit comments

Comments
 (0)