diff --git a/app/common/src/queryClient.ts b/app/common/src/queryClient.ts
index 4a2b615f5e19..1fca28b65893 100644
--- a/app/common/src/queryClient.ts
+++ b/app/common/src/queryClient.ts
@@ -9,31 +9,8 @@ import type { AsyncStorage, StoragePersisterOptions } from '@tanstack/query-pers
 import { experimental_createPersister as createPersister } from '@tanstack/query-persist-client-core'
 import * as vueQuery from '@tanstack/vue-query'
 import { toRaw } from 'vue'
-import { ConditionVariable } from './utilities/ConditionVariable'
 import { useCallbackRegistry } from './utilities/data/callbacks'
 import { cloneDeepUnref } from './utilities/data/reactive'
-
-/** An enumeration of all mutation pool ids. */
-export interface MutationPools {
-  // Required otherwise in this module there are no keys, and `pools[poolMeta.id]` below becomes
-  // `never`.
-  readonly [DUMMY_MUTATION_POOL_SYMBOL]: true
-}
-declare const DUMMY_MUTATION_POOL_SYMBOL: unique symbol
-
-/**
- * Declaration merge into `MutationPools` to add a new mutation pool id:
- *
- * ```ts
- * declare module 'enso-common/src/queryClient' {
- *   interface MutationPools {
- *     myNewPoolId: true
- *   }
- * }
- * ```
- */
-export type MutationPoolId = keyof MutationPools
-
 declare module '@tanstack/query-core' {
   /** Query client with additional methods. */
   interface QueryClient {
@@ -65,10 +42,6 @@ declare module '@tanstack/query-core' {
      */
     readonly awaitInvalidates?: queryCore.QueryKey[] | boolean
     readonly refetchType?: queryCore.InvalidateQueryFilters['refetchType']
-    readonly pool?: {
-      id: MutationPoolId
-      parallelism: number
-    }
   }
 
   readonly queryMeta: {
@@ -172,30 +145,6 @@ function useMutationCache(): {
   }
 }
 
-function useConcurrencyControl({ onMutate, onSettled }: MutationHooks) {
-  const pools: Partial<Record<MutationPoolId, { usedLanes: number; queue: ConditionVariable }>> = {}
-
-  onMutate(async (_variables, mutation) => {
-    const poolMeta = mutation.meta?.pool
-    if (!poolMeta) {
-      return
-    }
-    const poolInfo = (pools[poolMeta.id] ??= { usedLanes: 0, queue: new ConditionVariable() })
-    if (poolInfo.usedLanes >= poolMeta.parallelism) {
-      await poolInfo.queue.wait()
-    }
-    poolInfo.usedLanes += 1
-  })
-  onSettled(async (_data, _error, _variables, _context, mutation) => {
-    const poolMeta = mutation.meta?.pool
-    if (poolMeta) {
-      const poolInfo = (pools[poolMeta.id] ??= { usedLanes: 1, queue: new ConditionVariable() })
-      poolInfo.usedLanes -= 1
-      while (poolInfo.usedLanes < poolMeta.parallelism && (await poolInfo.queue.notifyOne()));
-    }
-  })
-}
-
 declare const brandRaw: unique symbol
 /**
  * A value that is known not to be a reactive proxy; this marker type can be used to ensure
@@ -309,7 +258,6 @@ export function createQueryClient(
       },
     },
   })
-  useConcurrencyControl(mutationHooks)
   useInvalidation({ mutationHooks, queryHooks, queryClient })
 
   Object.defineProperty(queryClient, 'nukePersister', {
diff --git a/app/common/src/services/Backend.ts b/app/common/src/services/Backend.ts
index 648e741d11d9..72ac44780dca 100644
--- a/app/common/src/services/Backend.ts
+++ b/app/common/src/services/Backend.ts
@@ -1959,11 +1959,20 @@ export default abstract class Backend {
   abstract uploadFileStart(
     params: UploadFileRequestParams,
     file: File,
+    abort?: AbortSignal,
   ): Promise<UploadLargeFileMetadata>
   /** Upload a chunk of a large file. */
-  abstract uploadFileChunk(url: HttpsUrl, file: Blob, index: number): Promise<S3MultipartPart>
+  abstract uploadFileChunk(
+    url: HttpsUrl,
+    file: Blob,
+    index: number,
+    abort?: AbortSignal,
+  ): Promise<{ part: S3MultipartPart; size: number }>
   /** Finish uploading a large file. */
-  abstract uploadFileEnd(body: UploadFileEndRequestBody): Promise<UploadedAsset>
+  abstract uploadFileEnd(
+    body: UploadFileEndRequestBody,
+    abort?: AbortSignal,
+  ): Promise<UploadedAsset>
   /** Change the name of a file. */
   abstract updateFile(fileId: FileId, body: UpdateFileRequestBody, title: string): Promise
@@ -2097,9 +2106,9 @@
   }
 
   /** Send a binary HTTP POST request to the given path. */
-  protected postBinary<T = void>(path: string, payload: Blob) {
+  protected postBinary<T = void>(path: string, payload: Blob, options?: HttpClientPostOptions) {
     return this.checkForAuthenticationError(() =>
-      this.client.postBinary<T>(this.resolvePath(path), payload),
+      this.client.postBinary<T>(this.resolvePath(path), payload, options),
     )
   }
diff --git a/app/common/src/services/HttpClient.ts b/app/common/src/services/HttpClient.ts
index f673c20457ec..28a4c9ac41e6 100644
--- a/app/common/src/services/HttpClient.ts
+++ b/app/common/src/services/HttpClient.ts
@@ -16,6 +16,7 @@ export interface ResponseWithTypedJson extends Response {
 /** Options for {@link HttpClient['post']} method. */
 export interface HttpClientPostOptions {
   readonly keepalive?: boolean
+  readonly abort?: AbortSignal | undefined
 }
 
 /** Options for {@link HttpClient.request} private method. */
@@ -54,16 +55,19 @@ export class HttpClient {
       payload: JSON.stringify(payload),
       mimetype: 'application/json',
       keepalive: options?.keepalive ?? false,
+      abort: options?.abort,
     })
   }
 
   /** Send a base64-encoded binary HTTP POST request to the specified URL. */
-  async postBinary<T = void>(url: string, payload: Blob) {
+  async postBinary<T = void>(url: string, payload: Blob, options?: HttpClientPostOptions) {
     return await this.request<'POST', T>({
       method: 'POST',
       url,
       payload,
       mimetype: 'application/octet-stream',
+      keepalive: options?.keepalive ?? false,
+      abort: options?.abort,
     })
   }
diff --git a/app/common/src/utilities/ConditionVariable.ts b/app/common/src/utilities/ConditionVariable.ts
index 6252fdec08a9..f62cb0458ad8 100644
--- a/app/common/src/utilities/ConditionVariable.ts
+++ b/app/common/src/utilities/ConditionVariable.ts
@@ -16,20 +16,20 @@ export class ConditionVariable {
   }
 
   /** Resolve all promises in the queue. */
-  notifyAll(): Promise<boolean> {
+  notifyAll(): boolean {
     const success = this.resolveQueue.length !== 0
     for (const resolve of this.resolveQueue.splice(0, this.resolveQueue.length)) {
       resolve()
     }
     // Give the code after the resolved promises time to execute.
-    return Promise.resolve(success)
+    return success
   }
 
   /** Resolve a single promise in the queue. */
-  notifyOne(): Promise<boolean> {
+  notifyOne(): boolean {
     const resolve = this.resolveQueue.shift()
     resolve?.()
     // Give the code after the resolved promise time to execute.
-    return Promise.resolve(resolve != null)
+    return resolve != null
   }
 }
diff --git a/app/gui/src/dashboard/hooks/backendUploadFilesHooks.tsx b/app/gui/src/dashboard/hooks/backendUploadFilesHooks.tsx
index 99cd43033fc3..466bbc239be1 100644
--- a/app/gui/src/dashboard/hooks/backendUploadFilesHooks.tsx
+++ b/app/gui/src/dashboard/hooks/backendUploadFilesHooks.tsx
@@ -15,24 +15,16 @@ import {
 import { resolveDuplications } from '#/modals/DuplicateAssetsModal'
 import { useSetSelectedAssets, type SelectedAssetInfo } from '#/providers/DriveProvider'
 import type LocalBackend from '#/services/LocalBackend'
-import { noop } from '#/utilities/functions'
-import { usePreventNavigation } from '#/utilities/preventNavigation'
 import { useMutationCallback } from '#/utilities/tanstackQuery'
 import { useBackends, useHttpClient, useText } from '$/providers/react'
-import { useFeatureFlag } from '$/providers/react/featureFlags'
-import {
-  queryOptions,
-  useQueryClient,
-  type QueryClient,
-  type QueryKey,
-  type UseMutationResult,
-} from '@tanstack/react-query'
+import { useUploadsToCloudStore } from '$/providers/react/upload'
+import { useQueryClient } from '@tanstack/react-query'
 import {
   AssetType,
+  BackendType,
   escapeSpecialCharacters,
   extractProjectExtension,
   fileIsProject,
-  S3_CHUNK_SIZE_BYTES,
   stripProjectExtension,
   type AnyAsset,
   type AssetId,
@@ -40,11 +32,8 @@ import {
   type DirectoryId,
   type FileId,
   type ProjectId,
-  type UploadedAsset,
   type UploadFileRequestParams,
 } from 'enso-common/src/services/Backend'
-import { uniqueString } from 'enso-common/src/utilities/uniqueString'
-import { useState } from 'react'
 import { toast } from 'react-toastify'
 import invariant from 'tiny-invariant'
 
@@ -55,15 +44,38 @@ declare module 'enso-common/src/queryClient' {
   }
 }
 
-/** The delay, in milliseconds, before query data for a file being uploaded is cleared. */
-const CLEAR_PROGRESS_DELAY_MS = 5_000
-const UPLOADING_FILES_QUERY_KEY = ['uploadingFiles'] satisfies QueryKey
+/**
+ * Returns a function for uploading files to the Local Backend, which requires less hassle than a
+ * multipart upload to the Cloud.
+ */
+function useUploadLocally(backend: Backend) {
+  const localUploadFileStart = useMutationCallback(
+    backendMutationOptions(backend, 'uploadFileStart'),
+  )
+  const uploadFileEnd = useMutationCallback(backendMutationOptions(backend, 'uploadFileEnd'))
+
+  return async (file: File, params: UploadFileRequestParams) => {
+    const { uploadId, sourcePath } = await localUploadFileStart([params, file])
+    return uploadFileEnd([
+      {
+        uploadId,
+        sourcePath,
+        parts: [],
+        assetId: params.fileId,
+        ...params,
+      },
+    ])
+  }
+}
 
 /** A function to upload files. */
 export function useUploadFiles(backend: Backend, category: Category) {
   const ensureListDirectory = useEnsureListDirectory(backend, category)
-  const uploadFile = useUploadFile(backend)
+  const uploads = useUploadsToCloudStore()
+  const uploadLocally = useUploadLocally(backend)
   const setSelectedAssets = useSetSelectedAssets()
+  const uploadFile =
+    backend.type === BackendType.local ? uploadLocally : uploads.uploadFile.bind(uploads)
 
   return useEventCallback(async (filesToUpload: readonly File[], parentId: DirectoryId) => {
     const reversedFiles = Array.from(filesToUpload).reverse()
@@ -95,10 +107,15 @@ export function useUploadFiles(backend: Backend, category: Category) {
           const { extension } = extractProjectExtension(file.name)
           title = escapeSpecialCharacters(stripProjectExtension(title))
 
-          await uploadFile([
-            { fileId, fileName: `${title}.${extension}`, parentDirectoryId: parentId },
+          await uploadFile(
             file,
-          ]).then((result) => {
+            {
+              fileId,
+              fileName: `${title}.${extension}`,
+              parentDirectoryId: parentId,
+            },
+            'requestedByUser',
+          ).then((result) => {
             if (result.jobId != null) {
               return
             }
@@ -113,21 +130,23 @@
           })
         } else {
           title = escapeSpecialCharacters(title)
-          await uploadFile([{ fileId, fileName: title, parentDirectoryId: parentId }, file]).then(
-            (result) => {
-              if (result.jobId != null) {
-                return
-              }
-              addToSelection({
-                type: AssetType.file,
-                // This is SAFE, because it is guarded behind `assetIsFile`.
-                // eslint-disable-next-line no-restricted-syntax
-                id: result.id as FileId,
-                parentId,
-                title,
-              })
-            },
-          )
+          await uploadFile(
+            file,
+            { fileId, fileName: title, parentDirectoryId: parentId },
+            'requestedByUser',
+          ).then((result) => {
+            if (result.jobId != null) {
+              return
+            }
+            addToSelection({
+              type: AssetType.file,
+              // This is SAFE, because it is guarded behind `assetIsFile`.
+              // eslint-disable-next-line no-restricted-syntax
+              id: result.id as FileId,
+              parentId,
+              title,
+            })
+          })
         }
       }
 
@@ -169,98 +188,6 @@
   })
 }
 
-/** Upload progress for {@link useUploadFile}. */
-export interface UploadFileMutationProgress {
-  /**
-   * Whether this is the first progress update.
-   * Useful to determine whether to create a new toast or to update an existing toast.
-   */
-  readonly event: 'begin' | 'chunk' | 'end'
-  readonly sentBytes: number
-  readonly totalBytes: number
-}
-
-/** Options for {@link useUploadFile}. */
-export interface UploadFileMutationOptions {
-  /** Defaults to `true`. */
-  readonly updateProgress?: boolean | undefined
-  /**
-   * Defaults to `3`.
-   * Controls the default value of {@link UploadFileMutationOptions['chunkRetries']}
-   * and {@link UploadFileMutationOptions['endRetries']}.
-   */
-  readonly retries?: number | undefined
-  /** Defaults to {@link UploadFileMutationOptions['retries']}. */
-  readonly chunkRetries?: number | undefined
-  /** Defaults to {@link UploadFileMutationOptions['retries']}. */
-  readonly endRetries?: number | undefined
-  /** Called for all progress updates (`onBegin`, `onChunkSuccess` and `onSuccess`). */
-  readonly onProgress?: ((progress: UploadFileMutationProgress) => void) | undefined
-  /** Called before any mutations are sent. */
-  readonly onBegin?: ((progress: UploadFileMutationProgress) => void) | undefined
-  /** Called after each successful chunk upload mutation. */
-  readonly onChunkSuccess?: ((progress: UploadFileMutationProgress) => void) | undefined
-  /** Called after the entire mutation succeeds. */
-  readonly onSuccess?: ((progress: UploadFileMutationProgress) => void) | undefined
-  /** Called after any mutations fail. */
-  readonly onError?: ((error: unknown) => void) | undefined
-  /** Called after `onSuccess` or `onError`, depending on whether the mutation succeeded. */
-  readonly onSettled?:
-    | ((progress: UploadFileMutationProgress | null, error: unknown) => void)
-    | undefined
-}
-
-/** The result of a {@link useUploadFile}. */
-export type UploadFileMutationResult = UseMutationResult<
-  UploadedAsset,
-  Error,
-  [body: UploadFileRequestParams, file: File],
-  unknown
-> & { readonly sentBytes: number; readonly totalBytes: number }
-
-/** A key for an "uploading file" computed query. */
-export function uploadingFilesQueryKey() {
-  return UPLOADING_FILES_QUERY_KEY
-}
-
-/** Options for an "uploading file" computed query. */
-export function uploadingFileQueryOptions() {
-  return queryOptions<Record<string, UploadFileMutationProgress>>({
-    queryKey: uploadingFilesQueryKey(),
-    initialData: {},
-  })
-}
-
-/** Set the progress of a file upload. */
-function setUploadingFileProgress(
-  queryClient: QueryClient,
-  id: string,
-  progress: UploadFileMutationProgress,
-) {
-  queryClient.setQueryData<Record<string, UploadFileMutationProgress>>(
-    uploadingFilesQueryKey(),
-    (data) => ({ ...data, [id]: progress }),
-  )
-}
-
-/** Clear the progress of file uploads if all current file uploads are done. */
-function clearUploadingFileProgressIfDone(queryClient: QueryClient) {
-  queryClient.setQueryData<Record<string, UploadFileMutationProgress>>(
-    uploadingFilesQueryKey(),
-    (data) => {
-      if (!data) {
-        return
-      }
-      for (const [, progress] of Object.entries(data)) {
-        if (progress.event !== 'end') {
-          return
-        }
-      }
-      return {}
-    },
-  )
-}
-
 /**
  * Options for {@link useUploadFileToCloud}.
  */
@@ -351,7 +278,7 @@ export function useUploadFileToCloud() {
   const httpClient = useHttpClient()
   const toastAndLog = useToastAndLog()
   const { remoteBackend } = useBackends()
-  const uploadFile = useUploadFile(remoteBackend)
+  const uploads = useUploadsToCloudStore()
   const getSiblings = useGetSiblings()
   const { cloudCategories } = useCategoriesAPI()
   const cloudHomeCategory = cloudCategories.categories.find((category) => category.type === 'cloud')
@@ -482,14 +409,15 @@
        }
      })()
 
-      await uploadFile([
+      await uploads.uploadFile(
+        fileData.file,
         {
           fileName: fileData.fileName,
           fileId: asset.cloudId ?? null,
           parentDirectoryId: targetDirectoryId,
         },
-        fileData.file,
-      ])
+        'requestedByUser',
+      )
 
       toast.success(getText('uploadProjectToCloudSuccess'))
     } catch (error) {
@@ -503,106 +431,6 @@
   return upload
 }
 
-/**
- * Call "upload file" mutations for a file.
- * Always uses multipart upload for Cloud backend.
- */
-export function useUploadFile(backend: Backend, options: UploadFileMutationOptions = {}) {
-  const queryClient = useQueryClient()
-  const toastAndLog = useToastAndLog()
-  const { getText } = useText()
-  const fileChunkUploadPoolSize = useFeatureFlag('fileChunkUploadPoolSize')
-  const {
-    retries = 3,
-    chunkRetries = retries,
-    endRetries = retries,
-    updateProgress = true,
-    onError = (error) => {
-      toastAndLog('uploadLargeFileError', error)
-    },
-  } = options
-  const setProgress: typeof setUploadingFileProgress =
-    updateProgress ? setUploadingFileProgress : noop
-  const uploadFileStart = useMutationCallback(backendMutationOptions(backend, 'uploadFileStart'))
-  const [isPending, setIsPending] = useState(false)
-  const uploadFileChunk = useMutationCallback(
-    backendMutationOptions(backend, 'uploadFileChunk', {
-      retry: chunkRetries,
-      meta: { pool: { id: 'uploadFileChunk', parallelism: fileChunkUploadPoolSize } },
-    }),
-  )
-  const uploadFileEnd = useMutationCallback(
-    backendMutationOptions(backend, 'uploadFileEnd', { retry: endRetries }),
-  )
-
-  usePreventNavigation({ message: getText('anUploadIsInProgress'), isEnabled: isPending })
-
-  return useEventCallback(async ([body, file]: [body: UploadFileRequestParams, file: File]) => {
-    setIsPending(true)
-    const progressId = uniqueString()
-    const fileSizeBytes = file.size
-    const beginProgress: UploadFileMutationProgress = {
-      event: 'begin',
-      sentBytes: 0,
-      totalBytes: fileSizeBytes,
-    }
-    options.onBegin?.(beginProgress)
-    setProgress(queryClient, progressId, beginProgress)
-    try {
-      const { sourcePath, uploadId, presignedUrls } = await uploadFileStart([body, file])
-      let completedChunkCount = 0
-      const parts = await Promise.all(
-        presignedUrls.map((url, i) =>
-          uploadFileChunk([url, file, i]).then((part) => {
-            // This cannot be the `onSuccess` callback in `mutateAsync` because then it would not run
-            // if the component is unmounted beforehand (which seems to be the case?).
-            completedChunkCount += 1
-            const newSentBytes = Math.min(completedChunkCount * S3_CHUNK_SIZE_BYTES, fileSizeBytes)
-            const chunkProgress: UploadFileMutationProgress = {
-              event: 'chunk',
-              sentBytes: newSentBytes,
-              totalBytes: fileSizeBytes,
-            }
-            options.onChunkSuccess?.(chunkProgress)
-            setProgress(queryClient, progressId, chunkProgress)
-            return part
-          }),
-        ),
-      )
-      const result = await uploadFileEnd([
-        {
-          parentDirectoryId: body.parentDirectoryId,
-          parts,
-          sourcePath: sourcePath,
-          uploadId: uploadId,
-          assetId: body.fileId,
-          fileName: body.fileName,
-        },
-      ])
-      const endProgress: UploadFileMutationProgress = {
-        event: 'end',
-        sentBytes: fileSizeBytes,
-        totalBytes: fileSizeBytes,
-      }
-      options.onSuccess?.(endProgress)
-      options.onSettled?.(endProgress, null)
-      setProgress(queryClient, progressId, endProgress)
-      if (updateProgress) {
-        setTimeout(() => {
-          clearUploadingFileProgressIfDone(queryClient)
-        }, CLEAR_PROGRESS_DELAY_MS)
-      }
-      return result
-    } catch (error) {
-      onError(error)
-      options.onSettled?.(null, error)
-      throw error
-    } finally {
-      setIsPending(false)
-    }
-  })
-}
-
 /**
  * Download a file to local.
  * Does not work in environments that do not have a local backend.
diff --git a/app/gui/src/dashboard/hooks/projectHooks.ts b/app/gui/src/dashboard/hooks/projectHooks.ts
index 2d4e1df519a7..ef29f0fe2575 100644
--- a/app/gui/src/dashboard/hooks/projectHooks.ts
+++ b/app/gui/src/dashboard/hooks/projectHooks.ts
@@ -20,7 +20,6 @@ import {
 } from '$/providers/react/container'
 
 import { useCanRunProjects } from '#/hooks/backendHooks'
-import { useUploadFile } from '#/hooks/backendUploadFilesHooks'
 import { useToastAndLog } from '#/hooks/toastAndLogHooks'
 import { useLogger } from '#/providers/LoggerProvider'
 import type Backend from '#/services/Backend'
@@ -29,6 +28,7 @@ import { assert } from '#/utilities/error'
 import { usePreventNavigation } from '#/utilities/preventNavigation'
 import { useBackends, useText } from '$/providers/react'
 import { useFeatureFlag } from '$/providers/react/featureFlags'
+import { useUploadsToCloudStore } from '$/providers/react/upload'
 import { useState } from 'react'
 import { z } from 'zod'
 import { useEnsureQueryData, useMutationCallback } from '../utilities/tanstackQuery'
@@ -303,8 +303,8 @@ export function useCloseProjectMutation() {
   const client = reactQuery.useQueryClient()
   const logger = useLogger()
   const { remoteBackend, localBackend } = useBackends()
-  const uploadFile = useUploadFile(remoteBackend, { updateProgress: false })
   const [isHybridPending, setIsHybridPending] = useState(false)
+  const uploads = useUploadsToCloudStore()
   const toastAndLog = useToastAndLog()
   const addClosingProject = useAddClosingProject()
   const removeClosingProject = useRemoveClosingProject()
@@ -346,16 +346,19 @@
         if (hybrid) {
           const fileName = 'project_root.enso-project'
           const file = await remoteBackend.getProjectArchive(parentId, fileName)
-          await uploadFile([
-            {
-              fileId: hybrid.cloudProjectId,
-              fileName,
-              parentDirectoryId: hybrid.cloudParentId,
-            },
-            file,
-          ]).catch((error) => {
-            toastAndLog('uploadProjectError', error)
-          })
+          await uploads
+            .uploadFile(
+              file,
+              {
+                fileId: hybrid.cloudProjectId,
+                fileName,
+                parentDirectoryId: hybrid.cloudParentId,
+              },
+              'hybridSync',
+            )
+            .catch((error) => {
+              toastAndLog('uploadProjectError', error)
+            })
           invariant(localBackend != null, 'LocalBackend is null')
           await localBackend
             .deleteAsset(hybrid.parentId, { force: true }, null)
@@ -373,16 +376,19 @@
         if (hybrid) {
           const fileName = 'project_root.enso-project'
           const file = await remoteBackend.getProjectArchive(parentId, fileName)
-          await uploadFile([
-            {
-              fileId: hybrid.cloudProjectId,
-              fileName,
-              parentDirectoryId: hybrid.cloudParentId,
-            },
-            file,
-          ]).catch((error) => {
-            toastAndLog('uploadProjectError', error)
-          })
+          await uploads
+            .uploadFile(
+              file,
+              {
+                fileId: hybrid.cloudProjectId,
+                fileName,
+                parentDirectoryId: hybrid.cloudParentId,
+              },
+              'hybridSync',
+            )
+            .catch((error) => {
+              toastAndLog('uploadProjectError', error)
+            })
           invariant(localBackend != null, 'LocalBackend is null')
           await localBackend
diff --git a/app/gui/src/dashboard/pages/dashboard/Dashboard.tsx b/app/gui/src/dashboard/pages/dashboard/Dashboard.tsx
index 930fadd7951a..11c189a03be7 100644
--- a/app/gui/src/dashboard/pages/dashboard/Dashboard.tsx
+++ b/app/gui/src/dashboard/pages/dashboard/Dashboard.tsx
@@ -121,10 +121,6 @@ export function Dashboard(props: DashboardProps) {
         null!,
       )
       const endMetadata = await localBackend.uploadFileEnd({
-        parentDirectoryId,
-        fileName: projectName,
-        assetId: null,
-        parts: [],
         ...metadata,
       })
       if (endMetadata.project == null) {
diff --git a/app/gui/src/dashboard/pages/dashboard/UserBar/NotificationTray/computedNotificationHooks.tsx b/app/gui/src/dashboard/pages/dashboard/UserBar/NotificationTray/computedNotificationHooks.tsx
index beefedf6690e..37016fdc32ba 100644
--- a/app/gui/src/dashboard/pages/dashboard/UserBar/NotificationTray/computedNotificationHooks.tsx
+++ b/app/gui/src/dashboard/pages/dashboard/UserBar/NotificationTray/computedNotificationHooks.tsx
@@ -1,8 +1,9 @@
 /** @file Hooks for computing temporary notifications. */
-import { uploadingFileQueryOptions } from '#/hooks/backendUploadFilesHooks'
 import { useEventCallback } from '#/hooks/eventCallbackHooks'
 import { useText } from '$/providers/react'
-import { useIsMutating, useQuery, type MutationKey } from '@tanstack/react-query'
+import { useVueValue } from '$/providers/react/common'
+import { useUploadsToCloudStore } from '$/providers/react/upload'
+import { useIsMutating, type MutationKey } from '@tanstack/react-query'
 import { BackendType } from 'enso-common/src/services/Backend'
 import { omit } from 'enso-common/src/utilities/data/object'
 import { useState } from 'react'
@@ -113,20 +114,21 @@ export function useComputedNotifications(options: UseComputedNotificationsOption
   const { getComputedNotification, upsertComputedNotification } = options
   const { getText } = useText()
 
-  const { data: uploadingFiles } = useQuery(uploadingFileQueryOptions())
+  const uploadsStore = useUploadsToCloudStore()
+  const uploadingFiles = useVueValue(() => [...uploadsStore.uploads.entries()])
+  const uploadingFilesEntries = uploadingFiles.filter(([, data]) => data.kind === 'requestedByUser')
 
-  const uploadingFilesEntries = Object.entries(uploadingFiles)
   if (uploadingFilesEntries[0]) {
     const totalFiles = uploadingFilesEntries.length
     let sentFiles = 0
     let sentBytes = 0
     let totalBytes = 0
-    for (const [, progress] of uploadingFilesEntries) {
-      if (progress.sentBytes === progress.totalBytes) {
+    for (const [, data] of uploadingFilesEntries) {
+      if (data.sentBytes === data.totalBytes) {
         sentFiles += 1
       }
-      sentBytes += progress.sentBytes
-      totalBytes += progress.totalBytes
+      sentBytes += data.sentBytes
+      totalBytes += data.totalBytes
     }
     const sentMb = sentBytes / MB_BYTES
     const totalMb = totalBytes / MB_BYTES
diff --git a/app/gui/src/dashboard/services/LocalBackend.ts b/app/gui/src/dashboard/services/LocalBackend.ts
index c2ef93947e22..8864c4f176ae 100644
--- a/app/gui/src/dashboard/services/LocalBackend.ts
+++ b/app/gui/src/dashboard/services/LocalBackend.ts
@@ -711,13 +711,13 @@
   }
 
   /** Upload a chunk of a large file. */
-  override uploadFileChunk(): Promise<backend.S3MultipartPart> {
+  override uploadFileChunk(): Promise<{ part: backend.S3MultipartPart; size: number }> {
     // Do nothing, the entire file has already been uploaded in `uploadFileStart`.
-    return Promise.resolve({ eTag: '', partNumber: 0 })
+    return Promise.resolve({ part: { eTag: '', partNumber: 0 }, size: 0 })
   }
 
   /** Finish uploading a large file. */
-  override uploadFileEnd(body: backend.UploadFileEndRequestBody): Promise<backend.UploadedAsset> {
+  override uploadFileEnd(body: { uploadId: string }): Promise<backend.UploadedAsset> {
     // Do nothing, the entire file has already been uploaded in `uploadFileStart`.
     const file = this.uploadedFiles.get(body.uploadId)
     invariant(file, 'Uploaded file not found')
diff --git a/app/gui/src/dashboard/services/RemoteBackend.ts b/app/gui/src/dashboard/services/RemoteBackend.ts
index 96b8f6077b50..3688ce5cfb03 100644
--- a/app/gui/src/dashboard/services/RemoteBackend.ts
+++ b/app/gui/src/dashboard/services/RemoteBackend.ts
@@ -868,13 +868,14 @@ export default class RemoteBackend extends Backend {
   override async uploadFileStart(
     body: backend.UploadFileRequestParams,
     file: File,
+    abort?: AbortSignal,
   ): Promise<backend.UploadLargeFileMetadata> {
     const path = remoteBackendPaths.UPLOAD_FILE_START_PATH
     const requestBody: backend.UploadFileStartRequestBody = {
       fileName: body.fileName,
       size: file.size,
     }
-    const response = await this.post(path, requestBody)
+    const response = await this.post(path, requestBody, { abort })
     if (!response.ok) {
       return await this.throw(response, 'uploadFileStartBackendError')
     } else {
@@ -890,16 +891,17 @@
     url: backend.HttpsUrl,
     file: Blob,
     index: number,
-  ): Promise<backend.S3MultipartPart> {
+    abort?: AbortSignal,
+  ): Promise<{ part: backend.S3MultipartPart; size: number }> {
     const start = index * backend.S3_CHUNK_SIZE_BYTES
     const end = Math.min(start + backend.S3_CHUNK_SIZE_BYTES, file.size)
     const body = file.slice(start, end)
-    const response = await fetch(url, { method: 'PUT', body })
+    const response = await fetch(url, { method: 'PUT', body, ...(abort ? { signal: abort } : {}) })
    const eTag = response.headers.get('ETag')
     if (!response.ok || eTag == null) {
       return await this.throw(response, 'uploadFileChunkBackendError')
     } else {
-      return { eTag, partNumber: index + 1 }
+      return { part: { eTag, partNumber: index + 1 }, size: body.size }
     }
   }
 
@@ -909,9 +911,10 @@
    */
   override async uploadFileEnd(
     body: backend.UploadFileEndRequestBody,
+    abort?: AbortSignal,
   ): Promise<backend.UploadedAsset> {
     const path = remoteBackendPaths.UPLOAD_FILE_END_PATH
-    const response = await this.post(path, body)
+    const response = await this.post(path, body, { abort })
     if (!response.ok) {
       return await this.throw(response, 'uploadFileEndBackendError')
     } else {
diff --git a/app/gui/src/project-view/components/widgets/FileBrowserWidget/FileBrowserContent.vue b/app/gui/src/project-view/components/widgets/FileBrowserWidget/FileBrowserContent.vue
index d187b73a1df9..22a4d8ef9b57 100644
--- a/app/gui/src/project-view/components/widgets/FileBrowserWidget/FileBrowserContent.vue
+++ b/app/gui/src/project-view/components/widgets/FileBrowserWidget/FileBrowserContent.vue
@@ -27,7 +27,7 @@ const emit = defineEmits<{
   renameDirectory: [
     Directory | undefined,
     string,
-    (action: Promise) => void,
+    (action: Promise) => void,
   ]
   enterDirectory: [Directory]
   choose: [AnyAsset, boolean]
diff --git a/app/gui/src/project-view/components/widgets/FileBrowserWidget/fileBrowser.ts b/app/gui/src/project-view/components/widgets/FileBrowserWidget/fileBrowser.ts
index e7a41236ba1d..324a41967b20 100644
--- a/app/gui/src/project-view/components/widgets/FileBrowserWidget/fileBrowser.ts
+++ b/app/gui/src/project-view/components/widgets/FileBrowserWidget/fileBrowser.ts
@@ -148,7 +148,7 @@ export function useUpsertDirectory({
   function acceptName(
     editedAsset: Directory | undefined,
     name: string,
-    handler: (action: Promise) => void,
+    handler: (action: Promise) => void,
   ) {
     const parentId = toValue(currentDirectory)?.id
     if (parentId == null) {
@@ -159,7 +159,7 @@
       editedAsset == null ? createDir.mutateAsync([{ title: name, parentId }, false])
       : editedAsset.title != name ?
         updateDir.mutateAsync([editedAsset.id, { title: name }, editedAsset.title])
-      : Promise.resolve(undefined)
+      : Promise.resolve(null)
     handler(action)
   }
diff --git a/app/gui/src/project-view/composables/backend.ts b/app/gui/src/project-view/composables/backend.ts
index bbe79a278fc1..c634e07127c9 100644
--- a/app/gui/src/project-view/composables/backend.ts
+++ b/app/gui/src/project-view/composables/backend.ts
@@ -58,11 +58,12 @@ export function backendQueryOptions
-type MutationOptions<Method extends BackendMutationMethod> = ToValue<
+type MutationOptions<Method extends BackendMutationMethod, B extends Backend | null> = ToValue<
   Omit<
     UnwrapRef<
       UseMutationOptions<
-        Awaited<ReturnType<Backend[Method]>> | undefined,
+        B extends Backend ? Awaited<ReturnType<Backend[Method]>>
+        : Awaited<ReturnType<Backend[Method]>> | null,
         Error,
         Parameters<Backend[Method]>
       >
@@ -74,12 +75,16 @@
 /**
  * Create Tanstack Query mutation options for given backend method call.
  */
-export function backendMutationOptions<Method extends BackendMutationMethod>(
+export function backendMutationOptions<
+  Method extends BackendMutationMethod,
+  B extends Backend | null,
+>(
   method: Method,
-  backend: ToValue<Backend | null>,
-  options?: MutationOptions<Method>,
+  backend: ToValue<B>,
+  options?: MutationOptions<Method, B>,
 ): UseMutationOptions<
-  Awaited<ReturnType<Backend[Method]>> | undefined,
+  B extends Backend ? Awaited<ReturnType<Backend[Method]>>
+  : Awaited<ReturnType<Backend[Method]>> | null,
   Error,
   Parameters<Backend[Method]>
 > {
@@ -98,11 +103,15 @@
     ...backendBaseOptions(backendVal),
     ...opts,
     mutationKey: [backendVal?.type, method, ...(toValue(opts?.mutationKey) ?? [])],
-    mutationFn: (args) => (backendVal ? (backendVal[method] as any)(...args) : undefined),
+    mutationFn: (args) =>
+      backendVal ? (backendVal[method] as any)(...args) : (Promise.resolve(null) as any),
     meta: {
       invalidates,
       awaitInvalidates: true,
-      refetchType: invalidates.some((key) => key[1] === 'listDirectory') ? 'all' : 'active',
+      refetchType:
+        invalidates.some((key) => key[1] === 'listDirectory') ?
+          ('all' as const)
+        : ('active' as const),
       ...opts?.meta,
     },
   }
@@ -117,7 +126,7 @@ export function useBackend(which: 'remote' | 'project') {
   const queryClient = useQueryClient()
   const { localBackend: project, remoteBackend: remote } = useBackends()
-  const backend = which === 'project' ? project : remote
+  const backend: Backend | null = which === 'project' ? project : remote
 
   /** Perform the specified query, and keep the result up-to-date if the provided arguments change. */
   function query(
@@ -152,9 +161,9 @@
   function mutation<Method extends BackendMutationMethod>(
     method: Method,
-    options?: MutationOptions<Method>,
+    options?: MutationOptions<Method, Backend | null>,
   ): UseMutationReturnType<
-    Awaited<ReturnType<Backend[Method]>> | undefined,
+    Awaited<ReturnType<Backend[Method]>> | null,
     Error,
     Parameters<Backend[Method]>,
     unknown
diff --git a/app/gui/src/providers/__tests__/upload.test.ts b/app/gui/src/providers/__tests__/upload.test.ts
new file mode 100644
index 000000000000..416dcd109110
--- /dev/null
+++ b/app/gui/src/providers/__tests__/upload.test.ts
@@ -0,0 +1,168 @@
+import {
+  DirectoryId,
+  HttpsUrl,
+  S3FilePath,
+  type S3MultipartPart,
+  type UploadedAsset,
+  type UploadFileEndRequestBody,
+  type UploadFileRequestParams,
+  type UploadLargeFileMetadata,
+} from '#/services/Backend'
+import {} from '@/util/assert'
+import { withSetup } from '@/util/testing'
+import { flushPromises } from '@vue/test-utils'
+import { assert, expect, test, vi } from 'vitest'
+import { setFeatureFlag } from '../featureFlags'
+import { createUploadsStore } from '../upload'
+
+const CHUNK_SIZE = 10
+
+function fixture(files: { fileName: string; partsCount: number }[]) {
+  const partsInProgress = new Map<[string, number], () => void>()
+
+  const filesMap = new Map(
+    files.map((f) => [
+      f.fileName,
+      {
+        ...f,
+        file: new File(Array(f.partsCount).fill(Array(CHUNK_SIZE).fill(0)), 'test-path'),
+        params: {
+          fileId: null,
+          fileName: f.fileName,
+          parentDirectoryId: DirectoryId('directory-testdir'),
+        },
+      },
+    ]),
+  )
+
+  const backend = {
+    uploadFileStart: vi.fn(
+      (params: UploadFileRequestParams, file: File): Promise<UploadLargeFileMetadata> => {
+        const f = filesMap.get(params.fileName)
+        assert(f != null)
+        expect(params).toEqual(f.params)
+        expect(file).toBe(f.file)
+        return Promise.resolve({
+          presignedUrls: [...Array(f.partsCount).keys()].map((i) =>
+            HttpsUrl(`url-${f.fileName}-${i}`),
+          ),
+          uploadId: `upload-${f.fileName}`,
+          sourcePath: S3FilePath('s2path'),
+        })
+      },
+    ),
+    uploadFileChunk: vi.fn(
+      (
+        url: HttpsUrl,
+        file: Blob,
+        index: number,
+      ): Promise<{ part: S3MultipartPart; size: number }> => {
+        const fileName = url.split('-')[1]
+        assert(fileName != null)
+        const f = filesMap.get(fileName)
+        assert(f != null)
+        expect([...partsInProgress.keys()]).not.toContainEqual([fileName, index])
+        expect(file).toBe(f.file)
+        return new Promise((resolve) => {
+          partsInProgress.set([fileName, index], () =>
+            resolve({ part: { eTag: '', partNumber: index + 1 }, size: CHUNK_SIZE }),
+          )
+        })
+      },
+    ),
+    uploadFileEnd: vi.fn((params: UploadFileEndRequestBody): Promise<UploadedAsset> => {
+      const f = filesMap.get(params.fileName)
+      assert(f != null)
+      expect(params).toEqual({
+        parentDirectoryId: f.params.parentDirectoryId,
+        parts: [...Array(f.partsCount).keys()].map((i) => ({ eTag: '', partNumber: i + 1 })),
+        sourcePath: S3FilePath('s2path'),
+        uploadId: `upload-${f.fileName}`,
+        assetId: f.params.fileId,
+        fileName: f.fileName,
+      })
+      return Promise.resolve({} as UploadedAsset)
+    }),
+  }
+
+  const store = createUploadsStore(backend as any)
+
+  return { partsInProgress, store, filesMap }
+}
+
+test('Pooling single multipart file upload', () =>
+  withSetup(async () => {
+    setFeatureFlag('fileChunkUploadPoolSize', 2)
+    const { partsInProgress, store, filesMap } = fixture([{ fileName: 'file', partsCount: 5 }])
+    const file = filesMap.get('file')!
+    const uploadResult = store.uploadFile(file.file, file.params).catch(assert.fail)
+
+    await flushPromises()
+    expect(partsInProgress.size).toBe(2)
+    expect(store.uploads.size).toBe(1)
+    expect(store.uploads.values().next().value?.sentBytes).toBe(0)
+
+    // Resolve single part
+    const firstPart = partsInProgress.keys().next().value!
+    partsInProgress.get(firstPart)?.()
+    partsInProgress.delete(firstPart)
+    await flushPromises()
+    expect(partsInProgress.size).toBe(2)
+    expect(store.uploads.values().next().value?.sentBytes).toBe(CHUNK_SIZE)
+
+    // Resolve two parts at once
+    partsInProgress.forEach((resolve) => resolve())
+    partsInProgress.clear()
+    await flushPromises()
+    expect(partsInProgress.size).toBe(2)
+    expect(store.uploads.values().next().value?.sentBytes).toBe(3 * CHUNK_SIZE)
+
+    // Resolve last parts
+    partsInProgress.forEach((resolve) => resolve())
+    await uploadResult
+    expect(store.uploads.values().next().value?.sentBytes).toBe(5 * CHUNK_SIZE)
+  }))
+
+test('Pooling multiple files upload', () =>
+  withSetup(async () => {
+    setFeatureFlag('fileChunkUploadPoolSize', 2)
+    const { partsInProgress, store, filesMap } = fixture([
+      { fileName: 'file1', partsCount: 1 },
+      { fileName: 'file2', partsCount: 1 },
+      { fileName: 'file3', partsCount: 1 },
+    ])
+    const results = Promise.all(
+      [...filesMap.entries()].map(([, file]) => store.uploadFile(file.file, file.params)),
+    ).catch(assert.fail)
+
+    await flushPromises()
+    expect(partsInProgress.size).toBe(2)
+    expect(store.uploads.size).toBe(3)
+
+    // Resolve single part
+    const firstPart = partsInProgress.keys().next().value!
+    partsInProgress.get(firstPart)?.()
+    partsInProgress.delete(firstPart)
+    await flushPromises()
+    expect(partsInProgress.size).toBe(2)
+    expect([...store.uploads.values()].map(({ sentBytes }) => sentBytes).sort()).toEqual([
+      0,
+      0,
+      CHUNK_SIZE,
+    ])
+
+    // Resolve the rest of the parts
+    partsInProgress.forEach((resolve) => resolve())
+    partsInProgress.clear()
+    await flushPromises()
+    expect(partsInProgress.size).toBe(0)
+    expect([...store.uploads.values()].map(({ sentBytes }) => sentBytes).sort()).toEqual([
+      CHUNK_SIZE,
+      CHUNK_SIZE,
+      CHUNK_SIZE,
+    ])
+
+    // Resolve last parts
+    partsInProgress.forEach((resolve) => resolve())
+    await results
+  }))
diff --git a/app/gui/src/providers/react/globalProvider.tsx b/app/gui/src/providers/react/globalProvider.tsx
index ee4dbeb19284..a78bfb24ba14 100644
--- a/app/gui/src/providers/react/globalProvider.tsx
+++ b/app/gui/src/providers/react/globalProvider.tsx
@@ -16,8 +16,10 @@ import { AuthContext } from '$/providers/react/auth'
 import { BackendsContext } from '$/providers/react/backends'
 import { QueryParamsContext } from '$/providers/react/queryParams'
 import { RouterContext, type RouterForReact } from '$/providers/react/router'
+import { UploadsToCloudStoreContext } from '$/providers/react/upload'
 import { useSession, type SessionStore } from '$/providers/session'
 import { useText, type TextStore } from '$/providers/text'
+import { useUploadsToCloudStore, type UploadsToCloudStore } from '$/providers/upload'
 import { injectGuiConfig, type GuiConfig } from '@/providers/guiConfig'
 import { reactComponent } from '@/util/react'
 import { proxyRefs } from '@/util/reactivity'
@@ -36,6 +38,7 @@ interface ContextsForReactProviderProps {
   auth: AuthStore
   queryParams: QueryParams
   actionsStore: ActionsStore
+  uploadsToCloudStore: UploadsToCloudStore
 }
 
 /**
@@ -58,6 +61,7 @@ export const ContextsForReactProvider = reactComponent(
       auth,
       queryParams,
       actionsStore,
+      uploadsToCloudStore,
     } = props
 
     return (
@@ -70,7 +74,9 @@
-          {children}
+          <UploadsToCloudStoreContext.Provider value={uploadsToCloudStore}>
+            {children}
+          </UploadsToCloudStoreContext.Provider>
@@ -101,6 +107,7 @@
     auth: useAuth(),
     queryParams: useQueryParams(),
     actionsStore: useActionsStore(),
+    uploadsToCloudStore: useUploadsToCloudStore(),
   })
   // Avoid annoying warning about __veauryInjectedProps__ property. Returning a function here
   // avoids the code path that assigns that property to overwrite a computed value with constant.
diff --git a/app/gui/src/providers/react/upload.ts b/app/gui/src/providers/react/upload.ts
new file mode 100644
index 000000000000..9fc13275ede1
--- /dev/null
+++ b/app/gui/src/providers/react/upload.ts
@@ -0,0 +1,6 @@
+import { useInReactFunction } from '$/providers/react/common'
+import type { UploadsToCloudStore } from '$/providers/upload'
+import * as react from 'react'
+
+export const UploadsToCloudStoreContext = react.createContext<UploadsToCloudStore | null>(null)
+export const useUploadsToCloudStore = useInReactFunction(UploadsToCloudStoreContext)
diff --git a/app/gui/src/providers/upload.ts b/app/gui/src/providers/upload.ts
new file mode 100644
index 000000000000..d8e6b21f9df7
--- /dev/null
+++ b/app/gui/src/providers/upload.ts
@@ -0,0 +1,111 @@
+import type Backend from '#/services/Backend'
+import type { HttpsUrl, UploadFileRequestParams } from '#/services/Backend'
+import { backendMutationOptions } from '@/composables/backend'
+import * as vueQuery from '@tanstack/vue-query'
+import { createGlobalState } from '@vueuse/core'
+import { ConditionVariable } from 'enso-common/src/utilities/ConditionVariable'
+import { reactive } from 'vue'
+import { useBackends } from './backends'
+import { useFeatureFlag } from './featureFlags'
+
+/** The delay, in milliseconds, before query data for a file being uploaded is cleared. */
+const CLEAR_PROGRESS_DELAY_MS = 5_000
+const RETRIES = 3
+
+export type UploadKind = 'requestedByUser' | 'hybridSync'
+
+export interface OngoingUpload {
+  kind?: UploadKind | undefined
+  sentBytes: number
+  totalBytes: number
+  finished: boolean
+  abortController: AbortController
+}
+
+export type UploadsToCloudStore = ReturnType<typeof createUploadsStore>
+
+/** Constructor of UploadsToCloudStore. See {@link useUploadsToCloudStore} docs. */
+export function createUploadsStore(backend: Backend) {
+  const uploads = reactive(new Map<string, OngoingUpload>())
+  const chunkUploadPoolSize = useFeatureFlag('fileChunkUploadPoolSize')
+  let chunksBeingUploaded = 0
+  const chunkUploadCondVar = new ConditionVariable()
+
+  const uploadFileStart = vueQuery.useMutation(backendMutationOptions('uploadFileStart', backend))
+  const uploadFileChunk = vueQuery.useMutation(
+    backendMutationOptions('uploadFileChunk', backend, { retry: RETRIES }),
+  )
+  const uploadFileEnd = vueQuery.useMutation(
+    backendMutationOptions('uploadFileEnd', backend, { retry: RETRIES }),
+  )
+
+  async function uploadChunk(url: HttpsUrl, file: File, index: number, abort: AbortSignal) {
+    while (chunkUploadPoolSize.value > 0 && chunksBeingUploaded >= chunkUploadPoolSize.value) {
+      await chunkUploadCondVar.wait()
+      abort.throwIfAborted()
+    }
+    chunksBeingUploaded += 1
+    return uploadFileChunk.mutateAsync([url, file, index, abort]).finally(() => {
+      chunksBeingUploaded -= 1
+      chunkUploadCondVar.notifyOne()
+    })
+  }
+
+  async function uploadFile(file: File, params: UploadFileRequestParams, kind?: UploadKind) {
+    const abortController = new AbortController()
+    const { sourcePath, uploadId, presignedUrls } = await uploadFileStart.mutateAsync([
+      params,
+      file,
+      abortController.signal,
+    ])
+
+    const data: OngoingUpload = reactive({
+      kind,
+      sentBytes: 0,
+      totalBytes: file.size,
+      finished: false,
+      abortController,
+    })
+    uploads.set(uploadId, data)
+
+    const parts = await Promise.all(
+      presignedUrls.map((url, i) =>
+        uploadChunk(url, file, i, abortController.signal).then(({ part, size }) => {
+          data.sentBytes += size
+          return part
+        }),
+      ),
+    )
+    const result = await uploadFileEnd.mutateAsync([
+      {
+        parentDirectoryId: params.parentDirectoryId,
+        parts,
+        sourcePath: sourcePath,
+        uploadId: uploadId,
+        assetId: params.fileId,
+        fileName: params.fileName,
+      },
+      abortController.signal,
+    ])
+    data.finished = true
+    setTimeout(() => {
+      uploads.delete(uploadId)
+    }, CLEAR_PROGRESS_DELAY_MS)
+    return result
+  }
+
+  return { uploads, uploadFile }
+}
+
+/**
+ * Uploads to Cloud Store.
+ *
+ * This store handles and keeps track of multipart file uploads to the Remote Backend.
+ * The number of chunks uploaded at once is throttled by the 'fileChunkUploadPoolSize'
+ * feature flag. The `uploads` map contains all uploads with their progress, including
+ * uploads that finished no more than {@link CLEAR_PROGRESS_DELAY_MS} milliseconds ago.
+ */
+export const useUploadsToCloudStore = createGlobalState(() => {
+  const { remoteBackend } = useBackends()
+  return createUploadsStore(remoteBackend)
+})
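
For reference, a minimal consumer sketch of the new uploads store. This is not part of the patch; it is assumed usage reconstructed from the call sites above (`useUploadFiles`, `useCloseProjectMutation`), and the directory id is a hypothetical placeholder:

```ts
import { useUploadsToCloudStore } from '$/providers/upload'
import { DirectoryId } from '#/services/Backend'

const uploads = useUploadsToCloudStore()

// Start a throttled multipart upload; progress is tracked reactively in
// `uploads.uploads`, keyed by the backend-assigned upload id.
async function uploadUserFile(file: File) {
  return uploads.uploadFile(
    file,
    // `parentDirectoryId` is a placeholder; real callers pass the current directory's id.
    { fileId: null, fileName: file.name, parentDirectoryId: DirectoryId('directory-xxxx') },
    'requestedByUser', // 'hybridSync' uploads are filtered out of the notification tray
  )
}

// Aggregate progress across ongoing uploads, e.g. for a progress bar.
function overallProgress(): number {
  const entries = [...uploads.uploads.values()]
  const sent = entries.reduce((n, u) => n + u.sentBytes, 0)
  const total = entries.reduce((n, u) => n + u.totalBytes, 0)
  return total === 0 ? 1 : sent / total
}
```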