diff --git a/cloudflare_workers/plugin/wrangler.jsonc b/cloudflare_workers/plugin/wrangler.jsonc index 5bcca71f84..0ceb46004b 100644 --- a/cloudflare_workers/plugin/wrangler.jsonc +++ b/cloudflare_workers/plugin/wrangler.jsonc @@ -100,6 +100,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -198,6 +199,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -296,6 +298,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -394,6 +397,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -492,6 +496,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -590,6 +595,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -688,6 +694,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -786,6 +793,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -884,6 +892,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "2860ca0f6b1e416fb8651792895cbffa" }], "hyperdrive": [ { "binding": "HYPERDRIVE_CAPGO_DIRECT_EU", @@ -941,6 +950,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "9c405a5248604951a2a12125799bc21b" }], "analytics_engine_datasets": [ { "binding": "DEVICE_USAGE", @@ -1035,6 +1045,7 @@ "database_id": "81236a0c-db6e-454d-87da-944fa9bc100c" } ], + "kv_namespaces": [{ "binding": "CHANNEL_SELF_STORE", "id": "16a85350d8a246ff857a645c8f51d6ae" }], "analytics_engine_datasets": [ { "binding": "DEVICE_USAGE", diff --git a/supabase/functions/_backend/plugins/channel_self.ts b/supabase/functions/_backend/plugins/channel_self.ts index 7f6285fdc1..405c758ba0 100644 --- a/supabase/functions/_backend/plugins/channel_self.ts +++ b/supabase/functions/_backend/plugins/channel_self.ts @@ -8,11 +8,12 @@ import { parse } from '@std/semver' import { Hono } from 'hono/tiny' import { getAppStatus, setAppStatus } from '../utils/appStatus.ts' import { checkChannelSelfIPRateLimit, isChannelSelfRateLimited, recordChannelSelfIPRequest, recordChannelSelfRequest } from '../utils/channelSelfRateLimit.ts' +import { deleteChannelSelfOverride, getChannelSelfOverride, isChannelSelfStoreEnabled, setChannelSelfOverride } from '../utils/channelSelfStore.ts' import { BRES, parseBody, simpleError200, simpleRateLimit } from '../utils/hono.ts' import { cloudlog } from '../utils/logging.ts' import { sendNotifOrgCached } from '../utils/notifications.ts' import { sendNotifToOrgMembersCached } from '../utils/org_email_notifications.ts' -import { closeClient, deleteChannelDevicePg, getAppByIdPg, getAppOwnerPostgres, getChannelByNamePg, getChannelDeviceOverridePg, getChannelsPg, getCompatibleChannelsPg, getDrizzleClient, getMainChannelsPg, getPgClient, setReplicationLagHeader, upsertChannelDevicePg } from '../utils/pg.ts' +import { closeClient, deleteChannelDevicePg, getAppByIdPg, getAppOwnerPostgres, getChannelByIdPg, getChannelByNamePg, getChannelDeviceOverridePg, getChannelsPg, getCompatibleChannelsPg, getDrizzleClient, getMainChannelsPg, getPgClient, setReplicationLagHeader, upsertChannelDevicePg } from '../utils/pg.ts' import { convertQueryToBody, makeDevice, parsePluginBody } from '../utils/plugin_parser.ts' import { channelSelfGetRequestSchema, channelSelfRequestSchema, isDevicePlatform } from '../utils/plugin_validation.ts' import { buildRateLimitInfo } from '../utils/rateLimitInfo.ts' @@ -66,6 +67,8 @@ async function recordChannelSelfRequestSafely( } type AppOwnerResult = Awaited> +type ChannelSelfOverrideResult = Awaited> +type ChannelSelfDeviceOperation = 'set' | 'get' | 'delete' async function assertChannelSelfCachedStatus( c: Context, @@ -139,27 +142,108 @@ function isChannelSelfLocalChannelStorageVersion(c: Context, body: DeviceLink, o } } -async function post(c: Context, drizzleClient: ReturnType, body: DeviceLink): Promise { - cloudlog({ requestId: c.get('requestId'), message: 'post channel self body', body }) - const { app_id, device_id, channel } = body +async function getChannelSelfOverrideForDevice( + c: Context, + appId: string, + deviceId: string, + drizzleClient: ReturnType, +): Promise { + if (isChannelSelfStoreEnabled(c)) { + const storedOverride = await getChannelSelfOverride(c, appId, deviceId) + if (!storedOverride) + return null + + const channel = await getChannelByIdPg(c, appId, storedOverride.channel_id.id, drizzleClient) + if (!channel) + return null + + return { + app_id: storedOverride.app_id, + device_id: storedOverride.device_id, + channel_id: { + id: channel.id, + allow_device_self_set: channel.allow_device_self_set, + name: channel.name, + }, + } + } + + return getChannelDeviceOverridePg(c, appId, deviceId, drizzleClient) +} + +async function deleteChannelSelfOverrideForDevice( + c: Context, + appId: string, + deviceId: string, + drizzleClient: ReturnType, +) { + if (isChannelSelfStoreEnabled(c)) + return deleteChannelSelfOverride(c, appId, deviceId) + return deleteChannelDevicePg(c, appId, deviceId, drizzleClient) +} + +async function upsertChannelSelfOverrideForDevice( + c: Context, + appId: string, + deviceId: string, + channel: NonNullable>>, + drizzleClient: ReturnType, +) { + if (isChannelSelfStoreEnabled(c)) { + return setChannelSelfOverride(c, appId, deviceId, { + app_id: appId, + device_id: deviceId, + channel_id: { + id: channel.id, + }, + }) + } + + return upsertChannelDevicePg(c, { + device_id: deviceId, + channel_id: channel.id, + app_id: appId, + owner_org: channel.owner_org, + }, drizzleClient) +} + +async function prepareChannelSelfDeviceRequest( + c: Context, + drizzleClient: ReturnType, + body: DeviceLink, + operationLabel: string, +): Promise<{ response: Response } | { appOwner: NonNullable, device: ReturnType }> { + const { app_id, device_id } = body const cachedAppStatus = await getAppStatus(c, app_id) - const cachedLimit = await assertChannelSelfCachedStatus(c, cachedAppStatus.status, app_id, makeDevice(body, cachedAppStatus.allow_device_custom_id), 'post') + const cachedLimit = await assertChannelSelfCachedStatus(c, cachedAppStatus.status, app_id, makeDevice(body, cachedAppStatus.allow_device_custom_id), operationLabel.toLowerCase()) if (cachedLimit) { - return cachedLimit + return { response: cachedLimit } } + const appOwner = await getAppOwnerPostgres(c, app_id, drizzleClient as ReturnType, PLAN_MAU_ACTIONS) const device = makeDevice(body, appOwner?.allow_device_custom_id) - - // Check if app exists first - Read operation can use v2 flag - const ownerRes = await assertChannelSelfAppOwnerPlanValid(c, drizzleClient, appOwner, app_id, device, 'POST', device_id) + const ownerRes = await assertChannelSelfAppOwnerPlanValid(c, drizzleClient, appOwner, app_id, device, operationLabel, device_id) if ('response' in ownerRes) { - return ownerRes.response + return { response: ownerRes.response } } - const validatedAppOwner = ownerRes.appOwner - // Read operations can use v2 flag - const dataChannelOverride = await getChannelDeviceOverridePg(c, app_id, device_id, drizzleClient as ReturnType) + return { appOwner: ownerRes.appOwner, device } +} + +async function post(c: Context, drizzleClient: ReturnType, body: DeviceLink): Promise { + cloudlog({ requestId: c.get('requestId'), message: 'post channel self body', body }) + const { app_id, device_id, channel } = body + + const requestContext = await prepareChannelSelfDeviceRequest(c, drizzleClient, body, 'POST') + if ('response' in requestContext) { + return requestContext.response + } + const { appOwner: validatedAppOwner, device } = requestContext + + const isNewVersion = isChannelSelfLocalChannelStorageVersion(c, body, 'POST') + // Only old versions use server-side channel_self storage. + const dataChannelOverride = isNewVersion ? null : await getChannelSelfOverrideForDevice(c, app_id, device_id, drizzleClient) if (!channel) { return simpleError200(c, 'cannot_override', 'Missing channel') @@ -217,20 +301,8 @@ async function post(c: Context, drizzleClient: ReturnType) // We DO NOT return if there is no main channel as it's not a critical error - // We will just set the channel_devices as the user requested + // We will just set the override as the user requested let mainChannelName = null as string | null if (mainChannel && mainChannel.length > 0) { const devicePlatform = body.platform as Database['public']['Enums']['platform_os'] @@ -259,7 +331,7 @@ async function post(c: Context, drizzleClient: ReturnType, PLAN_MAU_ACTIONS) - const device = makeDevice(body, appOwner?.allow_device_custom_id) - - const ownerRes = await assertChannelSelfAppOwnerPlanValid(c, drizzleClient, appOwner, app_id, device, 'PUT', device_id) - if ('response' in ownerRes) { - return ownerRes.response + const requestContext = await prepareChannelSelfDeviceRequest(c, drizzleClient, body, 'PUT') + if ('response' in requestContext) { + return requestContext.response } + const { device } = requestContext const isNewVersion = isChannelSelfLocalChannelStorageVersion(c, body, 'PUT') @@ -343,11 +403,11 @@ async function put(c: Context, drizzleClient: ReturnType) - const dataChannelOverride = await getChannelDeviceOverridePg(c, app_id, device_id, drizzleClient as ReturnType) + const dataChannelOverride = await getChannelSelfOverrideForDevice(c, app_id, device_id, drizzleClient) if (dataChannelOverride?.channel_id) { await sendStatsAndDevice(c, device, [{ action: 'getChannel' }]) return c.json({ @@ -386,38 +446,25 @@ async function deleteOverride(c: Context, drizzleClient: ReturnType, PLAN_MAU_ACTIONS) - const device = makeDevice(body, appOwner?.allow_device_custom_id) cloudlog({ requestId: c.get('requestId'), message: 'delete override', version_build }) - const ownerRes = await assertChannelSelfAppOwnerPlanValid(c, drizzleClient, appOwner, app_id, device, 'DELETE', device_id) - if ('response' in ownerRes) { - return ownerRes.response + const requestContext = await prepareChannelSelfDeviceRequest(c, drizzleClient, body, 'DELETE') + if ('response' in requestContext) { + return requestContext.response } - const validatedAppOwner = ownerRes.appOwner + const { appOwner: validatedAppOwner, device } = requestContext const isNewVersion = isChannelSelfLocalChannelStorageVersion(c, body, 'DELETE') - // For vX.34.0+: Still check and clean up old channel_devices entries (migration cleanup) - // Read operation can use v2 flag - const dataChannelOverride = await getChannelDeviceOverridePg(c, app_id, device_id, drizzleClient as ReturnType) + // For vX.34.0+: do not read or write server-side channel_self storage. + const dataChannelOverride = isNewVersion ? null : await getChannelSelfOverrideForDevice(c, app_id, device_id, drizzleClient) if (isNewVersion) { - // For vX.34.0+: Clean up old entry if it exists from previous versions - if (dataChannelOverride?.channel_id) { - cloudlog({ requestId: c.get('requestId'), message: 'Plugin vX.34.0+ detected in unsetChannel, cleaning up old channel_devices entry' }) - await deleteChannelDevicePg(c, app_id, device_id, drizzleClient) - } await sendStatsAndDevice(c, device, [{ action: 'setChannel' }]) return c.json(BRES) } - // Old behavior (< v7.34.0): Validate and delete from channel_devices table + // Old behavior (< v7.34.0): Validate and delete the server-side override. if (!dataChannelOverride?.channel_id) { return simpleError200(c, 'cannot_override', 'Cannot change device override current channel don\'t allow it') @@ -443,7 +490,7 @@ async function deleteOverride(c: Context, drizzleClient: ReturnType) => Promise, + channel?: string, +) { + const rateLimitStatus = await isChannelSelfRateLimited(c, bodyParsed.app_id, bodyParsed.device_id, operation, channel) + if (rateLimitStatus.limited) { + cloudlog({ requestId: c.get('requestId'), message: `Channel self ${operation} rate limited`, app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, channel }) + return simpleRateLimit({ app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, ...buildRateLimitInfo(rateLimitStatus.resetAt) }) + } + + // Old KV-backed requests and new local-storage requests can use the read replica. + const pgClient = getPgClient(c, isChannelSelfStoreEnabled(c) || isChannelSelfLocalChannelStorageVersion(c, bodyParsed, operationLabel)) + + return await runChannelSelfWithPgClient( + c, + pgClient, + run, + async () => { + await recordChannelSelfRequestSafely(c, bodyParsed.app_id, bodyParsed.device_id, operation, channel) + recordChannelSelfIPRateLimit(c, bodyParsed.app_id) + }, + ) +} + // Plugin endpoints are intentionally public device endpoints: their responses are // considered public data, so we do not require Capgo JWT/API-key auth or add // checks beyond Supabase/platform protections. Endpoint-specific validation, plan @@ -556,24 +631,13 @@ app.post('/', async (c) => { } // Rate limit: max 5 set per second per device+app, and same set max once per 60 seconds - const rateLimitStatus = await isChannelSelfRateLimited(c, bodyParsed.app_id, bodyParsed.device_id, 'set', bodyParsed.channel) - if (rateLimitStatus.limited) { - cloudlog({ requestId: c.get('requestId'), message: 'Channel self set rate limited', app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, channel: bodyParsed.channel }) - return simpleRateLimit({ app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, ...buildRateLimitInfo(rateLimitStatus.resetAt) }) - } - - // POST has writes, so always create PG client - const pgClient = getPgClient(c) - - return await runChannelSelfWithPgClient( + return await runChannelSelfDeviceOperation( c, - pgClient, + bodyParsed, + 'set', + 'POST', drizzleClient => post(c, drizzleClient, bodyParsed), - async () => { - // Record the request for rate limiting (all requests, not just successful ones, to prevent abuse) - await recordChannelSelfRequestSafely(c, bodyParsed.app_id, bodyParsed.device_id, 'set', bodyParsed.channel) - recordChannelSelfIPRateLimit(c, bodyParsed.app_id) - }, + bodyParsed.channel, ) }) @@ -587,23 +651,12 @@ app.put('/', async (c) => { const { bodyParsed } = parsed // Rate limit: max 5 get per second per device+app - const rateLimitStatus = await isChannelSelfRateLimited(c, bodyParsed.app_id, bodyParsed.device_id, 'get') - if (rateLimitStatus.limited) { - cloudlog({ requestId: c.get('requestId'), message: 'Channel self get rate limited', app_id: bodyParsed.app_id, device_id: bodyParsed.device_id }) - return simpleRateLimit({ app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, ...buildRateLimitInfo(rateLimitStatus.resetAt) }) - } - - const pgClient = getPgClient(c, isChannelSelfLocalChannelStorageVersion(c, bodyParsed, 'PUT')) - - return await runChannelSelfWithPgClient( + return await runChannelSelfDeviceOperation( c, - pgClient, + bodyParsed, + 'get', + 'PUT', drizzleClient => put(c, drizzleClient, bodyParsed), - async () => { - // Record the request for rate limiting (all requests to prevent abuse) - await recordChannelSelfRequestSafely(c, bodyParsed.app_id, bodyParsed.device_id, 'get') - recordChannelSelfIPRateLimit(c, bodyParsed.app_id) - }, ) }) @@ -616,24 +669,12 @@ app.delete('/', async (c) => { const { bodyParsed } = parsed // Rate limit: max 5 delete per second per device+app - const rateLimitStatus = await isChannelSelfRateLimited(c, bodyParsed.app_id, bodyParsed.device_id, 'delete') - if (rateLimitStatus.limited) { - cloudlog({ requestId: c.get('requestId'), message: 'Channel self delete rate limited', app_id: bodyParsed.app_id, device_id: bodyParsed.device_id }) - return simpleRateLimit({ app_id: bodyParsed.app_id, device_id: bodyParsed.device_id, ...buildRateLimitInfo(rateLimitStatus.resetAt) }) - } - - // DELETE has writes, so always create PG client - const pgClient = getPgClient(c) - - return await runChannelSelfWithPgClient( + return await runChannelSelfDeviceOperation( c, - pgClient, + bodyParsed, + 'delete', + 'DELETE', drizzleClient => deleteOverride(c, drizzleClient, bodyParsed), - async () => { - // Record the request for rate limiting (all requests to prevent abuse) - await recordChannelSelfRequestSafely(c, bodyParsed.app_id, bodyParsed.device_id, 'delete') - recordChannelSelfIPRateLimit(c, bodyParsed.app_id) - }, ) }) diff --git a/supabase/functions/_backend/utils/cache.ts b/supabase/functions/_backend/utils/cache.ts index c63f555a18..fe478b8a90 100644 --- a/supabase/functions/_backend/utils/cache.ts +++ b/supabase/functions/_backend/utils/cache.ts @@ -93,6 +93,18 @@ export class CacheHelper { } } + async delete(key: Request) { + const cache = await this.ensureCache() + if (!cache) + return + try { + await cache.delete(key) + } + catch (error) { + this.logCacheError('Error deleting cached response', error) + } + } + private buildCacheControl(ttlSeconds: number) { const sanitized = Math.max(0, Math.floor(ttlSeconds)) return `public, s-maxage=${sanitized}` diff --git a/supabase/functions/_backend/utils/channelSelfStore.ts b/supabase/functions/_backend/utils/channelSelfStore.ts new file mode 100644 index 0000000000..d010dd29dd --- /dev/null +++ b/supabase/functions/_backend/utils/channelSelfStore.ts @@ -0,0 +1,139 @@ +import type { Context } from 'hono' +import type { MiddlewareKeyVariables } from './hono.ts' +import { CacheHelper } from './cache.ts' +import { cloudlogErr, serializeError } from './logging.ts' + +const CHANNEL_SELF_CACHE_PATH = '/.channel-self-override-v1' +const CHANNEL_SELF_CACHE_TTL_SECONDS = 60 +const CHANNEL_SELF_KV_CACHE_TTL_SECONDS = 60 + +// TODO: Delete this legacy channel_self KV/cache bridge once old plugin versions are no longer used. // NOSONAR +// The cache layer only exists for those old versions so channel_self writes do not hit the primary database. +export interface ChannelSelfOverride { + app_id: string + device_id: string + channel_id: { + id: number + } +} + +interface ChannelSelfOverridePayload { + app_id: string + device_id: string + channel_id: number + updated_at: string +} + +type ChannelSelfContext = Context + +function getChannelSelfStore(c: ChannelSelfContext) { + return c.env?.CHANNEL_SELF_STORE ?? null +} + +function buildChannelSelfStoreKey(appId: string, deviceId: string) { + return `channel_self:v1:${encodeURIComponent(appId)}:${encodeURIComponent(deviceId)}` +} + +function buildChannelSelfCacheRequest(cache: CacheHelper, appId: string, deviceId: string) { + return cache.buildRequest(CHANNEL_SELF_CACHE_PATH, { + app_id: appId, + device_id: deviceId, + }) +} + +function toPayload(appId: string, deviceId: string, override: ChannelSelfOverride): ChannelSelfOverridePayload { + return { + app_id: appId, + device_id: deviceId, + channel_id: override.channel_id.id, + updated_at: new Date().toISOString(), + } +} + +function fromPayload(appId: string, deviceId: string, payload: ChannelSelfOverridePayload | null): ChannelSelfOverride | null { + if ( + payload?.app_id !== appId + || payload.device_id !== deviceId + || typeof payload.channel_id !== 'number' + ) { + return null + } + + return { + app_id: payload.app_id, + device_id: payload.device_id, + channel_id: { + id: payload.channel_id, + }, + } +} + +export function isChannelSelfStoreEnabled(c: ChannelSelfContext) { + return Boolean(getChannelSelfStore(c)) +} + +export async function getChannelSelfOverride(c: ChannelSelfContext, appId: string, deviceId: string): Promise { + const store = getChannelSelfStore(c) + if (!store) + return null + + const cache = new CacheHelper(c) + const cacheRequest = buildChannelSelfCacheRequest(cache, appId, deviceId) + const cachedPayload = await cache.matchJson(cacheRequest) + const cachedOverride = fromPayload(appId, deviceId, cachedPayload) + if (cachedOverride) + return cachedOverride + + const key = buildChannelSelfStoreKey(appId, deviceId) + try { + const payload = await store.get(key, { + type: 'json', + cacheTtl: CHANNEL_SELF_KV_CACHE_TTL_SECONDS, + }) + const override = fromPayload(appId, deviceId, payload) + if (override) + await cache.putJson(cacheRequest, payload, CHANNEL_SELF_CACHE_TTL_SECONDS) + return override + } + catch (error) { + cloudlogErr({ requestId: c.get('requestId'), message: 'Error reading channel_self override store', app_id: appId, device_id: deviceId, error: serializeError(error) }) + return null + } +} + +export async function setChannelSelfOverride(c: ChannelSelfContext, appId: string, deviceId: string, override: ChannelSelfOverride) { + const store = getChannelSelfStore(c) + if (!store) + return false + + const key = buildChannelSelfStoreKey(appId, deviceId) + const payload = toPayload(appId, deviceId, override) + try { + await store.put(key, JSON.stringify(payload)) + const cache = new CacheHelper(c) + await cache.putJson(buildChannelSelfCacheRequest(cache, appId, deviceId), payload, CHANNEL_SELF_CACHE_TTL_SECONDS) + return true + } + catch (error) { + cloudlogErr({ requestId: c.get('requestId'), message: 'Error writing channel_self override store', app_id: appId, device_id: deviceId, error: serializeError(error) }) + return false + } +} + +export async function deleteChannelSelfOverride(c: ChannelSelfContext, appId: string, deviceId: string) { + const store = getChannelSelfStore(c) + if (!store) + return false + + const key = buildChannelSelfStoreKey(appId, deviceId) + try { + await store.delete(key) + const cache = new CacheHelper(c) + await cache.delete(buildChannelSelfCacheRequest(cache, appId, deviceId)) + return true + } + catch (error) { + cloudlogErr({ requestId: c.get('requestId'), message: 'Error deleting channel_self override store', app_id: appId, device_id: deviceId, error: serializeError(error) }) + return false + } +} diff --git a/supabase/functions/_backend/utils/cloudflare.ts b/supabase/functions/_backend/utils/cloudflare.ts index 8dc547453b..a432ddd9f7 100644 --- a/supabase/functions/_backend/utils/cloudflare.ts +++ b/supabase/functions/_backend/utils/cloudflare.ts @@ -1,4 +1,4 @@ -import type { AnalyticsEngineDataPoint, D1Database, Hyperdrive } from '@cloudflare/workers-types' +import type { AnalyticsEngineDataPoint, D1Database, Hyperdrive, KVNamespace } from '@cloudflare/workers-types' import type { Context } from 'hono' import type { DeviceComparable } from './deviceComparison.ts' import type { Database } from './supabase.types.ts' @@ -42,6 +42,7 @@ export type Bindings = { APP_LOG: AnalyticsEngineDataPoint DEVICE_INFO: AnalyticsEngineDataPoint DB_STOREAPPS: D1Database + CHANNEL_SELF_STORE?: KVNamespace HYPERDRIVE_CAPGO_DIRECT_EU: Hyperdrive // Add Hyperdrive binding HYPERDRIVE_CAPGO_READ_NA: Hyperdrive HYPERDRIVE_CAPGO_READ_EU: Hyperdrive diff --git a/supabase/functions/_backend/utils/pg.ts b/supabase/functions/_backend/utils/pg.ts index 2af0711135..541de3f6c7 100644 --- a/supabase/functions/_backend/utils/pg.ts +++ b/supabase/functions/_backend/utils/pg.ts @@ -1,3 +1,4 @@ +import type { SQL } from 'drizzle-orm' import type { Context } from 'hono' import { and, eq, or, sql } from 'drizzle-orm' import { drizzle } from 'drizzle-orm/node-postgres' @@ -18,6 +19,7 @@ const REPLICATION_LAG_CACHE_TTL_SECONDS = 60 const REPLICATION_LAG_CACHE_TTL_MS = REPLICATION_LAG_CACHE_TTL_SECONDS * 1000 type ReplicationStatus = 'ok' | 'lagging' | 'unknown' +interface ChannelLookupResult { id: number, name: string, allow_device_self_set: boolean, public: boolean, owner_org: string } interface ReplicationLagStatus { status: ReplicationStatus @@ -536,6 +538,40 @@ export function requestInfosChannelDevicePostgres( return channelDevice.then(data => data.at(0)) } +export function requestInfosChannelByIdPostgres( + c: Context, + app_id: string, + channelId: number, + drizzleClient: ReturnType, + includeManifest: boolean, + includeMetadata = false, +) { + const { versionSelect, channelAlias, channelSelect, manifestSelect, versionAlias } = getSchemaUpdatesAlias(includeMetadata) + const baseSelect = { + version: versionSelect, + channels: channelSelect, + } + const selectShape = withOptionalManifestSelect(baseSelect, includeManifest, manifestSelect) + + const baseQuery = drizzleClient + .select(selectShape) + .from(channelAlias) + .innerJoin(versionAlias, activeChannelVersionJoin(channelAlias.version, versionAlias)) + + const channel = (includeManifest + ? baseQuery.leftJoin(schema.manifest, eq(schema.manifest.app_version_id, versionAlias.id)) + : baseQuery) + .where(and( + eq(channelAlias.app_id, app_id), + eq(channelAlias.id, channelId), + )) + .groupBy(channelAlias.id, versionAlias.id) + .limit(1) + cloudlog({ requestId: c.get('requestId'), message: 'channel self override Query:', channelSelfOverrideQuery: channel.toSQL() }) + + return channel.then(data => data.at(0)) +} + export function requestInfosChannelPostgres( c: Context, platform: string, @@ -596,17 +632,22 @@ export function requestInfosPostgres( channelDeviceCount?: number | null, manifestBundleCount?: number | null, includeMetadata = false, + channelSelfOverrideChannelId?: number | null, ) { const shouldQueryChannelOverride = channelDeviceCount === undefined || channelDeviceCount === null ? true : channelDeviceCount > 0 const shouldFetchManifest = manifestBundleCount === undefined || manifestBundleCount === null ? true : manifestBundleCount > 0 + let channelDevice: ReturnType | ReturnType | Promise - const channelDevice = shouldQueryChannelOverride - ? requestInfosChannelDevicePostgres(c, app_id, device_id, drizzleClient, shouldFetchManifest, includeMetadata) - : Promise.resolve(undefined) - .then(() => { - cloudlog({ requestId: c.get('requestId'), message: 'Skipping channel device override query' }) - return null - }) + if (typeof channelSelfOverrideChannelId === 'number') { + channelDevice = requestInfosChannelByIdPostgres(c, app_id, channelSelfOverrideChannelId, drizzleClient, shouldFetchManifest, includeMetadata) + } + else if (shouldQueryChannelOverride) { + channelDevice = requestInfosChannelDevicePostgres(c, app_id, device_id, drizzleClient, shouldFetchManifest, includeMetadata) + } + else { + cloudlog({ requestId: c.get('requestId'), message: 'Skipping channel device override query' }) + channelDevice = Promise.resolve(null) + } const channel = requestInfosChannelPostgres(c, platform, app_id, defaultChannel, drizzleClient, shouldFetchManifest, includeMetadata) return Promise.all([channelDevice, channel]) @@ -816,14 +857,15 @@ export async function getChannelDeviceOverridePg( } } -export async function getChannelByNamePg( +async function getChannelByPg( c: Context, appId: string, - channelName: string, + channelFilter: SQL, drizzleClient: ReturnType, -): Promise<{ id: number, name: string, allow_device_self_set: boolean, public: boolean, owner_org: string } | null> { + logName: string, +): Promise { try { - const channel = await drizzleClient + return await drizzleClient .select({ id: schema.channels.id, name: schema.channels.name, @@ -834,18 +876,35 @@ export async function getChannelByNamePg( .from(schema.channels) .where(and( eq(schema.channels.app_id, appId), - eq(schema.channels.name, channelName), + channelFilter, )) .limit(1) .then(data => data[0]) - return channel } catch (e: unknown) { - logPgError(c, 'getChannelByNamePg', e) + logPgError(c, logName, e) return null } } +export async function getChannelByNamePg( + c: Context, + appId: string, + channelName: string, + drizzleClient: ReturnType, +): Promise { + return getChannelByPg(c, appId, eq(schema.channels.name, channelName), drizzleClient, 'getChannelByNamePg') +} + +export async function getChannelByIdPg( + c: Context, + appId: string, + channelId: number, + drizzleClient: ReturnType, +): Promise { + return getChannelByPg(c, appId, eq(schema.channels.id, channelId), drizzleClient, 'getChannelByIdPg') +} + export async function getMainChannelsPg( c: Context, appId: string, diff --git a/supabase/functions/_backend/utils/update.ts b/supabase/functions/_backend/utils/update.ts index ae7bb67775..6e255c927f 100644 --- a/supabase/functions/_backend/utils/update.ts +++ b/supabase/functions/_backend/utils/update.ts @@ -1,5 +1,7 @@ +import type { SemVer } from '@std/semver' import type { Context } from 'hono' import type { ManifestEntry } from './downloadUrl.ts' +import type { MiddlewareKeyVariables } from './hono.ts' import type { Database } from './supabase.types.ts' import type { AppInfos } from './types.ts' import { @@ -23,6 +25,10 @@ import { isUpdateEnumerationLimited, recordUpdateEnumerationMiss, updateEnumerat import { backgroundTask, BROTLI_MIN_UPDATER_VERSION_V5, BROTLI_MIN_UPDATER_VERSION_V6, BROTLI_MIN_UPDATER_VERSION_V7, fixSemver, isDeprecatedPluginVersion, isInternalVersionName } from './utils.ts' const PLAN_LIMIT: Array<'mau' | 'bandwidth' | 'storage'> = ['mau', 'bandwidth'] +const CHANNEL_SELF_STORE_MIN_V5 = '5.34.0' +const CHANNEL_SELF_STORE_MIN_V6 = '6.34.0' +const CHANNEL_SELF_STORE_MIN_V7 = '7.34.0' +const CHANNEL_SELF_STORE_MIN_V8 = '8.0.0' export type UpdateResponseKind = 'up_to_date' | 'blocked' | 'failed' @@ -93,6 +99,19 @@ export function resToVersion(plugin_version: string, signedURL: string, version: return res } +function hasChannelSelfStoreBinding(c: Context) { + return Boolean((c as Context).env?.CHANNEL_SELF_STORE) +} + +function usesLegacyChannelSelfStoreVersion(pluginVersion: SemVer) { + return isDeprecatedPluginVersion(pluginVersion, CHANNEL_SELF_STORE_MIN_V5, CHANNEL_SELF_STORE_MIN_V6, CHANNEL_SELF_STORE_MIN_V7, CHANNEL_SELF_STORE_MIN_V8) +} + +async function getStoredChannelSelfOverride(c: Context, appId: string, deviceId: string) { + const { getChannelSelfOverride } = await import('./channelSelfStore.ts') + return getChannelSelfOverride(c as Context, appId, deviceId) +} + export async function updateWithPG( c: Context, body: AppInfos, @@ -152,10 +171,15 @@ export async function updateWithPG( return c.json({ error: 'on_premise_app', message: 'On-premise app detected' }, 429) } await setAppStatus(c, app_id, 'cloud', appOwner.allow_device_custom_id) + const pluginVersion = parse(plugin_version) + const channelSelfStoreEnabled = hasChannelSelfStoreBinding(c) + const shouldUseChannelSelfStore = channelSelfStoreEnabled && usesLegacyChannelSelfStoreVersion(pluginVersion) + const channelSelfOverride = shouldUseChannelSelfStore + ? await getStoredChannelSelfOverride(c, app_id, device_id) + : null const channelDeviceCount = appOwner.channel_device_count ?? 0 const manifestBundleCount = appOwner.manifest_bundle_count ?? 0 - const bypassChannelOverrides = channelDeviceCount <= 0 - const pluginVersion = parse(plugin_version) + const bypassChannelOverrides = !channelSelfOverride && channelDeviceCount <= 0 // v5 is deprecated if < 5.10.0, v6 is deprecated if < 6.25.0, v7 is deprecated if < 7.25.0 const isDeprecated = isDeprecatedPluginVersion(pluginVersion) // Ensure there is manifest and the plugin version support manifest fetching (v5.10.0+, v6.25.0+, v7.0.35+) @@ -216,7 +240,7 @@ export async function updateWithPG( // Only query link/comment if plugin supports it (v5.35.0+, v6.35.0+, v7.35.0+, v8.35.0+) AND app has expose_metadata enabled const needsMetadata = appOwner.expose_metadata && !isDeprecatedPluginVersion(pluginVersion, '5.35.0', '6.35.0', '7.35.0', '8.35.0') - const requestedInto = await requestInfosPostgres(c, platform, app_id, device_id, defaultChannel, drizzleClient, channelDeviceCount, manifestBundleCount, needsMetadata) + const requestedInto = await requestInfosPostgres(c, platform, app_id, device_id, defaultChannel, drizzleClient, channelDeviceCount, manifestBundleCount, needsMetadata, channelSelfOverride?.channel_id.id) const { channelOverride } = requestedInto let { channelData } = requestedInto cloudlog({ requestId: c.get('requestId'), message: `channelData exists ? ${channelData !== undefined}, channelOverride exists ? ${channelOverride !== undefined}` }) diff --git a/tests/channel-self-pg-client.unit.test.ts b/tests/channel-self-pg-client.unit.test.ts index ce3360611b..3c2c399473 100644 --- a/tests/channel-self-pg-client.unit.test.ts +++ b/tests/channel-self-pg-client.unit.test.ts @@ -2,9 +2,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const getPgClientMock = vi.fn(() => ({ client: 'pg' })) const getDrizzleClientMock = vi.fn(() => ({})) +const deleteChannelDevicePgMock = vi.fn(() => Promise.resolve(true)) const getAppOwnerPostgresMock = vi.fn() +const getChannelByIdPgMock = vi.fn() +const getChannelByNamePgMock = vi.fn() const getChannelsPgMock = vi.fn() const getChannelDeviceOverridePgMock = vi.fn() +const upsertChannelDevicePgMock = vi.fn(() => Promise.resolve(true)) ;(globalThis as any).EdgeRuntime = undefined @@ -35,10 +39,11 @@ vi.mock('../supabase/functions/_backend/utils/org_email_notifications.ts', () => vi.mock('../supabase/functions/_backend/utils/pg.ts', () => ({ closeClient: vi.fn(() => Promise.resolve()), - deleteChannelDevicePg: vi.fn(() => Promise.resolve(true)), + deleteChannelDevicePg: deleteChannelDevicePgMock, getAppByIdPg: vi.fn(), getAppOwnerPostgres: getAppOwnerPostgresMock, - getChannelByNamePg: vi.fn(), + getChannelByIdPg: getChannelByIdPgMock, + getChannelByNamePg: getChannelByNamePgMock, getChannelDeviceOverridePg: getChannelDeviceOverridePgMock, getChannelsPg: getChannelsPgMock, getCompatibleChannelsPg: vi.fn(), @@ -46,7 +51,7 @@ vi.mock('../supabase/functions/_backend/utils/pg.ts', () => ({ getMainChannelsPg: vi.fn(), getPgClient: getPgClientMock, setReplicationLagHeader: vi.fn(() => Promise.resolve()), - upsertChannelDevicePg: vi.fn(() => Promise.resolve(true)), + upsertChannelDevicePg: upsertChannelDevicePgMock, })) vi.mock('../supabase/functions/_backend/utils/stats.ts', () => ({ @@ -66,13 +71,59 @@ function putBody(pluginVersion: string) { } } -async function fetchPut(pluginVersion: string) { +function createKvStore() { + const values = new Map() + return { + values, + get: vi.fn(async (key: string, options?: { type?: string }) => { + const value = values.get(key) + if (!value) + return null + return options?.type === 'json' ? JSON.parse(value) : value + }), + put: vi.fn(async (key: string, value: string) => { + values.set(key, value) + }), + delete: vi.fn(async (key: string) => { + values.delete(key) + }), + } +} + +function channelSelfStoreKey() { + return 'channel_self:v1:com.test.app:11111111-1111-4111-8111-111111111111' +} + +async function fetchPut(pluginVersion: string, env = {}) { const { app } = await import('../supabase/functions/_backend/plugins/channel_self.ts') return app.fetch(new Request('http://localhost/', { method: 'PUT', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(putBody(pluginVersion)), - }), {}, { waitUntil: () => { } } as any) + }), env, { waitUntil: () => { } } as any) +} + +async function fetchPost(pluginVersion: string, env = {}) { + const { app } = await import('../supabase/functions/_backend/plugins/channel_self.ts') + return app.fetch(new Request('http://localhost/', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + ...putBody(pluginVersion), + channel: 'beta', + }), + }), env, { waitUntil: () => { } } as any) +} + +async function fetchDelete(pluginVersion: string, env = {}) { + const { app } = await import('../supabase/functions/_backend/plugins/channel_self.ts') + const params = new URLSearchParams() + for (const [key, value] of Object.entries(putBody(pluginVersion))) { + params.set(key, String(value)) + } + return app.fetch(new Request(`http://localhost/?${params.toString()}`, { + method: 'DELETE', + }), env, { waitUntil: () => { } } as any) } describe('channel_self PUT database routing', () => { @@ -93,6 +144,20 @@ describe('channel_self PUT database routing', () => { }, ]) getChannelDeviceOverridePgMock.mockResolvedValue(null) + getChannelByNamePgMock.mockResolvedValue({ + id: 12, + name: 'beta', + allow_device_self_set: true, + public: false, + owner_org: 'test-org', + }) + getChannelByIdPgMock.mockResolvedValue({ + id: 12, + name: 'beta-current', + allow_device_self_set: true, + public: false, + owner_org: 'test-org', + }) }) it('uses primary for old plugin channel_devices read-after-write consistency', async () => { @@ -108,4 +173,78 @@ describe('channel_self PUT database routing', () => { expect(response.status).toBe(200) expect(getPgClientMock).toHaveBeenCalledWith(expect.anything(), true) }) + + it('uses replica and KV for old plugin reads when channel self store is bound', async () => { + const kv = createKvStore() + const response = await fetchPut('7.33.0', { CHANNEL_SELF_STORE: kv }) + + expect(response.status).toBe(200) + expect(getPgClientMock).toHaveBeenCalledWith(expect.anything(), true) + expect(getChannelDeviceOverridePgMock).not.toHaveBeenCalled() + expect(kv.get).toHaveBeenCalled() + }) + + it('resolves KV channel id through current channel metadata for old plugin reads', async () => { + const kv = createKvStore() + kv.values.set(channelSelfStoreKey(), JSON.stringify({ + app_id: 'com.test.app', + device_id: '11111111-1111-4111-8111-111111111111', + channel_id: 12, + channel_name: 'stale-name', + allow_device_self_set: false, + updated_at: '2026-01-01T00:00:00.000Z', + })) + + const response = await fetchPut('7.33.0', { CHANNEL_SELF_STORE: kv }) + + expect(response.status).toBe(200) + expect(await response.json()).toMatchObject({ + channel: 'beta-current', + status: 'override', + allowSet: true, + }) + expect(getChannelByIdPgMock).toHaveBeenCalledWith(expect.anything(), 'com.test.app', 12, expect.anything()) + }) + + it('uses replica and KV for old plugin writes when channel self store is bound', async () => { + const kv = createKvStore() + const response = await fetchPost('7.33.0', { CHANNEL_SELF_STORE: kv }) + + expect(response.status).toBe(200) + expect(getPgClientMock).toHaveBeenCalledWith(expect.anything(), true) + expect(upsertChannelDevicePgMock).not.toHaveBeenCalled() + expect(kv.put).toHaveBeenCalled() + expect(JSON.parse(kv.put.mock.calls[0][1])).toEqual({ + app_id: 'com.test.app', + device_id: '11111111-1111-4111-8111-111111111111', + channel_id: 12, + updated_at: expect.any(String), + }) + }) + + it('does not query KV for new plugin channel self storage when store is bound', async () => { + const putKv = createKvStore() + const putResponse = await fetchPut('7.34.0', { CHANNEL_SELF_STORE: putKv }) + + expect(putResponse.status).toBe(200) + expect(putKv.get).not.toHaveBeenCalled() + expect(putKv.put).not.toHaveBeenCalled() + expect(putKv.delete).not.toHaveBeenCalled() + + const postKv = createKvStore() + const postResponse = await fetchPost('7.34.0', { CHANNEL_SELF_STORE: postKv }) + + expect(postResponse.status).toBe(200) + expect(postKv.get).not.toHaveBeenCalled() + expect(postKv.put).not.toHaveBeenCalled() + expect(postKv.delete).not.toHaveBeenCalled() + + const deleteKv = createKvStore() + const deleteResponse = await fetchDelete('7.34.0', { CHANNEL_SELF_STORE: deleteKv }) + + expect(deleteResponse.status).toBe(200) + expect(deleteKv.get).not.toHaveBeenCalled() + expect(deleteKv.put).not.toHaveBeenCalled() + expect(deleteKv.delete).not.toHaveBeenCalled() + }) }) diff --git a/tests/channel_self.test.ts b/tests/channel_self.test.ts index 185d27a3ef..3ece55c909 100644 --- a/tests/channel_self.test.ts +++ b/tests/channel_self.test.ts @@ -1166,7 +1166,7 @@ describe('[POST] /channel_self - new plugin version (>= 7.34.0) behavior', () => } }) - it('should clean up old channel_devices entry when migrating from old to new version', async () => { + it('should leave old channel_devices entry untouched when migrating from old to new version', async () => { const deviceId = randomUUID() const data = getUniqueBaseData(APPNAME) data.device_id = deviceId @@ -1204,7 +1204,7 @@ describe('[POST] /channel_self - new plugin version (>= 7.34.0) behavior', () => expect(oldChannelDevice).toBeTruthy() - // Then, set channel with new version (should clean up old entry) + // Then, set channel with new version (should not read or write old server-side storage) data.plugin_version = '7.34.0' data.channel = 'development' @@ -1215,7 +1215,7 @@ describe('[POST] /channel_self - new plugin version (>= 7.34.0) behavior', () => expect(result.status).toBe('ok') expect(result.allowSet).toBe(true) - // Verify old entry was deleted + // Verify old entry was left untouched const { data: newChannelDevice } = await getSupabaseClient() .from('channel_devices') .select('*') @@ -1223,9 +1223,16 @@ describe('[POST] /channel_self - new plugin version (>= 7.34.0) behavior', () => .eq('app_id', APPNAME) .maybeSingle() - expect(newChannelDevice).toBeNull() + expect(newChannelDevice).toBeTruthy() + expect(newChannelDevice?.channel_id).toBe(oldChannelDevice?.channel_id) } finally { + await getSupabaseClient() + .from('channel_devices') + .delete() + .eq('device_id', deviceId) + .eq('app_id', APPNAME) + // Reset beta channel await getSupabaseClient() .from('channels') @@ -1274,7 +1281,7 @@ describe('[PUT] /channel_self - new plugin version (>= 7.34.0) behavior', () => }) describe('[DELETE] /channel_self - new plugin version (>= 7.34.0) behavior', () => { - it('should return success and clean up old channel_devices entries for new plugin versions', async () => { + it('should return success and leave old channel_devices entries untouched for new plugin versions', async () => { const deviceId = randomUUID() const data = getUniqueBaseData(APPNAME) data.device_id = deviceId @@ -1299,32 +1306,42 @@ describe('[DELETE] /channel_self - new plugin version (>= 7.34.0) behavior', () owner_org: productionChannel!.owner_org, }) - // Verify the old entry exists - const { data: beforeDelete } = await getSupabaseClient() - .from('channel_devices') - .select('*') - .eq('device_id', deviceId) - .eq('app_id', APPNAME) - .maybeSingle() + try { + // Verify the old entry exists + const { data: beforeDelete } = await getSupabaseClient() + .from('channel_devices') + .select('*') + .eq('device_id', deviceId) + .eq('app_id', APPNAME) + .maybeSingle() - expect(beforeDelete).toBeTruthy() + expect(beforeDelete).toBeTruthy() - // Call DELETE with new plugin version - const response = await fetchEndpoint('DELETE', data) - expect(response.status).toBe(200) + // Call DELETE with new plugin version + const response = await fetchEndpoint('DELETE', data) + expect(response.status).toBe(200) - const result = await response.json<{ status: string }>() - expect(result.status).toBe('ok') + const result = await response.json<{ status: string }>() + expect(result.status).toBe('ok') - // Verify the old entry was cleaned up - const { data: afterDelete } = await getSupabaseClient() - .from('channel_devices') - .select('*') - .eq('device_id', deviceId) - .eq('app_id', APPNAME) - .maybeSingle() + // Verify the old entry was left untouched + const { data: afterDelete } = await getSupabaseClient() + .from('channel_devices') + .select('*') + .eq('device_id', deviceId) + .eq('app_id', APPNAME) + .maybeSingle() - expect(afterDelete).toBeNull() + expect(afterDelete).toBeTruthy() + expect(afterDelete?.channel_id).toBe(productionChannel!.id) + } + finally { + await getSupabaseClient() + .from('channel_devices') + .delete() + .eq('device_id', deviceId) + .eq('app_id', APPNAME) + } }) it('should return success even when no old channel_devices entry exists', async () => { diff --git a/tests/update-channel-self-store.unit.test.ts b/tests/update-channel-self-store.unit.test.ts new file mode 100644 index 0000000000..6311a2bb83 --- /dev/null +++ b/tests/update-channel-self-store.unit.test.ts @@ -0,0 +1,150 @@ +import { Hono } from 'hono/tiny' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const getAppOwnerPostgresMock = vi.fn() +const requestInfosPostgresMock = vi.fn() +const getChannelSelfOverrideMock = vi.fn() + +;(globalThis as any).EdgeRuntime = undefined + +vi.mock('../supabase/functions/_backend/utils/appStatus.ts', () => ({ + getAppStatus: vi.fn(() => Promise.resolve({ status: null, allow_device_custom_id: true })), + setAppStatus: vi.fn(() => Promise.resolve()), +})) + +vi.mock('../supabase/functions/_backend/utils/channelSelfStore.ts', () => ({ + getChannelSelfOverride: getChannelSelfOverrideMock, + isChannelSelfStoreEnabled: vi.fn(() => true), +})) + +vi.mock('../supabase/functions/_backend/utils/downloadUrl.ts', () => ({ + getBundleUrl: vi.fn(), + getManifestUrl: vi.fn(), +})) + +vi.mock('../supabase/functions/_backend/utils/notifications.ts', () => ({ + sendNotifOrgCached: vi.fn(() => Promise.resolve()), +})) + +vi.mock('../supabase/functions/_backend/utils/pg.ts', () => ({ + closeClient: vi.fn(() => Promise.resolve()), + getAppOwnerPostgres: getAppOwnerPostgresMock, + getDrizzleClient: vi.fn(() => ({})), + getPgClient: vi.fn(() => ({ client: 'pg' })), + requestInfosPostgres: requestInfosPostgresMock, + setReplicationLagHeader: vi.fn(() => Promise.resolve()), +})) + +vi.mock('../supabase/functions/_backend/utils/s3.ts', () => ({ + s3: vi.fn(), +})) + +vi.mock('../supabase/functions/_backend/utils/stats.ts', () => ({ + createStatsBandwidth: vi.fn(() => Promise.resolve()), + createStatsMau: vi.fn(() => Promise.resolve()), + createStatsVersion: vi.fn(() => Promise.resolve()), + onPremStats: vi.fn(() => Promise.resolve(new Response('{}'))), + sendStatsAndDevice: vi.fn(() => Promise.resolve()), +})) + +describe('updates channel_self store override routing', () => { + beforeEach(() => { + vi.clearAllMocks() + getAppOwnerPostgresMock.mockResolvedValue({ + allow_device_custom_id: true, + channel_device_count: 12, + expose_metadata: false, + manifest_bundle_count: 0, + owner_org: 'test-org', + orgs: { management_email: 'owner@example.com' }, + plan_valid: true, + }) + getChannelSelfOverrideMock.mockResolvedValue({ + app_id: 'com.test.app', + device_id: '11111111-1111-4111-8111-111111111111', + channel_id: { + id: 42, + }, + }) + requestInfosPostgresMock.mockRejectedValue(new Error('stop-after-request-infos')) + }) + + it.concurrent('queries KV-backed channel_self override only for old plugin versions', async () => { + const { updateWithPG } = await import('../supabase/functions/_backend/utils/update.ts') + const app = new Hono() + const buildBody = (pluginVersion: string) => ({ + app_id: 'com.test.app', + device_id: '11111111-1111-4111-8111-111111111111', + platform: 'ios', + version_build: '1.0.0', + version_name: '1.0.0', + version_os: '17.0', + plugin_version: pluginVersion, + defaultChannel: '', + is_emulator: false, + is_prod: true, + }) + + app.get('/old', c => updateWithPG(c, buildBody('7.33.0'), {} as any)) + app.get('/old-missing-kv', c => updateWithPG(c, buildBody('7.33.0'), {} as any)) + app.get('/new', c => updateWithPG(c, buildBody('7.34.0'), {} as any)) + + const oldResponse = await app.fetch(new Request('http://localhost/old'), { CHANNEL_SELF_STORE: {} }, { waitUntil: () => { } } as any) + + expect(oldResponse.status).toBe(500) + expect(getChannelSelfOverrideMock).toHaveBeenCalledOnce() + + expect(requestInfosPostgresMock).toHaveBeenCalledWith( + expect.anything(), + 'ios', + 'com.test.app', + '11111111-1111-4111-8111-111111111111', + '', + expect.anything(), + 12, + 0, + false, + 42, + ) + + getChannelSelfOverrideMock.mockClear() + requestInfosPostgresMock.mockClear() + + const newResponse = await app.fetch(new Request('http://localhost/new'), { CHANNEL_SELF_STORE: {} }, { waitUntil: () => { } } as any) + + expect(newResponse.status).toBe(500) + expect(getChannelSelfOverrideMock).not.toHaveBeenCalled() + expect(requestInfosPostgresMock).toHaveBeenCalledWith( + expect.anything(), + 'ios', + 'com.test.app', + '11111111-1111-4111-8111-111111111111', + '', + expect.anything(), + 12, + 0, + false, + undefined, + ) + + getChannelSelfOverrideMock.mockResolvedValue(null) + requestInfosPostgresMock.mockClear() + + const oldMissingKvResponse = await app.fetch(new Request('http://localhost/old-missing-kv'), { CHANNEL_SELF_STORE: {} }, { waitUntil: () => { } } as any) + + expect(oldMissingKvResponse.status).toBe(500) + expect(getChannelSelfOverrideMock).toHaveBeenCalledOnce() + expect(requestInfosPostgresMock).toHaveBeenCalledWith( + expect.anything(), + 'ios', + 'com.test.app', + '11111111-1111-4111-8111-111111111111', + '', + expect.anything(), + 12, + 0, + false, + undefined, + ) + }) +})