Skip to content

Commit 27e7116

Browse files
authored
refactor(sdk): Improve StreamRegistry cache invalidation (#2874)
## Summary Now there are two separate cache invalidations in `StreamRegistry`: one for metadata and another for permissions. ## Changes Previously the `invalidateStreamCache()` invalidated both caches. Now there are separate `invalidatePermissionCaches()` and `invalidateMetadataCache()` methods. Also changed log level from `debug` to `trace`. ## Future improvements - If there are multiple instances of `Stream` classes for same `streamId`, and `setMetadata()` is called for some of the instances, the metadata is not updated to other instances. - Maybe `isStreamPublisher` and `isStreamSubscriber` caches could be positive caches, i.e. cache the value only if result is `true`? We invalidate cache manually when we find out that an operation fails because of missing permission (see `MessageFactory#69` and `messagePipeline#60`). Those are the only places outside of `StreamRegistry` manual where cache invalidation is done. Therefore we could encapsulate the caching fully into `StreamRegistry` and also simplify cache key handing by doing this refactoring. - The `invalidatePermissionCaches` should invalidate also the `hasPublicSubscribePermission` cache? (TODO comment in `StreamRegistry`)
1 parent 5fcd20b commit 27e7116

File tree

8 files changed

+36
-34
lines changed

8 files changed

+36
-34
lines changed

packages/sdk/src/contracts/StreamRegistry.ts

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: st
104104
}
105105
}
106106

107-
const CACHE_KEY_SEPARATOR = '|'
107+
const formCacheKeyPrefix = (streamId: StreamID): string => {
108+
return `${streamId}|`
109+
}
108110

109111
@scoped(Lifecycle.ContainerScoped)
110112
export class StreamRegistry {
@@ -167,25 +169,19 @@ export class StreamRegistry {
167169
return this.getStreamMetadata_nonCached(streamId)
168170
}, {
169171
...config.cache,
170-
cacheKey: ([streamId]): string => {
171-
return `${streamId}${CACHE_KEY_SEPARATOR}`
172-
}
172+
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
173173
})
174174
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
175175
return this.isStreamPublisher(streamId, userId, false)
176176
}, {
177177
...config.cache,
178-
cacheKey([streamId, userId]): string {
179-
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
180-
}
178+
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
181179
})
182180
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
183181
return this.isStreamSubscriber(streamId, userId, false)
184182
}, {
185183
...config.cache,
186-
cacheKey([streamId, userId]): string {
187-
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
188-
}
184+
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
189185
})
190186
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
191187
return this.hasPermission({
@@ -195,9 +191,7 @@ export class StreamRegistry {
195191
})
196192
}, {
197193
...config.cache,
198-
cacheKey([streamId]): string {
199-
return ['PublicSubscribe', streamId].join(CACHE_KEY_SEPARATOR)
200-
}
194+
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
201195
})
202196
}
203197

@@ -264,7 +258,7 @@ export class StreamRegistry {
264258
JSON.stringify(metadata),
265259
ethersOverrides
266260
))
267-
this.invalidateStreamCache(streamId)
261+
this.invalidateMetadataCache(streamId)
268262
}
269263

270264
async deleteStream(streamIdOrPath: string): Promise<void> {
@@ -275,7 +269,8 @@ export class StreamRegistry {
275269
streamId,
276270
ethersOverrides
277271
))
278-
this.invalidateStreamCache(streamId)
272+
this.invalidateMetadataCache(streamId)
273+
this.invalidatePermissionCaches(streamId)
279274
}
280275

281276
private async streamExistsOnChain(streamIdOrPath: string): Promise<boolean> {
@@ -462,7 +457,7 @@ export class StreamRegistry {
462457
...assignments: InternalPermissionAssignment[]
463458
): Promise<void> {
464459
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
465-
this.invalidateStreamCache(streamId)
460+
this.invalidatePermissionCaches(streamId)
466461
await this.connectToContract()
467462
for (const assignment of assignments) {
468463
for (const permission of assignment.permissions) {
@@ -484,7 +479,7 @@ export class StreamRegistry {
484479
for (const item of items) {
485480
validatePermissionAssignments(item.assignments)
486481
const streamId = await this.streamIdBuilder.toStreamID(item.streamId)
487-
this.invalidateStreamCache(streamId)
482+
this.invalidatePermissionCaches(streamId)
488483
streamIds.push(streamId)
489484
targets.push(item.assignments.map((assignment) => {
490485
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.userId
@@ -543,13 +538,15 @@ export class StreamRegistry {
543538
hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
544539
return this.hasPublicSubscribePermission_cached.get(streamId)
545540
}
541+
542+
invalidateMetadataCache(streamId: StreamID): void {
543+
this.logger.trace('Clear metadata cache for stream', { streamId })
544+
this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId)))
545+
}
546546

547-
invalidateStreamCache(streamId: StreamID): void {
548-
this.logger.debug('Clear caches matching stream', { streamId })
549-
// include separator so startsWith(streamid) doesn't match streamid-something
550-
const target = `${streamId}${CACHE_KEY_SEPARATOR}`
551-
const matchTarget = (s: string) => s.startsWith(target)
552-
this.getStreamMetadata_cached.invalidate(matchTarget)
547+
invalidatePermissionCaches(streamId: StreamID): void {
548+
this.logger.trace('Clear permission caches for stream', { streamId })
549+
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
553550
this.isStreamPublisher_cached.invalidate(matchTarget)
554551
this.isStreamSubscriber_cached.invalidate(matchTarget)
555552
// TODO should also clear cache for hasPublicSubscribePermission?

packages/sdk/src/publish/MessageFactory.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { createMessageRef, createRandomMsgChainId } from './messageChain'
2626
export interface MessageFactoryOptions {
2727
streamId: StreamID
2828
authentication: Authentication
29-
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
29+
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
3030
groupKeyQueue: GroupKeyQueue
3131
signatureValidator: SignatureValidator
3232
messageSigner: MessageSigner
@@ -40,7 +40,7 @@ export class MessageFactory {
4040
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
4141
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
4242
// eslint-disable-next-line max-len
43-
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
43+
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
4444
private readonly groupKeyQueue: GroupKeyQueue
4545
private readonly signatureValidator: SignatureValidator
4646
private readonly messageSigner: MessageSigner
@@ -66,7 +66,7 @@ export class MessageFactory {
6666
const publisherId = await this.getPublisherId(metadata)
6767
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
6868
if (!isPublisher) {
69-
this.streamRegistry.invalidateStreamCache(this.streamId)
69+
this.streamRegistry.invalidatePermissionCaches(this.streamId)
7070
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
7171
}
7272

packages/sdk/src/subscribe/messagePipeline.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
5757
// TODO log this in onError? if we want to log all errors?
5858
logger.debug('Failed to decrypt', { messageId: msg.messageId, err })
5959
// clear cached permissions if cannot decrypt, likely permissions need updating
60-
opts.streamRegistry.invalidateStreamCache(msg.getStreamId())
60+
opts.streamRegistry.invalidatePermissionCaches(msg.getStreamId())
6161
throw err
6262
}
6363
} else {

packages/sdk/src/utils/addStreamToStorageNode.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export const addStreamToStorageNode = async (
5252
'storage node did not respond'
5353
)
5454
} finally {
55-
streamRegistry.invalidateStreamCache(streamId)
55+
streamRegistry.invalidatePermissionCaches(streamId)
5656
await assignmentSubscription?.unsubscribe() // should never reject...
5757
}
5858
} else {

packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,14 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
166166
permission: StreamPermission.SUBSCRIBE
167167
})
168168
}
169+
170+
// eslint-disable-next-line class-methods-use-this
171+
invalidateMetadataCache(): void {
172+
// no-op
173+
}
169174

170175
// eslint-disable-next-line class-methods-use-this
171-
invalidateStreamCache(): void {
176+
invalidatePermissionCaches(): void {
172177
// no-op
173178
}
174179

packages/sdk/test/test-utils/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export const createStreamRegistry = (opts?: {
201201
isStreamSubscriber: async () => {
202202
return opts?.isStreamSubscriber ?? true
203203
},
204-
invalidateStreamCache: () => {}
204+
invalidatePermissionCaches: () => {}
205205
} as any
206206
}
207207

packages/sdk/test/unit/Publisher.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ describe('Publisher', () => {
1313
const streamIdBuilder = new StreamIDBuilder(authentication)
1414
const streamRegistry = {
1515
isStreamPublisher: async () => false,
16-
invalidateStreamCache: () => {}
16+
invalidatePermissionCaches: () => {}
1717
}
1818
const publisher = new Publisher(
1919
undefined as any,

packages/sdk/test/unit/messagePipeline.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ describe('messagePipeline', () => {
7777
streamRegistry = {
7878
getStreamMetadata: async () => ({ partitions: 1 }),
7979
isStreamPublisher: async () => true,
80-
invalidateStreamCache: jest.fn()
80+
invalidatePermissionCaches: jest.fn()
8181
}
8282
pipeline = createMessagePipeline({
8383
streamPartId,
@@ -171,8 +171,8 @@ describe('messagePipeline', () => {
171171
expect(error).toBeInstanceOf(DecryptError)
172172
expect(error.message).toMatch(/timed out/)
173173
expect(output).toEqual([])
174-
expect(streamRegistry.invalidateStreamCache).toBeCalledTimes(1)
175-
expect(streamRegistry.invalidateStreamCache).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId))
174+
expect(streamRegistry.invalidatePermissionCaches).toBeCalledTimes(1)
175+
expect(streamRegistry.invalidatePermissionCaches).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId))
176176
})
177177

178178
it('error: exception', async () => {

0 commit comments

Comments
 (0)