-
Notifications
You must be signed in to change notification settings - Fork 45
Feat cache for the queue #496
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3e393b4
c788485
6636d04
8401279
98162e8
f805c1d
48eb27f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@opennextjs/cloudflare": patch | ||
--- | ||
|
||
add an optional cache for the durable queue |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import type { Queue } from "@opennextjs/aws/types/overrides"; | ||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; | ||
|
||
import queueCache from "./queue-cache"; | ||
|
||
const mockedQueue = { | ||
name: "mocked-queue", | ||
send: vi.fn(), | ||
} satisfies Queue; | ||
|
||
const generateMessage = () => ({ | ||
MessageGroupId: "test", | ||
MessageBody: { | ||
eTag: "test", | ||
url: "test", | ||
host: "test", | ||
lastModified: Date.now(), | ||
}, | ||
MessageDeduplicationId: "test", | ||
}); | ||
|
||
const mockedPut = vi.fn(); | ||
const mockedMatch = vi.fn().mockReturnValue(null); | ||
|
||
describe("queue-cache", () => { | ||
beforeEach(() => { | ||
// @ts-ignore | ||
globalThis.caches = { | ||
open: vi.fn().mockReturnValue({ | ||
put: mockedPut, | ||
match: mockedMatch, | ||
}), | ||
}; | ||
}); | ||
|
||
afterEach(() => { | ||
vi.resetAllMocks(); | ||
}); | ||
test("should send the message to the original queue", async () => { | ||
const msg = generateMessage(); | ||
const queue = queueCache(mockedQueue, {}); | ||
expect(queue.name).toBe("cached-mocked-queue"); | ||
await queue.send(msg); | ||
expect(mockedQueue.send).toHaveBeenCalledWith(msg); | ||
}); | ||
|
||
test("should use the local cache", async () => { | ||
const msg = generateMessage(); | ||
const queue = queueCache(mockedQueue, {}); | ||
await queue.send(msg); | ||
|
||
expect(queue.localCache.size).toBe(1); | ||
expect(queue.localCache.has(`queue/test/test`)).toBe(true); | ||
expect(mockedPut).toHaveBeenCalled(); | ||
|
||
const spiedHas = vi.spyOn(queue.localCache, "has"); | ||
await queue.send(msg); | ||
expect(spiedHas).toHaveBeenCalled(); | ||
|
||
expect(mockedQueue.send).toHaveBeenCalledTimes(1); | ||
|
||
expect(mockedMatch).toHaveBeenCalledTimes(1); | ||
}); | ||
|
||
test("should clear the local cache after 5s", async () => { | ||
vi.useFakeTimers(); | ||
const msg = generateMessage(); | ||
const queue = queueCache(mockedQueue, {}); | ||
await queue.send(msg); | ||
expect(queue.localCache.size).toBe(1); | ||
expect(queue.localCache.has(`queue/test/test`)).toBe(true); | ||
|
||
vi.advanceTimersByTime(5001); | ||
const alteredMsg = generateMessage(); | ||
alteredMsg.MessageGroupId = "test2"; | ||
await queue.send(alteredMsg); | ||
expect(queue.localCache.size).toBe(1); | ||
console.log(queue.localCache); | ||
expect(queue.localCache.has(`queue/test2/test`)).toBe(true); | ||
expect(queue.localCache.has(`queue/test/test`)).toBe(false); | ||
vi.useRealTimers(); | ||
}); | ||
|
||
test("should use the regional cache if not in local cache", async () => { | ||
const msg = generateMessage(); | ||
const queue = queueCache(mockedQueue, {}); | ||
await queue.send(msg); | ||
|
||
expect(mockedMatch).toHaveBeenCalledTimes(1); | ||
expect(mockedPut).toHaveBeenCalledTimes(1); | ||
expect(queue.localCache.size).toBe(1); | ||
expect(queue.localCache.has(`queue/test/test`)).toBe(true); | ||
// We need to delete the local cache to test the regional cache | ||
queue.localCache.delete(`queue/test/test`); | ||
|
||
const spiedHas = vi.spyOn(queue.localCache, "has"); | ||
await queue.send(msg); | ||
expect(spiedHas).toHaveBeenCalled(); | ||
expect(mockedMatch).toHaveBeenCalledTimes(2); | ||
}); | ||
|
||
test("should return early if the message is in the regional cache", async () => { | ||
const msg = generateMessage(); | ||
const queue = queueCache(mockedQueue, {}); | ||
|
||
mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 })); | ||
|
||
const spiedSend = mockedQueue.send; | ||
await queue.send(msg); | ||
expect(spiedSend).not.toHaveBeenCalled(); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,120 @@ | ||||||||||||||||||||||||||||||||
import { error } from "@opennextjs/aws/adapters/logger.js"; | ||||||||||||||||||||||||||||||||
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
interface QueueCachingOptions { | ||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* The TTL for the regional cache in seconds. | ||||||||||||||||||||||||||||||||
* @default 5 | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
regionalCacheTtlSec?: number; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Whether to wait for the queue ack before returning. | ||||||||||||||||||||||||||||||||
* When set to false, the cache will be populated asap and the queue will be called after. | ||||||||||||||||||||||||||||||||
* When set to true, the cache will be populated only after the queue ack is received. | ||||||||||||||||||||||||||||||||
* @default false | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
waitForQueueAck?: boolean; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
class QueueCache implements Queue { | ||||||||||||||||||||||||||||||||
readonly name; | ||||||||||||||||||||||||||||||||
readonly regionalCacheTtlSec: number; | ||||||||||||||||||||||||||||||||
readonly waitForQueueAck: boolean; | ||||||||||||||||||||||||||||||||
cache: Cache | undefined; | ||||||||||||||||||||||||||||||||
localCache: Map<string, number> = new Map(); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
constructor( | ||||||||||||||||||||||||||||||||
private originalQueue: Queue, | ||||||||||||||||||||||||||||||||
options: QueueCachingOptions | ||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||
this.name = `cached-${originalQueue.name}`; | ||||||||||||||||||||||||||||||||
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC; | ||||||||||||||||||||||||||||||||
this.waitForQueueAck = options.waitForQueueAck ?? false; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
async send(msg: QueueMessage) { | ||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||
const isCached = await this.isInCache(msg); | ||||||||||||||||||||||||||||||||
if (isCached) { | ||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
if (!this.waitForQueueAck) { | ||||||||||||||||||||||||||||||||
await this.putToCache(msg); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
await this.originalQueue.send(msg); | ||||||||||||||||||||||||||||||||
if (this.waitForQueueAck) { | ||||||||||||||||||||||||||||||||
await this.putToCache(msg); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Comment on lines
+44
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I think the below version more readable, but up to you
Suggested change
|
||||||||||||||||||||||||||||||||
} catch (e) { | ||||||||||||||||||||||||||||||||
error("Error sending message to queue", e); | ||||||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||||||
this.clearLocalCache(); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
private async getCache() { | ||||||||||||||||||||||||||||||||
if (!this.cache) { | ||||||||||||||||||||||||||||||||
this.cache = await caches.open("durable-queue"); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return this.cache; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
private getCacheUrlString(msg: QueueMessage) { | ||||||||||||||||||||||||||||||||
return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Comment on lines
+66
to
+68
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please change the return type to String to be consistent with other overiddes (i.e. regional-cache) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant to comment on the next method |
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
private getCacheKey(msg: QueueMessage) { | ||||||||||||||||||||||||||||||||
return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache")); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
private async putToCache(msg: QueueMessage) { | ||||||||||||||||||||||||||||||||
this.localCache.set(this.getCacheUrlString(msg), Date.now()); | ||||||||||||||||||||||||||||||||
const cacheKey = this.getCacheKey(msg); | ||||||||||||||||||||||||||||||||
const cache = await this.getCache(); | ||||||||||||||||||||||||||||||||
await cache.put( | ||||||||||||||||||||||||||||||||
cacheKey, | ||||||||||||||||||||||||||||||||
new Response(null, { | ||||||||||||||||||||||||||||||||
status: 200, | ||||||||||||||||||||||||||||||||
headers: { | ||||||||||||||||||||||||||||||||
"Cache-Control": `max-age=${this.regionalCacheTtlSec}`, | ||||||||||||||||||||||||||||||||
"Cache-Tag": `_N_T_/${msg.MessageBody.url}`, | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could please you add a comment here to explain the tag value? |
||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
private async isInCache(msg: QueueMessage) { | ||||||||||||||||||||||||||||||||
if (this.localCache.has(this.getCacheUrlString(msg))) { | ||||||||||||||||||||||||||||||||
const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!; | ||||||||||||||||||||||||||||||||
if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) { | ||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
this.localCache.delete(this.getCacheUrlString(msg)); | ||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
const cacheKey = this.getCacheKey(msg); | ||||||||||||||||||||||||||||||||
const cache = await this.getCache(); | ||||||||||||||||||||||||||||||||
const cachedResponse = await cache.match(cacheKey); | ||||||||||||||||||||||||||||||||
if (cachedResponse) { | ||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||
* Remove any value older than the TTL from the local cache | ||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||
private clearLocalCache() { | ||||||||||||||||||||||||||||||||
const now = Date.now(); | ||||||||||||||||||||||||||||||||
for (const [key, value] of this.localCache.entries()) { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nits:
|
||||||||||||||||||||||||||||||||
if (now - value > this.regionalCacheTtlSec * 1000) { | ||||||||||||||||||||||||||||||||
this.localCache.delete(key); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please add a comment that this is mapping key to insertAtSec