Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: st
}
}

const CACHE_KEY_SEPARATOR = '|'
const formCacheKeyPrefix = (streamId: StreamID): string => {
return `${streamId}|`
}

@scoped(Lifecycle.ContainerScoped)
export class StreamRegistry {
Expand Down Expand Up @@ -167,25 +169,19 @@ export class StreamRegistry {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]): string => {
return `${streamId}${CACHE_KEY_SEPARATOR}`
}
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
})
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisher(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
}
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
})
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamSubscriber(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
}
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
})
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
Expand All @@ -195,9 +191,7 @@ export class StreamRegistry {
})
}, {
...config.cache,
cacheKey([streamId]): string {
return ['PublicSubscribe', streamId].join(CACHE_KEY_SEPARATOR)
}
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
})
}

Expand Down Expand Up @@ -264,7 +258,7 @@ export class StreamRegistry {
JSON.stringify(metadata),
ethersOverrides
))
this.invalidateStreamCache(streamId)
this.invalidateMetadataCache(streamId)
}

async deleteStream(streamIdOrPath: string): Promise<void> {
Expand All @@ -275,7 +269,8 @@ export class StreamRegistry {
streamId,
ethersOverrides
))
this.invalidateStreamCache(streamId)
this.invalidateMetadataCache(streamId)
this.invalidatePermissionCaches(streamId)
}

private async streamExistsOnChain(streamIdOrPath: string): Promise<boolean> {
Expand Down Expand Up @@ -462,7 +457,7 @@ export class StreamRegistry {
...assignments: InternalPermissionAssignment[]
): Promise<void> {
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
this.invalidateStreamCache(streamId)
this.invalidatePermissionCaches(streamId)
await this.connectToContract()
for (const assignment of assignments) {
for (const permission of assignment.permissions) {
Expand All @@ -484,7 +479,7 @@ export class StreamRegistry {
for (const item of items) {
validatePermissionAssignments(item.assignments)
const streamId = await this.streamIdBuilder.toStreamID(item.streamId)
this.invalidateStreamCache(streamId)
this.invalidatePermissionCaches(streamId)
streamIds.push(streamId)
targets.push(item.assignments.map((assignment) => {
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.userId
Expand Down Expand Up @@ -543,13 +538,15 @@ export class StreamRegistry {
hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
return this.hasPublicSubscribePermission_cached.get(streamId)
}

invalidateMetadataCache(streamId: StreamID): void {
this.logger.trace('Clear metadata cache for stream', { streamId })
this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId)))
}

invalidateStreamCache(streamId: StreamID): void {
this.logger.debug('Clear caches matching stream', { streamId })
// include separator so startsWith(streamid) doesn't match streamid-something
const target = `${streamId}${CACHE_KEY_SEPARATOR}`
const matchTarget = (s: string) => s.startsWith(target)
this.getStreamMetadata_cached.invalidate(matchTarget)
invalidatePermissionCaches(streamId: StreamID): void {
this.logger.trace('Clear permission caches for stream', { streamId })
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
this.isStreamPublisher_cached.invalidate(matchTarget)
this.isStreamSubscriber_cached.invalidate(matchTarget)
// TODO should also clear cache for hasPublicSubscribePermission?
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { createMessageRef, createRandomMsgChainId } from './messageChain'
export interface MessageFactoryOptions {
streamId: StreamID
authentication: Authentication
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
groupKeyQueue: GroupKeyQueue
signatureValidator: SignatureValidator
messageSigner: MessageSigner
Expand All @@ -40,7 +40,7 @@ export class MessageFactory {
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
// eslint-disable-next-line max-len
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
private readonly groupKeyQueue: GroupKeyQueue
private readonly signatureValidator: SignatureValidator
private readonly messageSigner: MessageSigner
Expand All @@ -66,7 +66,7 @@ export class MessageFactory {
const publisherId = await this.getPublisherId(metadata)
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
if (!isPublisher) {
this.streamRegistry.invalidateStreamCache(this.streamId)
this.streamRegistry.invalidatePermissionCaches(this.streamId)
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/subscribe/messagePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
// TODO log this in onError? if we want to log all errors?
logger.debug('Failed to decrypt', { messageId: msg.messageId, err })
// clear cached permissions if cannot decrypt, likely permissions need updating
opts.streamRegistry.invalidateStreamCache(msg.getStreamId())
opts.streamRegistry.invalidatePermissionCaches(msg.getStreamId())
throw err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/utils/addStreamToStorageNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export const addStreamToStorageNode = async (
'storage node did not respond'
)
} finally {
streamRegistry.invalidateStreamCache(streamId)
streamRegistry.invalidatePermissionCaches(streamId)
await assignmentSubscription?.unsubscribe() // should never reject...
}
} else {
Expand Down
7 changes: 6 additions & 1 deletion packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
permission: StreamPermission.SUBSCRIBE
})
}

// eslint-disable-next-line class-methods-use-this
invalidateMetadataCache(): void {
// no-op
}

// eslint-disable-next-line class-methods-use-this
invalidateStreamCache(): void {
invalidatePermissionCaches(): void {
// no-op
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/test-utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export const createStreamRegistry = (opts?: {
isStreamSubscriber: async () => {
return opts?.isStreamSubscriber ?? true
},
invalidateStreamCache: () => {}
invalidatePermissionCaches: () => {}
} as any
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/unit/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe('Publisher', () => {
const streamIdBuilder = new StreamIDBuilder(authentication)
const streamRegistry = {
isStreamPublisher: async () => false,
invalidateStreamCache: () => {}
invalidatePermissionCaches: () => {}
}
const publisher = new Publisher(
undefined as any,
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/test/unit/messagePipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('messagePipeline', () => {
streamRegistry = {
getStreamMetadata: async () => ({ partitions: 1 }),
isStreamPublisher: async () => true,
invalidateStreamCache: jest.fn()
invalidatePermissionCaches: jest.fn()
}
pipeline = createMessagePipeline({
streamPartId,
Expand Down Expand Up @@ -171,8 +171,8 @@ describe('messagePipeline', () => {
expect(error).toBeInstanceOf(DecryptError)
expect(error.message).toMatch(/timed out/)
expect(output).toEqual([])
expect(streamRegistry.invalidateStreamCache).toBeCalledTimes(1)
expect(streamRegistry.invalidateStreamCache).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId))
expect(streamRegistry.invalidatePermissionCaches).toBeCalledTimes(1)
expect(streamRegistry.invalidatePermissionCaches).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId))
})

it('error: exception', async () => {
Expand Down
Loading