From 876930fb6bab8691779f8b306dd839a52d368c8d Mon Sep 17 00:00:00 2001 From: fenos Date: Thu, 13 Jul 2023 19:24:22 +0100 Subject: [PATCH] feat: multi storage providers support --- .env.sample | 130 ++++++++---- migrations/multitenant/0006-s3-provider.sql | 1 + src/config.ts | 193 ++++++++++++++++-- src/database/connection.ts | 2 +- src/database/tenant.ts | 15 +- src/http/plugins/storage.ts | 6 +- src/http/routes/object/getObject.ts | 4 +- src/http/routes/object/getObjectInfo.ts | 128 ++++++++++-- src/http/routes/object/getPublicObject.ts | 4 +- src/http/routes/object/getSignedObject.ts | 4 +- .../routes/render/renderAuthenticatedImage.ts | 4 +- src/http/routes/render/renderPublicImage.ts | 4 +- src/http/routes/render/renderSignedImage.ts | 4 +- src/http/routes/tenant/index.ts | 52 +++++ src/http/routes/tus/index.ts | 92 ++++++--- src/http/routes/tus/lifecycle.ts | 4 +- src/queue/events/base-event.ts | 45 ++-- .../events/multipart-upload-completed.ts | 4 +- src/queue/events/object-admin-delete.ts | 4 +- src/queue/events/object-created.ts | 17 ++ src/queue/events/object-updated.ts | 10 + src/storage/backend/index.ts | 28 +-- src/storage/backend/s3.ts | 117 ++++++++--- src/storage/object.ts | 18 +- src/storage/renderer/head.ts | 1 + src/storage/renderer/renderer.ts | 40 +++- src/storage/storage.ts | 4 +- src/storage/uploader.ts | 5 +- src/test/object.test.ts | 10 +- src/test/rls.test.ts | 26 ++- src/test/tenant.test.ts | 7 +- src/test/tus.test.ts | 20 +- src/test/x-forwarded-host.test.ts | 13 +- 33 files changed, 805 insertions(+), 211 deletions(-) create mode 100644 migrations/multitenant/0006-s3-provider.sql diff --git a/.env.sample b/.env.sample index 67fa7619..e233ebff 100644 --- a/.env.sample +++ b/.env.sample @@ -1,55 +1,115 @@ -# Tenant Configuration +####################################### +# Tenant +####################################### +TENANT_ID=bjhaohmqunupljrqypxz ANON_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlhdCI6MTYxMzUzMTk4NSwiZXhwIjoxOTI5MTA3OTg1fQ.mqfi__KnQB4v6PkIjkhzfwWrYyF94MEbSC6LnuvVniE SERVICE_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoic2VydmljZV9yb2xlIiwiaWF0IjoxNjEzNTMxOTg1LCJleHAiOjE5MjkxMDc5ODV9.th84OKK0Iz8QchDyXZRrojmKSEZ-OuitQm_5DvLiSIc -TENANT_ID=bjhaohmqunupljrqypxz -PGRST_JWT_SECRET=f023d3db-39dc-4ac9-87b2-b2be72e9162b - -# Database configuration -DATABASE_URL=postgresql://postgres:postgres@127.0.0.1/postgres -DATABASE_POOL_URL=postgresql://postgres:postgres@127.0.0.1:6453/postgres - -# Upload configuration -FILE_SIZE_LIMIT=52428800 - -# Storage -STORAGE_BACKEND=s3 -GLOBAL_S3_BUCKET=name-of-your-s3-bucket +REGION=region-of-where-your-service-is-running -# S3 Configuration -REGION=region-of-your-s3-bucket -GLOBAL_S3_ENDPOINT=http://127.0.0.1:9000 -GLOBAL_S3_PROTOCOL=http +####################################### +# Server +####################################### +HOST=0.0.0.0 +PORT=5000 +ADMIN_PORT=5001 +SERVER_KEEP_ALIVE_TIMEOUT=61 +SERVER_HEADERS_TIMEOUT=65 -# Minio Configuration (if using Minio) -GLOBAL_S3_FORCE_PATH_STYLE=true -AWS_ACCESS_KEY_ID=supa-storage -AWS_SECRET_ACCESS_KEY=secret1234 -AWS_DEFAULT_REGION=ap-southeast-1 +####################################### +# Auth +####################################### +AUTH_JWT_SECRET=f023d3db-39dc-4ac9-87b2-b2be72e9162b +AUTH_JWT_ALGORITHM=HS256 -# Local File Storage Configuration -FILE_STORAGE_BACKEND_PATH=./data +####################################### +# Database +####################################### +DATABASE_URL=postgresql://postgres:postgres@127.0.0.1/postgres 
+DATABASE_POOL_URL=postgresql://postgres:postgres@127.0.0.1:6453/postgres +DATABASE_CONNECTION_TIMEOUT=3000 +DATABASE_MAX_CONNECTIONS=20 -# Multitenant +####################################### +# Multi Tenancy +####################################### IS_MULTITENANT=false MULTITENANT_DATABASE_URL=postgresql://postgres:postgres@127.0.0.1:5433/postgres X_FORWARDED_HOST_REGEXP= ADMIN_API_KEYS=apikey ENCRYPTION_KEY=encryptionkey -# Logs -LOGFLARE_ENABLED=false -LOGFLARE_API_KEY=api_key -LOGFLARE_SOURCE_TOKEN=source_token +####################################### +# Uploads +####################################### +FILE_SIZE_LIMIT=52428800 +SIGNED_UPLOAD_URL_EXPIRATION_TIME=60 +TUS_URL_PATH=/upload/resumable +TUS_URL_EXPIRY_MS=3600000 +####################################### # Image Transformation +####################################### ENABLE_IMAGE_TRANSFORMATION=true IMGPROXY_URL=http://localhost:50020 +IMG_LIMITS_MIN_SIZE=0 +IMG_LIMITS_MAX_SIZE=2000 +ENABLE_RATE_LIMITER=false +RATE_LIMITER_DRIVER=redis +RATE_LIMITER_REDIS_URL=localhost:6379 + +####################################### +# Storage Driver +####################################### +STORAGE_BACKEND=s3 + +####################################### +# S3 Providers +####################################### +STORAGE_S3_BUCKET=name-of-your-s3-bucket +STORAGE_S3_MAX_SOCKETS=200 + +# S3 Provider Configuration +# +# You can list more than one provider. +# The convention is STORAGE_S3_PROVIDER_{PROVIDER_NAME}_{CONFIGURATION} +# When specifying more than one provider, you must also specify the default provider using STORAGE_S3_PROVIDER_{PROVIDER_NAME}_DEFAULT=true +# +# Example AWS S3 provider: +# +# STORAGE_S3_PROVIDER_AWS_ACCESS_KEY_ID=access_key_id +# STORAGE_S3_PROVIDER_AWS_SECRET_ACCESS_KEY=secret_access_key +# STORAGE_S3_PROVIDER_AWS_REGION=us-east-1 + +# Minio Provider Configuration +STORAGE_S3_PROVIDER_MINIO_DEFAULT=true +STORAGE_S3_PROVIDER_MINIO_ENDPOINT=http://127.0.0.1:9000 +STORAGE_S3_PROVIDER_MINIO_FORCE_PATH_STYLE=true +STORAGE_S3_PROVIDER_MINIO_ACCESS_KEY_ID=supa-storage +STORAGE_S3_PROVIDER_MINIO_SECRET_ACCESS_KEY=secret1234 +STORAGE_S3_PROVIDER_MINIO_REGION=us-east-1 + +####################################### +# File Provider Configuration +####################################### +FILE_STORAGE_BACKEND_PATH=./data + +####################################### # Queue -WEBHOOK_URL= +####################################### ENABLE_QUEUE_EVENTS=false -# Rate Limiter -ENABLE_RATE_LIMITER=true -RATE_LIMITER_DRIVER=redis -RATE_LIMITER_REDIS_URL=localhost:6379 \ No newline at end of file +####################################### +# Webhooks +####################################### +WEBHOOK_URL= +WEBHOOK_API_KEY= + +####################################### +# Monitoring +####################################### +LOG_LEVEL=info +LOGFLARE_ENABLED=false +LOGFLARE_API_KEY=api_key +LOGFLARE_SOURCE_TOKEN=source_token +ENABLE_DEFAULT_METRICS=true \ No newline at end of file diff --git a/migrations/multitenant/0006-s3-provider.sql b/migrations/multitenant/0006-s3-provider.sql new file mode 100644 index 00000000..b9db1a9f --- /dev/null +++ b/migrations/multitenant/0006-s3-provider.sql @@ -0,0 +1 @@ +ALTER table tenants ADD COLUMN s3_provider TEXT DEFAULT NULL; \ No newline at end of file diff --git a/src/config.ts b/src/config.ts index c92b9a98..76214885 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,6 +1,22 @@ import dotenv from 'dotenv' type StorageBackendType = 'file' | 's3' + +export interface StorageProviderConfig { + name: string + endpoint?: 
string + region?: string + forcePathStyle?: boolean + accessKey?: string + secretKey?: string + isDefault: boolean +} + +export interface StorageProviders { + default: StorageProviderConfig + [key: string]: StorageProviderConfig +} + type StorageConfigType = { keepAliveTimeout: number headersTimeout: number @@ -10,11 +26,10 @@ type StorageConfigType = { encryptionKey: string fileSizeLimit: number fileStoragePath?: string - globalS3Protocol?: 'http' | 'https' | string - globalS3MaxSockets?: number - globalS3Bucket: string - globalS3Endpoint?: string - globalS3ForcePathStyle?: boolean + storageS3MaxSockets?: number + storageS3Bucket: string + storageS3Protocol?: 'http' | 'https' | string + storageProviders: StorageProviders isMultitenant: boolean jwtSecret: string jwtAlgorithm: string @@ -67,30 +82,51 @@ type StorageConfigType = { tusPath: string tusUseFileVersionSeparator: boolean enableDefaultMetrics: boolean + sMaxAge: string } -function getOptionalConfigFromEnv(key: string): string | undefined { - return process.env[key] +function getOptionalConfigFromEnv(key: string, fallback?: string): string | undefined { + const envValue = process.env[key] + + if (!envValue && fallback) { + return getOptionalConfigFromEnv(fallback) + } + + return envValue } -function getConfigFromEnv(key: string): string { +function getConfigFromEnv(key: string, fallbackEnv?: string): string { const value = getOptionalConfigFromEnv(key) if (!value) { + if (fallbackEnv) { + return getConfigFromEnv(fallbackEnv) + } + throw new Error(`${key} is undefined`) } return value } -function getOptionalIfMultitenantConfigFromEnv(key: string): string | undefined { +function getOptionalIfMultitenantConfigFromEnv(key: string, fallback?: string): string | undefined { return getOptionalConfigFromEnv('IS_MULTITENANT') === 'true' - ? getOptionalConfigFromEnv(key) - : getConfigFromEnv(key) + ? 
getOptionalConfigFromEnv(key, fallback) + : getConfigFromEnv(key, fallback) } -export function getConfig(): StorageConfigType { +let config: StorageConfigType | undefined + +export function setConfig(newConfig: StorageConfigType) { + config = newConfig +} + +export function getConfig(options?: { reload?: boolean }): StorageConfigType { + if (config && !options?.reload) { + return config + } + dotenv.config() - return { + config = { keepAliveTimeout: parseInt(getOptionalConfigFromEnv('SERVER_KEEP_ALIVE_TIMEOUT') || '61', 10), headersTimeout: parseInt(getOptionalConfigFromEnv('SERVER_HEADERS_TIMEOUT') || '65', 10), adminApiKeys: getOptionalConfigFromEnv('ADMIN_API_KEYS') || '', @@ -99,14 +135,16 @@ export function getConfig(): StorageConfigType { encryptionKey: getOptionalConfigFromEnv('ENCRYPTION_KEY') || '', fileSizeLimit: Number(getConfigFromEnv('FILE_SIZE_LIMIT')), fileStoragePath: getOptionalConfigFromEnv('FILE_STORAGE_BACKEND_PATH'), - globalS3MaxSockets: parseInt(getOptionalConfigFromEnv('GLOBAL_S3_MAX_SOCKETS') || '200', 10), - globalS3Protocol: getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https', - globalS3Bucket: getConfigFromEnv('GLOBAL_S3_BUCKET'), - globalS3Endpoint: getOptionalConfigFromEnv('GLOBAL_S3_ENDPOINT'), - globalS3ForcePathStyle: getOptionalConfigFromEnv('GLOBAL_S3_FORCE_PATH_STYLE') === 'true', + storageProviders: loadStorageS3ProviderFromEnv(), + storageS3MaxSockets: parseInt( + getOptionalConfigFromEnv('STORAGE_S3_MAX_SOCKETS', 'GLOBAL_S3_MAX_SOCKETS') || '200', + 10 + ), + storageS3Protocol: getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https', + storageS3Bucket: getConfigFromEnv('STORAGE_S3_BUCKET', 'GLOBAL_S3_BUCKET'), isMultitenant: getOptionalConfigFromEnv('IS_MULTITENANT') === 'true', - jwtSecret: getOptionalIfMultitenantConfigFromEnv('PGRST_JWT_SECRET') || '', - jwtAlgorithm: getOptionalConfigFromEnv('PGRST_JWT_ALGORITHM') || 'HS256', + jwtSecret: getOptionalIfMultitenantConfigFromEnv('AUTH_JWT_SECRET', 'PGRST_JWT_SECRET') || '', + jwtAlgorithm: getOptionalConfigFromEnv('AUTH_JWT_ALGORITHM', 'PGRST_JWT_ALGORITHM') || 'HS256', multitenantDatabaseUrl: getOptionalConfigFromEnv('MULTITENANT_DATABASE_URL'), databaseURL: getOptionalIfMultitenantConfigFromEnv('DATABASE_URL') || '', databasePoolURL: getOptionalConfigFromEnv('DATABASE_POOL_URL') || '', @@ -159,7 +197,7 @@ export function getConfig(): StorageConfigType { imgProxyURL: getOptionalConfigFromEnv('IMGPROXY_URL'), imgLimits: { size: { - min: parseInt(getOptionalConfigFromEnv('IMG_LIMITS_MIN_SIZE') || '1', 10), + min: parseInt(getOptionalConfigFromEnv('IMG_LIMITS_MIN_SIZE') || '0', 10), max: parseInt(getOptionalConfigFromEnv('IMG_LIMITS_MAX_SIZE') || '2000', 10), }, }, @@ -195,5 +233,118 @@ export function getConfig(): StorageConfigType { tusUseFileVersionSeparator: getOptionalConfigFromEnv('TUS_USE_FILE_VERSION_SEPARATOR') === 'true', enableDefaultMetrics: getOptionalConfigFromEnv('ENABLE_DEFAULT_METRICS') === 'true', + sMaxAge: getOptionalConfigFromEnv('S_MAXAGE') || '31536000', + } + + return config +} + +/** + * Load S3 storage providers from env variables + * The convention is STORAGE_S3_PROVIDER_{PROVIDER_NAME}_{CONFIGURATION} + * When specifying more than one provider you must also specify the default provider using STORAGE_S3_PROVIDER_{PROVIDER_NAME}_DEFAULT=true + * + * Example Minio provider: + * + * STORAGE_S3_PROVIDER_MINIO_DEFAULT=true + * STORAGE_S3_PROVIDER_MINIO_ENDPOINT=http://127.0.0.1:9000 + * STORAGE_S3_PROVIDER_MINIO_FORCE_PATH_STYLE=true + * 
STORAGE_S3_PROVIDER_MINIO_ACCESS_KEY_ID=supa-storage + * STORAGE_S3_PROVIDER_MINIO_SECRET_ACCESS_KEY=secret1234 + * STORAGE_S3_PROVIDER_MINIO_REGION=us-east-1 + */ +function loadStorageS3ProviderFromEnv() { + const providersENV = Object.keys(process.env).filter((key) => + key.startsWith('STORAGE_S3_PROVIDER_') + ) + + const providers = providersENV.reduce((all, providerEnv) => { + const providerRegex = new RegExp('(STORAGE_S3_PROVIDER)_([A-Z0-9]+)_(.*)', 'gi') + const matches = providerRegex.exec(providerEnv) + + if (matches?.length !== 4) { + throw new Error( + `Invalid storage provider env variable: ${providerEnv}, the format is STORAGE_S3_PROVIDER_{PROVIDER_NAME}_{CONFIG}` + ) + } + + const providerName = matches[2].toLowerCase() + const configName = matches[3].toLowerCase() + + if (!all[providerName]) { + all[providerName] = { + name: providerName, + isDefault: false, + } + } + + switch (configName) { + case 'region': + all[providerName].region = process.env[providerEnv] || '' + break + case 'endpoint': + all[providerName].endpoint = process.env[providerEnv] || '' + break + case 'access_key_id': + all[providerName].accessKey = process.env[providerEnv] || '' + break + case 'secret_access_key': + all[providerName].secretKey = process.env[providerEnv] || '' + break + case 'force_path_style': + all[providerName].forcePathStyle = process.env[providerEnv] === 'true' + break + case 'default': + all[providerName].isDefault = process.env[providerEnv] === 'true' + break + default: + throw new Error(`Invalid storage provider config: ${configName}`) + } + + return all + }, {} as Record<string, StorageProviderConfig>) + + const providersNumber = Object.keys(providers).length + + // If multiple providers are configured, we check that one of them is marked as default + if (providersNumber > 1) { + const defaultProviderName = Object.keys(providers).find((providerName) => { + return providers[providerName].isDefault + }) + + if (!defaultProviderName) { + throw new Error( + `Missing default storage provider config, please provide STORAGE_S3_PROVIDER_{PROVIDER_NAME}_DEFAULT=true` + ) + } + + providers.default = providers[defaultProviderName] + } + + // Only 1 provider specified, we set it as default + if (providersNumber === 1) { + providers.default = Object.values(providers)[0] } + + if (providersNumber === 0) { + // Backwards compatibility with old env variables + const endpoint = getOptionalConfigFromEnv('GLOBAL_S3_ENDPOINT') + const pathStyle = getOptionalConfigFromEnv('GLOBAL_S3_FORCE_PATH_STYLE') + + if (endpoint || pathStyle) { + providers.default = { + isDefault: true, + name: 'default', + endpoint, + forcePathStyle: pathStyle === 'true', + region: getOptionalConfigFromEnv('AWS_DEFAULT_REGION', 'REGION'), + accessKey: getOptionalConfigFromEnv('AWS_ACCESS_KEY_ID'), + secretKey: getOptionalConfigFromEnv('AWS_SECRET_ACCESS_KEY'), + } + } else if (getConfigFromEnv('STORAGE_BACKEND') === 's3') { + throw new Error('Missing storage provider config, please provide STORAGE_S3_PROVIDER_*') + } + } + + return providers as StorageProviders } diff --git a/src/database/connection.ts b/src/database/connection.ts index d829f5a5..69d8931c 100644 --- a/src/database/connection.ts +++ b/src/database/connection.ts @@ -138,7 +138,7 @@ export class TenantConnection { { minTimeout: 50, maxTimeout: 500, - maxRetryTime: 2000, + maxRetryTime: 4000, retries: 10, } ) diff --git a/src/database/tenant.ts b/src/database/tenant.ts index f25f6a26..a0168e41 100644 --- a/src/database/tenant.ts +++ b/src/database/tenant.ts @@ -13,6 +13,7 @@ interface TenantConfig { maxConnections?: number fileSizeLimit: number features: Features + 
s3Provider?: string jwtSecret: string serviceKey: string serviceKeyPayload: { @@ -26,7 +27,8 @@ export interface Features { } } -const { multitenantDatabaseUrl, isMultitenant, serviceKey, jwtSecret } = getConfig() +const { multitenantDatabaseUrl, isMultitenant, serviceKey, jwtSecret, storageProviders } = + getConfig() const tenantConfigCache = new Map<string, TenantConfig>() @@ -90,6 +92,7 @@ export async function getTenantConfig(tenantId: string): Promise<TenantConfig> { feature_image_transformation, database_pool_url, max_connections, + s3_provider, } = tenant const serviceKey = decrypt(service_key) @@ -105,6 +108,7 @@ export async function getTenantConfig(tenantId: string): Promise<TenantConfig> { jwtSecret: jwtSecret, serviceKey: serviceKey, serviceKeyPayload, + s3Provider: s3_provider, maxConnections: max_connections ? Number(max_connections) : undefined, features: { imageTransformation: { @@ -116,6 +120,15 @@ export async function getTenantConfig(tenantId: string): Promise<TenantConfig> { return config } +export async function getTenantBackendProvider(tenantId: string) { + if (!isMultitenant) { + return 'default' + } + + const tenantConfig = await getTenantConfig(tenantId) + return tenantConfig.s3Provider || 'default' +} + /** * Get the anon key from the tenant config * @param tenantId diff --git a/src/http/plugins/storage.ts b/src/http/plugins/storage.ts index f894c352..d3a57bfb 100644 --- a/src/http/plugins/storage.ts +++ b/src/http/plugins/storage.ts @@ -11,10 +11,12 @@ declare module 'fastify' { } export const storage = fastifyPlugin(async (fastify) => { - const storageBackend = createStorageBackend() - fastify.decorateRequest('storage', undefined) + fastify.decorateRequest('backend', undefined) + fastify.addHook('preHandler', async (request) => { + const storageBackend = await createStorageBackend(request.tenantId) + const database = new StorageKnexDB(request.db, { tenantId: request.tenantId, host: request.headers['x-forwarded-host'] as string, diff --git a/src/http/routes/object/getObject.ts b/src/http/routes/object/getObject.ts index d323a385..77bb3018 100644 --- a/src/http/routes/object/getObject.ts +++ b/src/http/routes/object/getObject.ts @@ -4,7 +4,7 @@ import { IncomingMessage, Server, ServerResponse } from 'http' import { getConfig } from '../../../config' import { AuthenticatedRangeRequest } from '../../request' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const getObjectParamsSchema = { type: 'object', @@ -48,7 +48,7 @@ async function requestHandler( request.log.info(s3Key) return request.storage.renderer('asset').render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/object/getObjectInfo.ts b/src/http/routes/object/getObjectInfo.ts index 51e55f0b..02f09828 100644 --- a/src/http/routes/object/getObjectInfo.ts +++ b/src/http/routes/object/getObjectInfo.ts @@ -4,8 +4,10 @@ import { IncomingMessage, Server, ServerResponse } from 'http' import { getConfig } from '../../../config' import { AuthenticatedRangeRequest } from '../../request' import { Obj } from '../../../storage/schemas' +import { getJwtSecret, SignedToken, verifyJWT } from '../../../auth' +import { StorageBackendError } from '../../../storage' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const getObjectParamsSchema = { type: 'object', properties: { bucketName: { type: 'string' }, '*': { type: 'string' }, }, required: ['bucketName', '*'], } as const +const getSignedObjectQSSchema = { + type: 'object', + properties: { + token: { 
+ type: 'string', + examples: [ + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJidWNrZXQyL3B1YmxpYy9zYWRjYXQtdXBsb2FkMjMucG5nIiwiaWF0IjoxNjE3NzI2MjczLCJleHAiOjE2MTc3MjcyNzN9.uBQcXzuvXxfw-9WgzWMBfE_nR3VOgpvfZe032sfLSSk', + ], + }, + }, +} as const + interface getObjectRequestInterface extends AuthenticatedRangeRequest { Params: FromSchema<typeof getObjectParamsSchema> } +interface getSignObjectRequestInterface extends AuthenticatedRangeRequest { + Params: FromSchema<typeof getObjectParamsSchema> + Querystring: FromSchema<typeof getSignedObjectQSSchema> +} + +async function handleSignedToken(request: FastifyRequest<getSignObjectRequestInterface>) { + let payload: SignedToken + const jwtSecret = await getJwtSecret(request.tenantId) + const token = request.query.token + + if (!token) { + throw new StorageBackendError('missing_token', 400, 'Missing token') + } + + try { + payload = (await verifyJWT(token, jwtSecret)) as SignedToken + + const { url, exp } = payload + const bucketName = request.params.bucketName + const objectName = request.params['*'] + + const path = `${bucketName}/${objectName}` + + if (url !== path) { + throw new StorageBackendError('InvalidSignature', 400, 'The url does not match the signature') + } + + const obj = await request.storage + .asSuperUser() + .from(request.params.bucketName) + .findObject(objectName, 'id,version') + + return { obj, exp: new Date(exp * 1000).toUTCString() } + } catch (e) { + if (e instanceof StorageBackendError) { + throw e + } + const err = e as Error + throw new StorageBackendError('Invalid JWT', 400, err.message, err) + } +} + async function requestHandler( request: FastifyRequest<getObjectRequestInterface, Server, IncomingMessage>, response: FastifyReply< Server, IncomingMessage, ServerResponse, getObjectRequestInterface, unknown >, - publicRoute = false + routeVisibility: 'public' | 'private' | 'signed' ) { const { bucketName } = request.params const objectName = request.params['*'] const s3Key = `${request.tenantId}/${bucketName}/${objectName}` let obj: Obj - if (publicRoute) { - obj = await request.storage.asSuperUser().from(bucketName).findObject(objectName, 'id,version') - } else { - obj = await request.storage.from(bucketName).findObject(objectName, 'id,version') + let expires: string | undefined + switch (routeVisibility) { + case 'public': + obj = await request.storage + .asSuperUser() + .from(bucketName) + .findObject(objectName, 'id,version') + break + case 'private': + obj = await request.storage.from(bucketName).findObject(objectName, 'id,version') + break + case 'signed': + const { obj: signedObj, exp } = await handleSignedToken( + request as FastifyRequest<getSignObjectRequestInterface> + ) + obj = signedObj + expires = exp + break + default: + throw new Error(`Invalid route visibility: ${routeVisibility}`) } return request.storage.renderer('head').render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, + expires, }) } export async function publicRoutes(fastify: FastifyInstance) { + fastify.head( + '/info/sign/:bucketName/*', + { + schema: { + params: getObjectParamsSchema, + description: 'returns object info', + tags: ['object'], + }, + }, + async (request, response) => { + return requestHandler(request, response, 'signed') + } + ) + + fastify.get( + '/info/sign/:bucketName/*', + { + schema: { + params: getObjectParamsSchema, + description: 'returns object info', + tags: ['object'], + }, + }, + async (request, response) => { + return requestHandler(request, response, 'signed') + } + ) + fastify.head( '/public/:bucketName/*', { schema: { @@ -63,7 +164,7 @@ export async function publicRoutes(fastify: FastifyInstance) { }, }, 
async (request, response) => { - return requestHandler(request, response, true) + return requestHandler(request, response, 'public') } ) @@ -80,13 +181,14 @@ export async function publicRoutes(fastify: FastifyInstance) { }, }, async (request, response) => { - return requestHandler(request, response, true) + return requestHandler(request, response, 'public') } ) } export async function authenticatedRoutes(fastify: FastifyInstance) { const summary = 'Retrieve object info' + fastify.head( '/authenticated/:bucketName/*', { @@ -99,7 +201,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) { }, }, async (request, response) => { - return requestHandler(request, response) + return requestHandler(request, response, 'private') } ) @@ -115,7 +217,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) { }, }, async (request, response) => { - return requestHandler(request, response) + return requestHandler(request, response, 'private') } ) @@ -132,7 +234,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) { }, }, async (request, response) => { - return requestHandler(request, response) + return requestHandler(request, response, 'private') } ) @@ -149,7 +251,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) { }, }, async (request, response) => { - return requestHandler(request, response) + return requestHandler(request, response, 'private') } ) } diff --git a/src/http/routes/object/getPublicObject.ts b/src/http/routes/object/getPublicObject.ts index c21e4478..3344ac45 100644 --- a/src/http/routes/object/getPublicObject.ts +++ b/src/http/routes/object/getPublicObject.ts @@ -2,7 +2,7 @@ import { FastifyInstance } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import { getConfig } from '../../../config' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const getPublicObjectParamsSchema = { type: 'object', @@ -59,7 +59,7 @@ export default async function routes(fastify: FastifyInstance) { request.log.info(s3Key) return request.storage.renderer('asset').render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/object/getSignedObject.ts b/src/http/routes/object/getSignedObject.ts index 8b6b40bb..f2db3105 100644 --- a/src/http/routes/object/getSignedObject.ts +++ b/src/http/routes/object/getSignedObject.ts @@ -4,7 +4,7 @@ import { getConfig } from '../../../config' import { getJwtSecret, SignedToken, verifyJWT } from '../../../auth' import { StorageBackendError } from '../../../storage' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const getSignedObjectParamsSchema = { type: 'object', @@ -82,7 +82,7 @@ export default async function routes(fastify: FastifyInstance) { .findObject(objParts.join('/'), 'id,version') return request.storage.renderer('asset').render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/render/renderAuthenticatedImage.ts b/src/http/routes/render/renderAuthenticatedImage.ts index e5a54176..f4e9ee4d 100644 --- a/src/http/routes/render/renderAuthenticatedImage.ts +++ b/src/http/routes/render/renderAuthenticatedImage.ts @@ -4,7 +4,7 @@ import { FastifyInstance } from 'fastify' import { ImageRenderer } from '../../../storage/renderer' import { transformationOptionsSchema } from '../../schemas/transformations' -const { globalS3Bucket } = 
getConfig() +const { storageS3Bucket } = getConfig() const renderAuthenticatedImageParamsSchema = { type: 'object', @@ -53,7 +53,7 @@ export default async function routes(fastify: FastifyInstance) { const renderer = request.storage.renderer('image') as ImageRenderer return renderer.setTransformations(request.query).render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/render/renderPublicImage.ts b/src/http/routes/render/renderPublicImage.ts index 92e2fd2f..6ecd8416 100644 --- a/src/http/routes/render/renderPublicImage.ts +++ b/src/http/routes/render/renderPublicImage.ts @@ -4,7 +4,7 @@ import { FastifyInstance } from 'fastify' import { ImageRenderer } from '../../../storage/renderer' import { transformationOptionsSchema } from '../../schemas/transformations' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const renderPublicImageParamsSchema = { type: 'object', @@ -58,7 +58,7 @@ export default async function routes(fastify: FastifyInstance) { const renderer = request.storage.renderer('image') as ImageRenderer return renderer.setTransformations(request.query).render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/render/renderSignedImage.ts b/src/http/routes/render/renderSignedImage.ts index f29c8627..130e3c85 100644 --- a/src/http/routes/render/renderSignedImage.ts +++ b/src/http/routes/render/renderSignedImage.ts @@ -5,7 +5,7 @@ import { ImageRenderer } from '../../../storage/renderer' import { getJwtSecret, SignedToken, verifyJWT } from '../../../auth' import { StorageBackendError } from '../../../storage' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() const renderAuthenticatedImageParamsSchema = { type: 'object', @@ -83,7 +83,7 @@ export default async function routes(fastify: FastifyInstance) { return renderer .setTransformationsFromString(transformations || '') .render(request, response, { - bucket: globalS3Bucket, + bucket: storageS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/tenant/index.ts b/src/http/routes/tenant/index.ts index a98a609e..124721ad 100644 --- a/src/http/routes/tenant/index.ts +++ b/src/http/routes/tenant/index.ts @@ -4,6 +4,9 @@ import apiKey from '../../plugins/apikey' import { decrypt, encrypt } from '../../../auth' import { knex } from '../../../database/multitenant-db' import { deleteTenantConfig, runMigrations } from '../../../database/tenant' +import { getConfig } from '../../../config' + +const { storageProviders } = getConfig() const patchSchema = { body: { @@ -16,6 +19,7 @@ const patchSchema = { fileSizeLimit: { type: 'number' }, jwtSecret: { type: 'string' }, serviceKey: { type: 'string' }, + s3Provider: { type: 'string' }, features: { type: 'object', properties: { @@ -62,6 +66,7 @@ interface tenantDBInterface { service_key: string file_size_limit?: number feature_image_transformation?: boolean + s3_provider?: string } export default async function routes(fastify: FastifyInstance) { @@ -69,6 +74,7 @@ export default async function routes(fastify: FastifyInstance) { fastify.get('/', async () => { const tenants = await knex('tenants').select() + return tenants.map( ({ id, @@ -80,6 +86,7 @@ export default async function routes(fastify: FastifyInstance) { jwt_secret, service_key, feature_image_transformation, + s3_provider, }) => ({ id, anonKey: decrypt(anon_key), 
@@ -89,6 +96,7 @@ export default async function routes(fastify: FastifyInstance) { fileSizeLimit: Number(file_size_limit), jwtSecret: decrypt(jwt_secret), serviceKey: decrypt(service_key), + s3Provider: s3_provider, features: { imageTransformation: { enabled: feature_image_transformation, @@ -112,6 +120,7 @@ export default async function routes(fastify: FastifyInstance) { jwt_secret, service_key, feature_image_transformation, + s3_provider, } = tenant return { @@ -127,6 +136,7 @@ export default async function routes(fastify: FastifyInstance) { fileSizeLimit: Number(file_size_limit), jwtSecret: decrypt(jwt_secret), serviceKey: decrypt(service_key), + s3Provider: s3_provider || undefined, features: { imageTransformation: { enabled: feature_image_transformation, @@ -147,9 +157,20 @@ export default async function routes(fastify: FastifyInstance) { features, databasePoolUrl, maxConnections, + s3Provider, } = request.body await runMigrations(tenantId, databaseUrl) + + if (s3Provider) { + if (!validateS3Provider(s3Provider)) { + return reply.code(400).send({ + error: 'Bad Request', + message: `Invalid s3 provider.`, + }) + } + } + await knex('tenants').insert({ id: tenantId, anon_key: encrypt(anonKey), @@ -160,6 +181,7 @@ export default async function routes(fastify: FastifyInstance) { jwt_secret: encrypt(jwtSecret), service_key: encrypt(serviceKey), feature_image_transformation: features?.imageTransformation?.enabled ?? false, + s3_provider: s3Provider || undefined, }) reply.code(201).send() }) @@ -177,11 +199,22 @@ export default async function routes(fastify: FastifyInstance) { features, databasePoolUrl, maxConnections, + s3Provider, } = request.body const { tenantId } = request.params if (databaseUrl) { await runMigrations(tenantId, databaseUrl) } + + if (s3Provider) { + if (!validateS3Provider(s3Provider)) { + return reply.code(400).send({ + error: 'Bad Request', + message: `Invalid s3 provider.`, + }) + } + } + await knex('tenants') .update({ anon_key: anonKey !== undefined ? encrypt(anonKey) : undefined, @@ -192,6 +225,7 @@ export default async function routes(fastify: FastifyInstance) { jwt_secret: jwtSecret !== undefined ? encrypt(jwtSecret) : undefined, service_key: serviceKey !== undefined ? 
encrypt(serviceKey) : undefined, feature_image_transformation: features?.imageTransformation?.enabled, + s3_provider: s3Provider, }) .where('id', tenantId) reply.code(204).send() }) @@ -208,6 +242,7 @@ features, databasePoolUrl, maxConnections, + s3Provider, } = request.body const { tenantId } = request.params await runMigrations(tenantId, databaseUrl) @@ -218,6 +253,16 @@ database_url: encrypt(databaseUrl), jwt_secret: encrypt(jwtSecret), service_key: encrypt(serviceKey), + s3_provider: s3Provider, + } + + if (s3Provider) { + if (!validateS3Provider(s3Provider)) { + return reply.code(400).send({ + error: 'Bad Request', + message: `Invalid s3 provider.`, + }) + } } if (fileSizeLimit) { @@ -246,3 +291,10 @@ reply.code(204).send() }) } + +function validateS3Provider(s3Provider: string) { + const providerExists = Object.keys(storageProviders).find( + (provider) => provider === s3Provider.toLowerCase() + ) + return Boolean(providerExists) +} diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index 40fce9ae..90a8701b 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -13,13 +13,11 @@ import { getFileSizeLimit } from '../../../storage/limits' import { UploadId } from './upload-id' import { FileStore } from './file-store' import { TenantConnection } from '../../../database/connection' +import { getTenantBackendProvider } from '../../../database/tenant' const { - globalS3Bucket, - globalS3Endpoint, - globalS3Protocol, - globalS3ForcePathStyle, - region, + storageS3Bucket, + storageProviders, tusUrlExpiryMs, tusPath, storageBackendType, @@ -35,28 +33,45 @@ type MultiPartRequest = http.IncomingMessage & { } } -function createTusStore() { +const s3Providers: Record<string, S3Store> = {} + +async function createTusStore(tenantId: string) { if (storageBackendType === 's3') { + const backendProvider = await getTenantBackendProvider(tenantId) + + if (s3Providers[backendProvider]) { + return s3Providers[backendProvider] + } + + const { region, endpoint, forcePathStyle, accessKey, secretKey } = + storageProviders[backendProvider] + return new S3Store({ partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB, uploadExpiryMilliseconds: tusUrlExpiryMs, s3ClientConfig: { - bucket: globalS3Bucket, + bucket: storageS3Bucket, region: region, - endpoint: globalS3Endpoint, - sslEnabled: globalS3Protocol !== 'http', - s3ForcePathStyle: globalS3ForcePathStyle, + endpoint: endpoint, + sslEnabled: !endpoint?.startsWith('http://'), + s3ForcePathStyle: forcePathStyle, + credentials: + accessKey && secretKey + ? 
{ + accessKeyId: accessKey, + secretAccessKey: secretKey, + } + : undefined, }, }) } return new FileStore({ - directory: fileStoragePath + '/' + globalS3Bucket, + directory: fileStoragePath + '/' + storageS3Bucket, }) } -function createTusServer() { - const datastore = createTusStore() +function createTusServer(datastore: DataStore) { const serverOptions: ServerOptions & { datastore: DataStore } = { @@ -96,8 +111,6 @@ function createTusServer() { } export default async function routes(fastify: FastifyInstance) { - const tusServer = createTusServer() - fastify.register(async function authorizationContext(fastify) { fastify.addContentTypeParser('application/offset+octet-stream', (request, payload, done) => done(null) @@ -119,38 +132,53 @@ export default async function routes(fastify: FastifyInstance) { fastify.post( '/', { schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] } }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) fastify.post( '/*', { schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] } }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) fastify.put( '/*', { schema: { summary: 'Handle PUT request for TUS Resumable uploads', tags: ['object'] } }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) fastify.patch( '/*', { schema: { summary: 'Handle PATCH request for TUS Resumable uploads', tags: ['object'] } }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) fastify.head( '/*', { schema: { summary: 'Handle HEAD request for TUS Resumable uploads', tags: ['object'] } }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) }) @@ -181,8 +209,11 @@ export default async function routes(fastify: FastifyInstance) { description: 'Handle OPTIONS request for TUS Resumable uploads', }, }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) @@ -195,8 +226,11 @@ export default async function routes(fastify: FastifyInstance) { description: 'Handle OPTIONS request for TUS Resumable uploads', }, }, - (req, res) => { - tusServer.handle(req.raw, res.raw) + async (req, res) => { + const datastore = await createTusStore(req.tenantId) + const tusServer = createTusServer(datastore) + + await tusServer.handle(req.raw, res.raw) } ) }) diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts index bd1f180d..a8e05579 100644 --- a/src/http/routes/tus/lifecycle.ts +++ b/src/http/routes/tus/lifecycle.ts @@ -7,7 +7,7 @@ import { UploadId } from './upload-id' import 
{ Uploader } from '../../../storage/uploader' import { TenantConnection } from '../../../database/connection' -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() export type MultiPartRequest = http.IncomingMessage & { upload: { @@ -107,7 +107,7 @@ export async function onUploadFinish( try { const s3Key = `${req.upload.tenantId}/${resourceId.bucket}/${resourceId.objectName}` const metadata = await req.upload.storage.backend.headObject( - globalS3Bucket, + storageS3Bucket, s3Key, resourceId.version ) diff --git a/src/queue/events/base-event.ts b/src/queue/events/base-event.ts index 36edb568..162676af 100644 --- a/src/queue/events/base-event.ts +++ b/src/queue/events/base-event.ts @@ -17,7 +17,21 @@ export interface BasePayload { } } -export type StaticThis<T> = { new (...args: any): T } +type StaticThis<T extends BaseEvent<any>> = BaseEventConstructor<T> + +interface BaseEventConstructor<Base extends BaseEvent<any>> { + version: string + + new (...args: any): Base + + send( + this: StaticThis<Base>, + payload: Omit<Base['payload'], '$version'> + ): Promise<void> + + eventName(): string + beforeSend(payload: Omit<Base['payload'], '$version'>): Promise<any> +} const { enableQueueEvents } = getConfig() @@ -39,6 +53,10 @@ export abstract class BaseEvent<T extends BasePayload> { return this.queueName } + static async beforeSend<T extends BaseEvent<any>>(payload: Omit<T['payload'], '$version'>) { + return payload + } + static getQueueOptions(): SendOptions | undefined { return undefined } @@ -47,14 +65,16 @@ export abstract class BaseEvent<T extends BasePayload> { return {} } - static send<T extends BaseEvent<any>>( + static async send<T extends BaseEvent<any>>( this: StaticThis<T>, payload: Omit<T['payload'], '$version'> ) { if (!payload.$version) { - ;(payload as any).$version = (this as any).version + ;(payload as T['payload']).$version = this.version } - const that = new this(payload) + const newPayload = await this.beforeSend(payload) + + const that = new this(newPayload) return that.send() } @@ -64,28 +84,29 @@ export abstract class BaseEvent<T extends BasePayload> { ) { // eslint-disable-next-line @typescript-eslint/no-var-requires const { Webhook } = require('./webhook') - const eventType = (this as any).eventName() + const eventType = this.eventName() + const newPayload = await this.beforeSend(payload) try { await Webhook.send({ event: { type: eventType, - $version: (this as any).version, + $version: this.version, applyTime: Date.now(), - payload, + payload: newPayload, }, - tenant: payload.tenant, + tenant: newPayload.tenant, }) } catch (e) { logger.error( { event: { type: eventType, - $version: (this as any).version, + $version: this.version, applyTime: Date.now(), - payload, + payload: newPayload, }, - tenant: payload.tenant, + tenant: newPayload.tenant, }, `error sending webhook: ${eventType}` ) @@ -111,7 +132,7 @@ export abstract class BaseEvent<T extends BasePayload> { host: payload.tenant.host, }) - const storageBackend = createStorageBackend() + const storageBackend = await createStorageBackend(payload.tenant.ref) return new Storage(storageBackend, db) } diff --git a/src/queue/events/multipart-upload-completed.ts b/src/queue/events/multipart-upload-completed.ts index 31e0cf55..16f43fdc 100644 --- a/src/queue/events/multipart-upload-completed.ts +++ b/src/queue/events/multipart-upload-completed.ts @@ -10,7 +10,7 @@ interface UploadCompleted extends BasePayload { version: string } -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> { static queueName = 'multipart:upload:completed' @@ -23,7 +23,7 @@ export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> { const s3Key = `${job.data.tenant.ref}/${job.data.bucketName}/${job.data.objectName}/${version}` if (storage.backend instanceof S3Backend) { - await 
storage.backend.setMetadataToCompleted(globalS3Bucket, s3Key) + await storage.backend.setMetadataToCompleted(storageS3Bucket, s3Key) } } catch (e) { if (isS3Error(e) && e.$metadata.httpStatusCode === 404) { diff --git a/src/queue/events/object-admin-delete.ts b/src/queue/events/object-admin-delete.ts index ad00d02f..b45af713 100644 --- a/src/queue/events/object-admin-delete.ts +++ b/src/queue/events/object-admin-delete.ts @@ -10,7 +10,7 @@ export interface ObjectDeleteEvent extends BasePayload { version?: string } -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> { static queueName = 'object:admin:delete' @@ -24,7 +24,7 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> { const s3Key = `${job.data.tenant.ref}/${job.data.bucketId}/${job.data.name}` - await storage.backend.deleteObjects(globalS3Bucket, [ + await storage.backend.deleteObjects(storageS3Bucket, [ withOptionalVersion(s3Key, version), withOptionalVersion(s3Key, version) + '.info', ]) diff --git a/src/queue/events/object-created.ts b/src/queue/events/object-created.ts index a344a6b5..aa73aec5 100644 --- a/src/queue/events/object-created.ts +++ b/src/queue/events/object-created.ts @@ -1,15 +1,25 @@ import { BaseEvent, BasePayload } from './base-event' import { ObjectMetadata } from '../../storage/backend' import { ObjectRemovedEvent } from './object-removed' +import { getTenantBackendProvider } from '../../database/tenant' interface ObjectCreatedEvent extends BasePayload { name: string bucketId: string + version: string + provider?: string metadata: ObjectMetadata } abstract class ObjectCreated extends BaseEvent<ObjectCreatedEvent> { protected static queueName = 'object:created' + + static async beforeSend<T extends BaseEvent<any>>( + payload: Omit<T['payload'], '$version'> + ) { + payload.provider = payload.provider || (await getTenantBackendProvider(payload.tenant.ref)) + return payload + } } export class ObjectCreatedPutEvent extends ObjectCreated { @@ -38,4 +48,11 @@ export class ObjectCreatedMove extends BaseEvent<ObjectCreatedEvent & { oldObject: ObjectRemovedEvent }> { static eventName() { return `ObjectCreated:Move` } + + static async beforeSend<T extends BaseEvent<any>>( + payload: Omit<T['payload'], '$version'> + ) { + payload.provider = payload.provider || (await getTenantBackendProvider(payload.tenant.ref)) + return payload + } } diff --git a/src/queue/events/object-updated.ts b/src/queue/events/object-updated.ts index 0b8ff4c5..37515497 100644 --- a/src/queue/events/object-updated.ts +++ b/src/queue/events/object-updated.ts @@ -1,9 +1,12 @@ import { BaseEvent, BasePayload } from './base-event' import { ObjectMetadata } from '../../storage/backend' +import { getTenantBackendProvider } from '../../database/tenant' interface ObjectUpdatedMetadataEvent extends BasePayload { name: string bucketId: string + version: string + provider?: string metadata: ObjectMetadata } @@ -13,4 +16,11 @@ export class ObjectUpdatedMetadata extends BaseEvent<ObjectUpdatedMetadataEvent> static eventName() { return `ObjectUpdated:Metadata` } + + static async beforeSend<T extends BaseEvent<any>>( + payload: Omit<T['payload'], '$version'> + ) { + payload.provider = payload.provider || (await getTenantBackendProvider(payload.tenant.ref)) + return payload + } } diff --git a/src/storage/backend/index.ts b/src/storage/backend/index.ts index e438c08e..723f5874 100644 --- a/src/storage/backend/index.ts +++ b/src/storage/backend/index.ts @@ -1,22 +1,26 @@ -import { StorageBackendAdapter } from './generic' import { FileBackend } from './file' -import { S3Backend } from './s3' +import { getClient, S3Backend, S3Options } from './s3' import { getConfig } from '../../config' +import { getTenantBackendProvider } from 
'../../database/tenant' export * from './s3' export * from './file' export * from './generic' -const { region, globalS3Endpoint, globalS3ForcePathStyle, storageBackendType } = getConfig() +const { storageBackendType } = getConfig() -export function createStorageBackend() { - let storageBackend: StorageBackendAdapter - - if (storageBackendType === 'file') { - storageBackend = new FileBackend() - } else { - storageBackend = new S3Backend(region, globalS3Endpoint, globalS3ForcePathStyle) +export async function createStorageBackend(tenantId: string, options?: S3Options) { + switch (storageBackendType) { + case 'file': + return new FileBackend() + case 's3': + const provider = await getTenantBackendProvider(tenantId) + const s3Options = options || ({} as S3Options) + return new S3Backend({ + ...s3Options, + client: s3Options.client ?? getClient(provider), + }) + default: + throw new Error(`unknown storage backend type ${storageBackendType}`) } - - return storageBackend } diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts index d1df5d22..b354f661 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3.ts @@ -23,7 +23,94 @@ import { StorageBackendError } from '../errors' import { getConfig } from '../../config' import Agent, { HttpsAgent } from 'agentkeepalive' -const { globalS3Protocol, globalS3MaxSockets } = getConfig() +const { storageS3MaxSockets, storageProviders } = getConfig() + +export interface S3ClientOptions { + endpoint?: string + region?: string + forcePathStyle?: boolean + accessKey?: string + secretKey?: string + role?: string +} + +export interface S3Options { + bucket: string + prefix?: string + client: S3Client | S3ClientOptions +} + +const defaultHttpAgent = createAgent('http') +const defaultHttpsAgent = createAgent('https') +const defaultS3Clients: Record<string, S3Client> = {} + +/** + * Get a client for a given provider and cache it for subsequent calls + * @param provider + */ +export function getClient(provider: keyof typeof storageProviders) { + if (defaultS3Clients[provider]) { + return defaultS3Clients[provider] + } + + const options = storageProviders[provider] + + if (!options) { + throw new StorageBackendError( + 'invalid_storage_provider', + 400, + `invalid storage provider ${provider}` + ) + } + + const client = createS3Client(options) + defaultS3Clients[provider] = client + return client +} + +/** + * Creates an agent for the given protocol + * @param protocol + */ +function createAgent(protocol: 'http' | 'https') { + const agentOptions = { + maxSockets: storageS3MaxSockets, + keepAlive: true, + } + + return protocol === 'http' + ? { httpAgent: new Agent(agentOptions) } + : { httpsAgent: new HttpsAgent(agentOptions) } +} + +export function createS3Client(options: S3ClientOptions): S3Client { + const agent = options.endpoint?.startsWith('http://') ? 
defaultHttpAgent : defaultHttpsAgent + const params: S3ClientConfig = { + region: options.region, + runtime: 'node', + requestHandler: new NodeHttpHandler({ + ...agent, + }), + } + if (options.endpoint) { + params.endpoint = options.endpoint + } + if (options.forcePathStyle) { + params.forcePathStyle = true + } + if (options.accessKey && options.secretKey) { + params.credentials = { + accessKeyId: options.accessKey, + secretAccessKey: options.secretKey, + } + } + + if (options.role) { + // TODO: assume role + } + + return new S3Client(params) +} /** * S3Backend @@ -32,31 +119,9 @@ const { globalS3Protocol, globalS3MaxSockets } = getConfig() export class S3Backend implements StorageBackendAdapter { client: S3Client - constructor(region: string, endpoint?: string | undefined, globalS3ForcePathStyle?: boolean) { - const agentOptions = { - maxSockets: globalS3MaxSockets, - keepAlive: true, - } - - const agent = - globalS3Protocol === 'http' - ? { httpAgent: new Agent(agentOptions) } - : { httpsAgent: new HttpsAgent(agentOptions) } - - const params: S3ClientConfig = { - region, - runtime: 'node', - requestHandler: new NodeHttpHandler({ - ...agent, - }), - } - if (endpoint) { - params.endpoint = endpoint - } - if (globalS3ForcePathStyle) { - params.forcePathStyle = true - } - this.client = new S3Client(params) + constructor(private readonly options: S3Options) { + this.client = + options.client instanceof S3Client ? options.client : createS3Client(options.client) } /** diff --git a/src/storage/object.ts b/src/storage/object.ts index c28da431..0120e3f0 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -23,7 +23,7 @@ export interface UploadObjectOptions { version?: string } -const { urlLengthLimit, globalS3Bucket } = getConfig() +const { urlLengthLimit, storageS3Bucket } = getConfig() /** * ObjectStorage @@ -169,7 +169,7 @@ export class ObjectStorage { return all }, [] as string[]) - await this.backend.deleteObjects(globalS3Bucket, prefixesToDelete) + await this.backend.deleteObjects(storageS3Bucket, prefixesToDelete) await Promise.allSettled( data.map((object) => @@ -202,6 +202,7 @@ export class ObjectStorage { name: objectName, bucketId: this.bucketId, metadata, + version: result.version, }) return result @@ -274,14 +275,14 @@ export class ObjectStorage { }) const copyResult = await this.backend.copyObject( - globalS3Bucket, + storageS3Bucket, s3SourceKey, originObject.version, s3DestinationKey, newVersion ) - const metadata = await this.backend.headObject(globalS3Bucket, s3DestinationKey, newVersion) + const metadata = await this.backend.headObject(storageS3Bucket, s3DestinationKey, newVersion) const destObject = await this.db.createObject({ ...originObject, @@ -291,11 +292,13 @@ export class ObjectStorage { version: newVersion, }) + const tenant = this.db.tenant() await ObjectCreatedCopyEvent.sendWebhook({ - tenant: this.db.tenant(), + tenant, name: destinationKey, bucketId: this.bucketId, metadata, + version: newVersion, }) return { @@ -355,14 +358,14 @@ export class ObjectStorage { try { await this.backend.copyObject( - globalS3Bucket, + storageS3Bucket, s3SourceKey, sourceObj.version, s3DestinationKey, newVersion ) - const metadata = await this.backend.headObject(globalS3Bucket, s3DestinationKey, newVersion) + const metadata = await this.backend.headObject(storageS3Bucket, s3DestinationKey, newVersion) await this.db.asSuperUser().withTransaction(async (db) => { await db.createObject({ @@ -392,6 +395,7 @@ export class ObjectStorage { name: destinationObjectName, bucketId: 
this.bucketId, metadata: metadata, + version: newVersion, oldObject: { name: sourceObjectName, bucketId: this.bucketId, diff --git a/src/storage/renderer/head.ts b/src/storage/renderer/head.ts index 2b17f4d0..f27fe302 100644 --- a/src/storage/renderer/head.ts +++ b/src/storage/renderer/head.ts @@ -17,6 +17,7 @@ export class HeadRenderer extends Renderer { return { metadata, + version: options.version, transformations: ImageRenderer.applyTransformation(request.query as TransformOptions), } } diff --git a/src/storage/renderer/renderer.ts b/src/storage/renderer/renderer.ts index 03da06a1..df7e79d3 100644 --- a/src/storage/renderer/renderer.ts +++ b/src/storage/renderer/renderer.ts @@ -1,6 +1,7 @@ import { FastifyReply, FastifyRequest } from 'fastify' import { ObjectMetadata } from '../backend' import { Readable } from 'stream' +import { getConfig } from '../../config' export interface RenderOptions { bucket: string @@ -13,9 +14,12 @@ export interface RenderOptions { export interface AssetResponse { body?: Readable | ReadableStream | Blob | Buffer metadata: ObjectMetadata + version?: string transformations?: string[] } +const { storageBackendType, sMaxAge } = getConfig() + /** * Renderer * a generic renderer that respond to a request with an asset content @@ -34,7 +38,7 @@ export abstract class Renderer { try { const data = await this.getAsset(request, options) - await this.setHeaders(response, data, options) + await this.setHeaders(request, response, data, options) return response.send(data.body) } catch (err: any) { @@ -55,7 +59,12 @@ export abstract class Renderer { } } - protected setHeaders(response: FastifyReply, data: AssetResponse, options: RenderOptions) { + protected setHeaders( + request: FastifyRequest, + response: FastifyReply, + data: AssetResponse, + options: RenderOptions + ) { response .status(data.metadata.httpStatusCode ?? 200) .header('Accept-Ranges', 'bytes') @@ -67,7 +76,7 @@ export abstract class Renderer { if (options.expires) { response.header('Expires', options.expires) } else { - response.header('Cache-Control', data.metadata.cacheControl) + this.handleCacheControl(request, response, data.metadata) } if (data.metadata.contentRange) { @@ -78,9 +87,34 @@ export abstract class Renderer { response.header('X-Transformations', data.transformations.join(',')) } + if (data.version) { + response.header('X-Version', data.version) + } + this.handleDownload(response, options.download) } + protected handleCacheControl( + request: FastifyRequest, + response: FastifyReply, + metadata: ObjectMetadata + ) { + const cacheBuster = request.headers[`x-cache-buster`] + + const cacheControl = [metadata.cacheControl] + + if (cacheBuster) { + if (cacheBuster !== metadata.eTag) { + cacheControl.push(`s-maxage=${sMaxAge}`) + cacheControl.push('stale-while-revalidate=30') + } else { + cacheControl.push(`s-maxage=${sMaxAge}`) + } + } + + response.header('Cache-Control', cacheControl.join(', ')) + } + protected handleDownload(response: FastifyReply, download?: string) { if (typeof download !== 'undefined') { if (download === '') { diff --git a/src/storage/storage.ts b/src/storage/storage.ts index 3fcbae78..d5573160 100644 --- a/src/storage/storage.ts +++ b/src/storage/storage.ts @@ -6,7 +6,7 @@ import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from '. 
import { getConfig } from '../config' import { ObjectStorage } from './object' -const { urlLengthLimit, globalS3Bucket } = getConfig() +const { urlLengthLimit, storageS3Bucket } = getConfig() /** * Storage @@ -207,7 +207,7 @@ export class Storage { return all }, [] as string[]) // delete files from s3 asynchronously - this.backend.deleteObjects(globalS3Bucket, params) + this.backend.deleteObjects(storageS3Bucket, params) } if (deleted?.length !== objects.length) { diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 620ec383..4f024725 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -18,7 +18,7 @@ interface UploaderOptions extends UploadObjectOptions { allowedMimeTypes?: string[] | null } -const { globalS3Bucket } = getConfig() +const { storageS3Bucket } = getConfig() export interface UploadObjectOptions { bucketId: string @@ -97,7 +97,7 @@ export class Uploader { const s3Key = `${this.db.tenantId}/${path}` const objectMetadata = await this.backend.uploadObject( - globalS3Bucket, + storageS3Bucket, s3Key, version, file.body, @@ -187,6 +187,7 @@ export class Uploader { name: objectName, bucketId: bucketId, metadata: objectMetadata, + version: id, }) ) diff --git a/src/test/object.test.ts b/src/test/object.test.ts index 39a03899..b3cc7c79 100644 --- a/src/test/object.test.ts +++ b/src/test/object.test.ts @@ -4,7 +4,7 @@ import dotenv from 'dotenv' import FormData from 'form-data' import fs from 'fs' import app from '../app' -import { getConfig } from '../config' +import { getConfig, setConfig } from '../config' import { S3Backend } from '../storage/backend' import { Obj } from '../storage/schemas' import { signJWT } from '../auth' @@ -42,6 +42,10 @@ afterEach(async () => { } }) +beforeEach(() => { + getConfig({ reload: true }) +}) + /* * GET /object/:id */ @@ -397,7 +401,7 @@ describe('testing POST object via multipart upload', () => { }) test('return 400 when exceeding file size limit', async () => { - process.env.FILE_SIZE_LIMIT = '1' + getConfig().fileSizeLimit = 1 const form = new FormData() form.append('file', fs.createReadStream(`./src/test/assets/sadcat.jpg`)) const headers = Object.assign({}, form.getHeaders(), { @@ -629,7 +633,7 @@ describe('testing POST object via binary upload', () => { }) test('return 400 when exceeding file size limit', async () => { - process.env.FILE_SIZE_LIMIT = '1' + getConfig().fileSizeLimit = 1 const path = './src/test/assets/sadcat.jpg' const { size } = fs.statSync(path) diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts index a97c11f4..b56cebfd 100644 --- a/src/test/rls.test.ts +++ b/src/test/rls.test.ts @@ -1,14 +1,18 @@ +import dotenv from 'dotenv' +import path from 'path' + +dotenv.config({ path: path.resolve(__dirname, '..', '..', '.env.test'), override: false }) + import { randomUUID } from 'crypto' -import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3' +import { CreateBucketCommand } from '@aws-sdk/client-s3' import { StorageKnexDB } from '../storage/database' import app from '../app' import { getConfig } from '../config' import { checkBucketExists } from './common' -import { createStorageBackend } from '../storage/backend' +import { createStorageBackend, S3Backend, StorageBackendAdapter } from '../storage/backend' import { Knex, knex } from 'knex' import { signJWT } from '../auth' import fs from 'fs' -import path from 'path' import { Storage } from '../storage' import { getPostgresConnection } from '../database' import FormData from 'form-data' @@ -66,23 +70,27 @@ const testSpec = yaml.load( 
diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts
index a97c11f4..b56cebfd 100644
--- a/src/test/rls.test.ts
+++ b/src/test/rls.test.ts
@@ -1,14 +1,18 @@
+import dotenv from 'dotenv'
+import path from 'path'
+
+dotenv.config({ path: path.resolve(__dirname, '..', '..', '.env.test'), override: false })
+
 import { randomUUID } from 'crypto'
-import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3'
+import { CreateBucketCommand } from '@aws-sdk/client-s3'
 import { StorageKnexDB } from '../storage/database'
 import app from '../app'
 import { getConfig } from '../config'
 import { checkBucketExists } from './common'
-import { createStorageBackend } from '../storage/backend'
+import { createStorageBackend, S3Backend, StorageBackendAdapter } from '../storage/backend'
 import { Knex, knex } from 'knex'
 import { signJWT } from '../auth'
 import fs from 'fs'
-import path from 'path'
 import { Storage } from '../storage'
 import { getPostgresConnection } from '../database'
 import FormData from 'form-data'
@@ -66,23 +70,27 @@ const testSpec = yaml.load(
   fs.readFileSync(path.resolve(__dirname, 'rls_tests.yaml'), 'utf8')
 ) as RlsTestSpec
 
-const { serviceKey, tenantId, jwtSecret, databaseURL, globalS3Bucket } = getConfig()
-const backend = createStorageBackend()
-const client = backend.client
+const { serviceKey, tenantId, jwtSecret, databaseURL, storageS3Bucket } = getConfig({
+  reload: true,
+})
 
 jest.setTimeout(10000)
 
 describe('RLS policies', () => {
   let db: Knex
+  let backend: StorageBackendAdapter
 
   beforeAll(async () => {
+    backend = await createStorageBackend(tenantId)
+
     // parse yaml file
-    if (client instanceof S3Client) {
-      const bucketExists = await checkBucketExists(client, globalS3Bucket)
+    if (backend instanceof S3Backend) {
+      const client = backend.client
+      const bucketExists = await checkBucketExists(client, storageS3Bucket)
 
       if (!bucketExists) {
         const createBucketCommand = new CreateBucketCommand({
-          Bucket: globalS3Bucket,
+          Bucket: storageS3Bucket,
         })
 
         await client.send(createBucketCommand)
       }
     }
diff --git a/src/test/tenant.test.ts b/src/test/tenant.test.ts
index ac4b3515..bcea2dc8 100644
--- a/src/test/tenant.test.ts
+++ b/src/test/tenant.test.ts
@@ -3,8 +3,11 @@ import dotenv from 'dotenv'
 import * as migrate from '../database/migrate'
 import { knex } from '../database/multitenant-db'
 import { adminApp } from './common'
+import { getConfig } from '../config'
 
-dotenv.config({ path: '.env.test' })
+dotenv.config({ path: '.env.test', override: false })
+
+getConfig({ reload: true })
 
 const payload = {
   anonKey: 'a',
@@ -19,6 +22,7 @@
       enabled: true,
     },
   },
+  s3Provider: 'default',
 }
 
 const payload2 = {
@@ -34,6 +38,7 @@
       enabled: false,
     },
   },
+  s3Provider: 'default',
 }
 
 beforeAll(async () => {
diff --git a/src/test/tus.test.ts b/src/test/tus.test.ts
index ce046d54..82789335 100644
--- a/src/test/tus.test.ts
+++ b/src/test/tus.test.ts
@@ -1,6 +1,5 @@
 import dotenv from 'dotenv'
 import path from 'path'
-dotenv.config({ path: path.resolve(__dirname, '..', '..', '.env.test') })
 
 import { getPostgresConnection } from '../database'
 import { getConfig } from '../config'
@@ -10,28 +9,31 @@ import * as tus from 'tus-js-client'
 import fs from 'fs'
 import app from '../app'
 import { FastifyInstance } from 'fastify'
-import { isS3Error, Storage } from '../storage'
-import { createStorageBackend } from '../storage/backend'
+import { Storage } from '../storage'
+import { createStorageBackend, StorageBackendAdapter } from '../storage/backend'
 import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3'
 import { logger } from '../monitoring'
 import { DetailedError } from 'tus-js-client'
 import { getServiceKeyUser } from '../database/tenant'
 import { checkBucketExists } from './common'
 
-const { serviceKey, tenantId, globalS3Bucket } = getConfig()
+dotenv.config({ path: path.resolve(__dirname, '..', '..', '.env.test'), override: false })
+
+const { serviceKey, tenantId, storageS3Bucket } = getConfig({ reload: true })
 
 const oneChunkFile = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadcat.jpg'))
 const localServerAddress = 'http://127.0.0.1:8999'
 
-const backend = createStorageBackend()
-const client = backend.client
-
 describe('Tus multipart', () => {
   let db: StorageKnexDB
   let storage: Storage
   let server: FastifyInstance
+  let backend: StorageBackendAdapter
   let bucketName: string
 
   beforeAll(async () => {
+    backend = await createStorageBackend(tenantId)
+    const client = backend.client
+
     server = await app({
       logger: logger,
     })
 
@@ -41,11 +43,11 @@
 
     if (client instanceof S3Client) {
-      const bucketExists = await checkBucketExists(client, globalS3Bucket)
+      const bucketExists = await checkBucketExists(client, storageS3Bucket)
 
       if (!bucketExists) {
         const createBucketCommand = new CreateBucketCommand({
-          Bucket: globalS3Bucket,
+          Bucket: storageS3Bucket,
         })
 
         await client.send(createBucketCommand)
       }
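Editor's note: both rls.test.ts and tus.test.ts now resolve the storage backend asynchronously and per tenant. A hedged sketch of that shared setup, factored into a helper; `ensureTestBucket` is a hypothetical name, while the imports and calls mirror the hunks above:

// Hypothetical helper consolidating the setup duplicated in rls.test.ts
// and tus.test.ts; only the S3 adapter needs a bucket provisioned up front.
import { CreateBucketCommand } from '@aws-sdk/client-s3'
import { getConfig } from '../config'
import { createStorageBackend, S3Backend } from '../storage/backend'
import { checkBucketExists } from './common'

const { tenantId, storageS3Bucket } = getConfig({ reload: true })

async function ensureTestBucket(): Promise<void> {
  // The factory is now async and tenant-scoped, so the provider configured
  // for this tenant decides which adapter comes back.
  const backend = await createStorageBackend(tenantId)

  if (backend instanceof S3Backend) {
    const exists = await checkBucketExists(backend.client, storageS3Bucket)
    if (!exists) {
      await backend.client.send(new CreateBucketCommand({ Bucket: storageS3Bucket }))
    }
  }
}
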
diff --git a/src/test/x-forwarded-host.test.ts b/src/test/x-forwarded-host.test.ts
index ade548cc..18d06d75 100644
--- a/src/test/x-forwarded-host.test.ts
+++ b/src/test/x-forwarded-host.test.ts
@@ -5,11 +5,10 @@ import * as migrate from '../database/migrate'
 import { knex } from '../database/multitenant-db'
 import app from '../app'
 import * as tenant from '../database/tenant'
+import { getConfig, setConfig } from '../config'
 
 dotenv.config({ path: '.env.test' })
 
-const ENV = process.env
-
 beforeAll(async () => {
   await migrate.runMultitenantMigrations()
   jest.spyOn(migrate, 'runMigrationsOnTenant').mockResolvedValue()
@@ -39,9 +38,13 @@ beforeAll(async () => {
 })
 
 beforeEach(() => {
-  process.env = { ...ENV }
-  process.env.IS_MULTITENANT = 'true'
-  process.env.X_FORWARDED_HOST_REGEXP = '^([a-z]{20})\\.supabase\\.(?:co|in|net)$'
+  const originalConfig = getConfig({ reload: true })
+
+  setConfig({
+    ...originalConfig,
+    isMultitenant: true,
+    xForwardedHostRegExp: '^([a-z]{20})\\.supabase\\.(?:co|in|net)$',
+  })
 })
 
 afterAll(async () => {
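Editor's note: for suites that override several keys at once, x-forwarded-host.test.ts swaps per-variable process.env writes for a single typed override. The same idea as a reusable helper; `withConfigOverrides` is a hypothetical name built only on the getConfig/setConfig calls shown in the last hunk:

// Hypothetical wrapper over the pattern above: reload the baseline config
// from .env.test, then replace it wholesale with selected overrides.
import { getConfig, setConfig } from '../config'

type Config = ReturnType<typeof getConfig>

function withConfigOverrides(overrides: Partial<Config>): void {
  // Re-read the environment first so earlier tests' overrides do not leak in.
  const originalConfig = getConfig({ reload: true })
  setConfig({ ...originalConfig, ...overrides })
}

beforeEach(() => {
  withConfigOverrides({
    isMultitenant: true,
    xForwardedHostRegExp: '^([a-z]{20})\\.supabase\\.(?:co|in|net)$',
  })
})
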