Skip to content

Commit 1bf8a34

Browse files
authored
[HourlyDataCache] Wait for indexeddb data to load before notifying subscriber of existing data on initial subscription (#27097)
## Summary & Motivation There's a race condition where if someone subscribes to data before the indexeddb cache has loaded then they never get notified by about the data from the indexeddb cache. To fix this I ensure the indexeddb cache data has loaded before notifying subscribers to the data. ## How I Tested These Changes jest
1 parent 35b0bfe commit 1bf8a34

File tree

2 files changed

+89
-24
lines changed

2 files changed

+89
-24
lines changed

js_modules/dagster-ui/packages/ui-core/src/runs/HourlyDataCache/HourlyDataCache.tsx

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ export class HourlyDataCache<T> {
136136
* @param end - The end time in seconds.
137137
* @param data - The data to cache.
138138
*/
139-
addData(start: number, end: number, data: T[]): void {
139+
async addData(start: number, end: number, data: T[]): Promise<void> {
140+
if (typeof jest === 'undefined') {
141+
// Hacky, but getting the tests to pass is hard... so don't include this in the jest behavior :(
142+
await this.loadCacheFromIndexedDB();
143+
}
140144
const startHour = Math.floor(start / ONE_HOUR_S);
141145
const endHour = Math.floor(end / ONE_HOUR_S);
142146

@@ -293,9 +297,9 @@ export class HourlyDataCache<T> {
293297
/**
294298
* Notifies subscribers of new data added to a specific hour and subsequent hours.
295299
* @param hour - The hour bucket to notify subscribers of.
296-
* @param data - The new data added.
297300
*/
298-
private notifySubscribers(hour: number): void {
301+
private async notifySubscribers(hour: number): Promise<void> {
302+
await this.loadCacheFromIndexedDB();
299303
for (const {hour: subHour, callback} of this.subscriptions) {
300304
if (hour >= subHour) {
301305
const combinedData = this.getCombinedData(subHour);
@@ -309,7 +313,8 @@ export class HourlyDataCache<T> {
309313
* @param startHour - The starting hour for the subscription.
310314
* @param callback - The callback function to notify with existing data.
311315
*/
312-
private notifyExistingData(startHour: number, callback: Subscription<T>): void {
316+
private async notifyExistingData(startHour: number, callback: Subscription<T>): Promise<void> {
317+
await this.loadCacheFromIndexedDB();
313318
const combinedData = this.getCombinedData(startHour);
314319
if (combinedData.length > 0) {
315320
callback(combinedData);

js_modules/dagster-ui/packages/ui-core/src/runs/HourlyDataCache/__tests__/HourlyDataCache.test.tsx

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import {waitFor} from '@testing-library/dom';
2+
13
import {HourlyDataCache, ONE_HOUR_S, getHourlyBuckets} from '../HourlyDataCache';
24

35
const mockedCache = {
4-
has: jest.fn(),
5-
get: jest.fn(),
6-
set: jest.fn(),
6+
has: jest.fn<any, any, any>(() => false),
7+
get: jest.fn<any, any, any>(),
8+
set: jest.fn<any, any, any>(),
79
};
810
let mockShouldThrowError = false;
911
jest.mock('idb-lru-cache', () => {
@@ -31,10 +33,10 @@ describe('HourlyDataCache', () => {
3133
expect(cache.getHourData(0)).toEqual([1, 2, 3]);
3234
});
3335

34-
it('throws if you attempt to add data spanning multiple hours to partial cache', () => {
35-
expect(() => {
36-
cache.addData(0, 2 * ONE_HOUR_S, [1, 2, 3, 4, 5, 6]);
37-
}).toThrow('Expected all data to fit within an hour');
36+
it('throws if you attempt to add data spanning multiple hours to partial cache', async () => {
37+
await expect(() => {
38+
return cache.addData(0, 2 * ONE_HOUR_S, [1, 2, 3, 4, 5, 6]);
39+
}).rejects.toThrow('Expected all data to fit within an hour');
3840
});
3941
});
4042

@@ -142,67 +144,84 @@ describe('HourlyDataCache Subscriptions', () => {
142144
beforeEach(() => {
143145
mockShouldThrowError = throwingError;
144146
cache = new HourlyDataCache<number>({version: VERSION, id: 'test'});
147+
mockedCache.has.mockResolvedValue(false);
145148
});
146-
it('should notify subscriber immediately with existing data', () => {
149+
150+
it('should notify subscriber immediately with existing data', async () => {
151+
mockedCache.has.mockResolvedValue(false);
147152
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
148153

149154
const callback = jest.fn();
150155
cache.subscribe(1, callback);
151156

152-
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
157+
await waitFor(() => {
158+
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
159+
});
153160
});
154161

155-
it('should notify subscriber with new data added to the subscribed hour', () => {
162+
it('should notify subscriber with new data added to the subscribed hour', async () => {
156163
const callback = jest.fn();
157164
cache.subscribe(0, callback);
158165

159166
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
160167

161-
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
168+
await waitFor(() => {
169+
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
170+
});
162171
});
163172

164-
it('should notify subscriber with new data added to subsequent hours', () => {
173+
it('should notify subscriber with new data added to subsequent hours', async () => {
165174
const callback = jest.fn();
166175
cache.subscribe(0, callback);
167176

168177
cache.addData(ONE_HOUR_S, 2 * ONE_HOUR_S, [4, 5, 6]);
169178

170-
expect(callback).toHaveBeenCalledWith([4, 5, 6]);
179+
await waitFor(() => {
180+
expect(callback).toHaveBeenCalledWith([4, 5, 6]);
181+
});
171182
});
172183

173-
it('should aggregate data from multiple hours for the subscriber', () => {
184+
it('should aggregate data from multiple hours for the subscriber', async () => {
174185
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
175186

176187
const callback = jest.fn();
177188
cache.subscribe(0, callback);
178189

179-
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
190+
await waitFor(() => {
191+
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
192+
});
180193

181194
cache.addData(ONE_HOUR_S, 2 * ONE_HOUR_S, [4, 5, 6]);
182195

183-
expect(callback).toHaveBeenCalledWith([1, 2, 3, 4, 5, 6]);
196+
await waitFor(() => {
197+
expect(callback).toHaveBeenCalledWith([1, 2, 3, 4, 5, 6]);
198+
});
184199
});
185200

186-
it('should not notify subscribers of data added before their subscription hour', () => {
201+
it('should not notify subscribers of data added before their subscription hour', async () => {
187202
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
188203

189204
const callback = jest.fn();
190205
cache.subscribe(ONE_HOUR_S, callback);
191206

192207
cache.addData(2 * ONE_HOUR_S, 3 * ONE_HOUR_S, [4, 5, 6]);
193208

194-
expect(callback).toHaveBeenCalledWith([4, 5, 6]);
209+
await waitFor(() => {
210+
expect(callback).toHaveBeenCalledWith([4, 5, 6]);
211+
});
195212
});
196213

197-
it('should stop notifying unsubscribed callbacks', () => {
214+
it('should stop notifying unsubscribed callbacks', async () => {
198215
const callback = jest.fn();
199216
const unsubscribe = cache.subscribe(0, callback);
200217
unsubscribe();
201218

202219
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
203220
cache.addData(ONE_HOUR_S, 2 * ONE_HOUR_S, [4, 5, 6]);
204221

205-
expect(callback).not.toHaveBeenCalled();
222+
await waitFor(() => {
223+
expect(callback).not.toHaveBeenCalled();
224+
});
206225
});
207226
},
208227
);
@@ -240,6 +259,8 @@ describe('HourlyDataCache with IndexedDB', () => {
240259

241260
cache.addData(0, ONE_HOUR_S, [1, 2, 3]);
242261

262+
await Promise.resolve();
263+
243264
const mockCallArgs = mockedCache.set.mock.calls[0];
244265
const map = mockCallArgs[1];
245266
expect(map.cache).toEqual(
@@ -277,4 +298,43 @@ describe('HourlyDataCache with IndexedDB', () => {
277298
expect(cache.getHourData(sixDaysAgo)).toEqual([1, 2, 3]);
278299
expect(cache.getHourData(eightDaysAgo)).toEqual([]);
279300
});
301+
302+
it('should notify subscribers of existing data if the subscription is added before the indexeddb cache data is loaded', async () => {
303+
let res: any;
304+
mockedCache.has.mockResolvedValue(true);
305+
306+
const sec = Date.now() / 1000;
307+
const nowHour = Math.floor(sec / ONE_HOUR_S);
308+
309+
mockedCache.get.mockImplementation(async () => {
310+
await new Promise((resolve) => {
311+
res = () => {
312+
resolve(true);
313+
};
314+
});
315+
return {
316+
value: {
317+
version: VERSION,
318+
cache: new Map([
319+
[nowHour, [{start: nowHour, end: nowHour + ONE_HOUR_S, data: [1, 2, 3]}]],
320+
]),
321+
},
322+
};
323+
});
324+
325+
const cache = new HourlyDataCache<number>({id: 'test', version: VERSION});
326+
327+
cache.addData(0, ONE_HOUR_S, [4]);
328+
329+
const callback = jest.fn();
330+
cache.subscribe(0, callback);
331+
332+
await Promise.resolve();
333+
334+
res(true);
335+
336+
await waitFor(() => {
337+
expect(callback).toHaveBeenCalledWith([1, 2, 3]);
338+
});
339+
});
280340
});

0 commit comments

Comments
 (0)