diff --git a/.changeset/composite-file-upload.md b/.changeset/composite-file-upload.md new file mode 100644 index 0000000000..2e683fa13f --- /dev/null +++ b/.changeset/composite-file-upload.md @@ -0,0 +1,6 @@ +--- +'@e2b/python-sdk': minor +'e2b': minor +--- + +automatically split large file uploads (>64MB) into parallel chunks and compose them server-side (async Python SDK and JS SDK only) diff --git a/packages/js-sdk/src/envd/schema.gen.ts b/packages/js-sdk/src/envd/schema.gen.ts index 9c39a43d3e..cb95e50654 100644 --- a/packages/js-sdk/src/envd/schema.gen.ts +++ b/packages/js-sdk/src/envd/schema.gen.ts @@ -106,6 +106,51 @@ export interface paths { patch?: never; trace?: never; }; + "/files/compose": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** Compose multiple files into a single file using zero-copy concatenation. Source files are deleted after successful composition. */ + post: { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["ComposeRequest"]; + }; + }; + responses: { + /** @description Files composed successfully */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["EntryInfo"]; + }; + }; + 400: components["responses"]["InvalidPath"]; + 401: components["responses"]["InvalidUser"]; + 404: components["responses"]["FileNotFound"]; + 500: components["responses"]["InternalServerError"]; + 507: components["responses"]["NotEnoughDiskSpace"]; + }; + }; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/health": { parameters: { query?: never; @@ -233,6 +278,14 @@ export interface paths { export type webhooks = Record; export interface components { schemas: { + ComposeRequest: { + /** @description Destination file path for the composed file */ + destination: string; + 
/** @description Ordered list of source file paths to concatenate */ + source_paths: string[]; + /** @description User for setting ownership and resolving relative paths */ + username?: string; + }; EntryInfo: { /** @description Name of the file */ name: string; diff --git a/packages/js-sdk/src/sandbox/filesystem/index.ts b/packages/js-sdk/src/sandbox/filesystem/index.ts index 2803cc8ce3..92958fe11c 100644 --- a/packages/js-sdk/src/sandbox/filesystem/index.ts +++ b/packages/js-sdk/src/sandbox/filesystem/index.ts @@ -184,6 +184,8 @@ export interface FilesystemReadOpts extends FilesystemRequestOpts { gzip?: boolean } +const DEFAULT_CHUNK_SIZE = 64 * 1024 * 1024 // 64 MB + export interface FilesystemListOpts extends FilesystemRequestOpts { /** * Depth of the directory to list. @@ -388,7 +390,7 @@ export class Filesystem { typeof pathOrFiles === 'string' ? { path: pathOrFiles, - writeOpts: opts as FilesystemWriteOpts, + writeOpts: opts as FilesystemWriteOpts | undefined, writeFiles: [ { data: dataOrOpts as @@ -401,7 +403,7 @@ export class Filesystem { } : { path: undefined, - writeOpts: dataOrOpts as FilesystemWriteOpts, + writeOpts: dataOrOpts as FilesystemWriteOpts | undefined, writeFiles: pathOrFiles as WriteEntry[], } @@ -418,6 +420,16 @@ export class Filesystem { const useOctetStream = compareVersions(this.envdApi.version, ENVD_OCTET_STREAM_UPLOAD) >= 0 + // Composite upload: automatically chunk large files, upload parts in parallel, then compose + if (path && useOctetStream) { + const blob = await toBlob(writeFiles[0].data) + if (blob.size > DEFAULT_CHUNK_SIZE) { + return this.compositeWrite(path, blob, user, writeOpts) + } + // Data fits in a single chunk — fall through to normal write path + writeFiles[0] = { data: blob } + } + const results: WriteInfo[] = [] const useGzip = writeOpts?.gzip === true @@ -821,4 +833,75 @@ export class Filesystem { throw handleFilesystemRpcError(err) } } + + private async compositeWrite( + destination: string, + blob: Blob, + 
user: Username | undefined,
+    opts?: FilesystemWriteOpts
+  ): Promise<WriteInfo> {
+    const totalSize = blob.size
+    const chunkSize = DEFAULT_CHUNK_SIZE
+    const useGzip = opts?.gzip === true
+
+    const headers: Record<string, string> = {
+      'Content-Type': 'application/octet-stream',
+    }
+    if (useGzip) {
+      headers['Content-Encoding'] = 'gzip'
+    }
+
+    // Split into chunks and upload in parallel
+    const chunkCount = Math.ceil(totalSize / chunkSize)
+    const uploadId = crypto.randomUUID()
+    const chunkPaths: string[] = []
+
+    for (let i = 0; i < chunkCount; i++) {
+      chunkPaths.push(`/tmp/.e2b-upload-${uploadId}-${i}`)
+    }
+
+    await Promise.all(
+      chunkPaths.map(async (chunkPath, i) => {
+        const start = i * chunkSize
+        const end = Math.min(start + chunkSize, totalSize)
+        const chunk = blob.slice(start, end)
+        const body = await toUploadBody(chunk, useGzip)
+
+        const res = await this.envdApi.api.POST('/files', {
+          params: {
+            query: {
+              path: chunkPath,
+              username: user,
+            },
+          },
+          bodySerializer: () => body,
+          headers,
+          signal: this.connectionConfig.getSignal(opts?.requestTimeoutMs),
+          body: {},
+        })
+
+        const err = await handleFilesystemEnvdApiError(res)
+        if (err) {
+          throw err
+        }
+      })
+    )
+
+    // Compose chunks into the final file
+    const composeRes = await this.envdApi.api.POST('/files/compose', {
+      body: {
+        source_paths: chunkPaths,
+        destination,
+        username: user,
+      },
+      signal: this.connectionConfig.getSignal(opts?.requestTimeoutMs),
+    })
+
+    const composeErr = await handleFilesystemEnvdApiError(composeRes)
+    if (composeErr) {
+      throw composeErr
+    }
+
+    return composeRes.data as WriteInfo
+  }
 }
diff --git a/packages/python-sdk/e2b/envd/api.py b/packages/python-sdk/e2b/envd/api.py
index 7e2a59c131..6c2f6fd5ec 100644
--- a/packages/python-sdk/e2b/envd/api.py
+++ b/packages/python-sdk/e2b/envd/api.py
@@ -14,6 +14,7 @@
 ENVD_API_FILES_ROUTE = "/files"
+ENVD_API_FILES_COMPOSE_ROUTE = "/files/compose"
 ENVD_API_HEALTH_ROUTE = "/health"
 _DEFAULT_API_ERROR_MAP: dict[int, Callable[[str], Exception]]
= { diff --git a/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py b/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py index 8f70f58a1c..4fa2880d5d 100644 --- a/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py +++ b/packages/python-sdk/e2b/sandbox_async/filesystem/filesystem.py @@ -1,4 +1,7 @@ import asyncio +import gzip +import uuid + from io import IOBase, TextIOBase from typing import IO, AsyncIterator, List, Literal, Optional, Union, overload @@ -15,7 +18,11 @@ Username, default_username, ) -from e2b.envd.api import ENVD_API_FILES_ROUTE, ahandle_envd_api_exception +from e2b.envd.api import ( + ENVD_API_FILES_COMPOSE_ROUTE, + ENVD_API_FILES_ROUTE, + ahandle_envd_api_exception, +) from e2b.envd.filesystem import filesystem_connect, filesystem_pb2 from e2b.envd.rpc import authentication_header, handle_rpc_exception from e2b.envd.versions import ( @@ -58,6 +65,9 @@ async def _ahandle_filesystem_envd_api_exception(r): return await ahandle_envd_api_exception(r, _FILESYSTEM_HTTP_ERROR_MAP) +_DEFAULT_CHUNK_SIZE = 64 * 1024 * 1024 # 64 MB + + class Filesystem: """ Module for interacting with the filesystem in the sandbox. 
@@ -212,6 +222,15 @@ async def write( :return: Information about the written file """ + if self._envd_version >= ENVD_OCTET_STREAM_UPLOAD: + content = to_upload_body(data, False) + if len(content) > _DEFAULT_CHUNK_SIZE: + return await self._composite_write( + path, content, user, request_timeout, gzip + ) + # Use materialized bytes to avoid consuming IO streams twice + data = content + result = await self.write_files( [WriteEntry(path=path, data=data)], user, @@ -345,6 +364,78 @@ async def _upload_file(file): return results + async def _composite_write( + self, + destination: str, + content: bytes, + user: Optional[Username] = None, + request_timeout: Optional[float] = None, + use_gzip: bool = False, + ) -> WriteInfo: + username = user + if username is None and self._envd_version < ENVD_DEFAULT_USER: + username = default_username + + total_size = len(content) + chunk_size = _DEFAULT_CHUNK_SIZE + + headers = {"Content-Type": "application/octet-stream"} + if use_gzip: + headers["Content-Encoding"] = "gzip" + + # Split into chunks and upload in parallel + upload_id = str(uuid.uuid4()) + chunk_count = (total_size + chunk_size - 1) // chunk_size + chunk_paths = [f"/tmp/.e2b-upload-{upload_id}-{i}" for i in range(chunk_count)] + + async def _upload_chunk(i: int) -> None: + start = i * chunk_size + end = min(start + chunk_size, total_size) + chunk_data = content[start:end] + + params = {"path": chunk_paths[i]} + if username: + params["username"] = username + + if use_gzip: + upload_content = await asyncio.to_thread(gzip.compress, chunk_data) + else: + upload_content = chunk_data + + r = await self._envd_api.post( + ENVD_API_FILES_ROUTE, + content=upload_content, + headers=headers, + params=params, + timeout=self._connection_config.get_request_timeout(request_timeout), + ) + + err = await _ahandle_filesystem_envd_api_exception(r) + if err: + raise err + + await asyncio.gather(*[_upload_chunk(i) for i in range(chunk_count)]) + + # Compose chunks into the final file + body = 
{ + "source_paths": chunk_paths, + "destination": destination, + } + if username: + body["username"] = username + + r = await self._envd_api.post( + ENVD_API_FILES_COMPOSE_ROUTE, + json=body, + timeout=self._connection_config.get_request_timeout(request_timeout), + ) + + err = await _ahandle_filesystem_envd_api_exception(r) + if err: + raise err + + return WriteInfo(**r.json()) + async def list( self, path: str, diff --git a/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py b/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py index 9b26a22d0f..9d68059615 100644 --- a/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py +++ b/packages/python-sdk/e2b/sandbox_sync/filesystem/filesystem.py @@ -15,7 +15,10 @@ ) from e2b_connect.client import Code -from e2b.envd.api import ENVD_API_FILES_ROUTE, handle_envd_api_exception +from e2b.envd.api import ( + ENVD_API_FILES_ROUTE, + handle_envd_api_exception, +) from e2b.envd.filesystem import filesystem_connect, filesystem_pb2 from e2b.envd.rpc import authentication_header, handle_rpc_exception from e2b.envd.versions import ( diff --git a/spec/envd/envd.yaml b/spec/envd/envd.yaml index 9649c24a11..0aed1cf9d2 100644 --- a/spec/envd/envd.yaml +++ b/spec/envd/envd.yaml @@ -125,6 +125,37 @@ paths: '507': $ref: '#/components/responses/NotEnoughDiskSpace' + /files/compose: + post: + summary: Compose multiple files into a single file using zero-copy concatenation. Source files are deleted after successful composition. 
+ tags: [files] + security: + - AccessTokenAuth: [] + - {} + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ComposeRequest' + responses: + '200': + description: Files composed successfully + content: + application/json: + schema: + $ref: '#/components/schemas/EntryInfo' + '400': + $ref: '#/components/responses/InvalidPath' + '401': + $ref: '#/components/responses/InvalidUser' + '404': + $ref: '#/components/responses/FileNotFound' + '500': + $ref: '#/components/responses/InternalServerError' + '507': + $ref: '#/components/responses/NotEnoughDiskSpace' + components: securitySchemes: AccessTokenAuth: @@ -252,6 +283,23 @@ components: description: Type of the file enum: - file + ComposeRequest: + type: object + required: + - source_paths + - destination + properties: + source_paths: + type: array + items: + type: string + description: Ordered list of source file paths to concatenate + destination: + type: string + description: Destination file path for the composed file + username: + type: string + description: User for setting ownership and resolving relative paths EnvVars: type: object description: Environment variables to set