Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 0 additions & 52 deletions app/common/src/queryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,8 @@ import type { AsyncStorage, StoragePersisterOptions } from '@tanstack/query-pers
import { experimental_createPersister as createPersister } from '@tanstack/query-persist-client-core'
import * as vueQuery from '@tanstack/vue-query'
import { toRaw } from 'vue'
import { ConditionVariable } from './utilities/ConditionVariable'
import { useCallbackRegistry } from './utilities/data/callbacks'
import { cloneDeepUnref } from './utilities/data/reactive'

/** An enumeration of all mutation pool ids. */
export interface MutationPools {
// Required otherwise in this module there are no keys, and `pools[poolMeta.id]` below becomes
// `never`.
readonly [DUMMY_MUTATION_POOL_SYMBOL]: true
}
declare const DUMMY_MUTATION_POOL_SYMBOL: unique symbol

/**
* Declaration merge into `MutationPools` to add a new mutation pool id:
*
* ```ts
* declare module 'enso-common/src/queryClient' {
* interface MutationPools {
* myNewPoolId: true
* }
* }
* ```
*/
export type MutationPoolId = keyof MutationPools

declare module '@tanstack/query-core' {
/** Query client with additional methods. */
interface QueryClient {
Expand Down Expand Up @@ -65,10 +42,6 @@ declare module '@tanstack/query-core' {
*/
readonly awaitInvalidates?: queryCore.QueryKey[] | boolean
readonly refetchType?: queryCore.InvalidateQueryFilters['refetchType']
readonly pool?: {
id: MutationPoolId
parallelism: number
}
}

readonly queryMeta: {
Expand Down Expand Up @@ -172,30 +145,6 @@ function useMutationCache(): {
}
}

function useConcurrencyControl({ onMutate, onSettled }: MutationHooks) {
const pools: Partial<Record<MutationPoolId, { usedLanes: number; queue: ConditionVariable }>> = {}

onMutate(async (_variables, mutation) => {
const poolMeta = mutation.meta?.pool
if (!poolMeta) {
return
}
const poolInfo = (pools[poolMeta.id] ??= { usedLanes: 0, queue: new ConditionVariable() })
if (poolInfo.usedLanes >= poolMeta.parallelism) {
await poolInfo.queue.wait()
}
poolInfo.usedLanes += 1
})
onSettled(async (_data, _error, _variables, _context, mutation) => {
const poolMeta = mutation.meta?.pool
if (poolMeta) {
const poolInfo = (pools[poolMeta.id] ??= { usedLanes: 1, queue: new ConditionVariable() })
poolInfo.usedLanes -= 1
while (poolInfo.usedLanes < poolMeta.parallelism && (await poolInfo.queue.notifyOne()));
}
})
}

declare const brandRaw: unique symbol
/**
* A value that is known not to be a reactive proxy; this marker type can be used to ensure
Expand Down Expand Up @@ -309,7 +258,6 @@ export function createQueryClient<TStorageValue = string>(
},
},
})
useConcurrencyControl(mutationHooks)
useInvalidation({ mutationHooks, queryHooks, queryClient })

Object.defineProperty(queryClient, 'nukePersister', {
Expand Down
17 changes: 13 additions & 4 deletions app/common/src/services/Backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1960,11 +1960,20 @@ export default abstract class Backend {
abstract uploadFileStart(
params: UploadFileRequestParams,
file: File,
abort?: AbortSignal,
): Promise<UploadLargeFileMetadata>
/** Upload a chunk of a large file. */
abstract uploadFileChunk(url: HttpsUrl, file: Blob, index: number): Promise<S3MultipartPart>
abstract uploadFileChunk(
url: HttpsUrl,
file: Blob,
index: number,
abort?: AbortSignal,
): Promise<{ part: S3MultipartPart; size: number }>
/** Finish uploading a large file. */
abstract uploadFileEnd(body: UploadFileEndRequestBody): Promise<UploadedAsset>
abstract uploadFileEnd(
body: UploadFileEndRequestBody,
abort?: AbortSignal,
): Promise<UploadedAsset>
/** Change the name of a file. */
abstract updateFile(fileId: FileId, body: UpdateFileRequestBody, title: string): Promise<void>

Expand Down Expand Up @@ -2098,9 +2107,9 @@ export default abstract class Backend {
}

/** Send a binary HTTP POST request to the given path. */
protected postBinary<T = void>(path: string, payload: Blob) {
protected postBinary<T = void>(path: string, payload: Blob, options?: HttpClientPostOptions) {
return this.checkForAuthenticationError(() =>
this.client.postBinary<T>(this.resolvePath(path), payload),
this.client.postBinary<T>(this.resolvePath(path), payload, options),
)
}

Expand Down
6 changes: 5 additions & 1 deletion app/common/src/services/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface ResponseWithTypedJson<U> extends Response {
/** Options for {@link HttpClient['post']} method. */
export interface HttpClientPostOptions {
readonly keepalive?: boolean
readonly abort?: AbortSignal | undefined
}

/** Options for {@link HttpClient.request} private method. */
Expand Down Expand Up @@ -54,16 +55,19 @@ export class HttpClient {
payload: JSON.stringify(payload),
mimetype: 'application/json',
keepalive: options?.keepalive ?? false,
abort: options?.abort,
})
}

/** Send a base64-encoded binary HTTP POST request to the specified URL. */
async postBinary<T = void>(url: string, payload: Blob) {
async postBinary<T = void>(url: string, payload: Blob, options?: HttpClientPostOptions) {
return await this.request<'POST', T>({
method: 'POST',
url,
payload,
mimetype: 'application/octet-stream',
keepalive: options?.keepalive ?? false,
abort: options?.abort,
})
}

Expand Down
8 changes: 4 additions & 4 deletions app/common/src/utilities/ConditionVariable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ export class ConditionVariable {
}

/** Resolve all promises in the queue. */
notifyAll(): Promise<boolean> {
notifyAll(): boolean {
const success = this.resolveQueue.length !== 0
for (const resolve of this.resolveQueue.splice(0, this.resolveQueue.length)) {
resolve()
}
// Give the code after the resolved promises time to execute.
return Promise.resolve(success)
return success
}

/** Resolve a single promise in the queue. */
notifyOne(): Promise<boolean> {
notifyOne(): boolean {
const resolve = this.resolveQueue.shift()
resolve?.()
// Give the code after the resolved promise time to execute.
return Promise.resolve(resolve != null)
return resolve != null
}
}
Loading
Loading