11/* eslint-disable no-console */
22/* eslint-disable new-cap */
33/* eslint-disable camelcase */
4+ const path = require ( 'path' ) ;
5+ const { existsSync } = require ( 'fs' ) ;
46const chalk = require ( 'chalk' ) ;
5- const { cliux } = require ( '@contentstack/cli-utilities' ) ;
7+ const { cliux, FsUtility } = require ( '@contentstack/cli-utilities' ) ;
68const { getQueue } = require ( '../util/queue' ) ;
79const { performBulkPublish, publishAsset, initializeLogger } = require ( '../consumer/publish' ) ;
810const retryFailedLogs = require ( '../util/retryfailed' ) ;
911const { validateFile } = require ( '../util/fs' ) ;
1012const { isEmpty } = require ( '../util' ) ;
1113const { fetchBulkPublishLimit } = require ( '../util/common-utility' ) ;
1214const { generateBulkPublishStatusUrl } = require ( '../util/generate-bulk-publish-url' ) ;
13- const { resolveInQueueAssets, ASSET_SCAN_STATUS } = require ( '../util/asset-scan' ) ;
15+ const { resolveInQueueAssets, fetchScanStatusBatch , ASSET_SCAN_STATUS } = require ( '../util/asset-scan' ) ;
1416
1517const queue = getQueue ( ) ;
1618let logFileName ;
@@ -227,6 +229,216 @@ async function processPendingAssets(pendingItems, stack, bulkPublish, environmen
227229 }
228230}
229231
232+ /**
233+ * Publish assets from an import backup directory (post-import flow).
234+ *
235+ * Unlike getAssets (live folder scan), this drives publishing from the backup:
236+ * each imported asset is published ONLY to the environments/locales it was
237+ * published to in the source stack (from its publish_details), remapped to the
238+ * target stack. Scan-status gating is applied to the target asset UIDs.
239+ *
240+ * Mirrors the publish_details/env-name resolution of contentstack-import's
241+ * assets `publish()` (the bulk publish API resolves environment NAMES against
242+ * the target stack, and import preserves env names, so source name == target
243+ * name), and adds the clean/quarantined/in-queue scan gating that import skips.
244+ *
245+ * Source of truth split:
246+ * - publish_details + environments come from the BACKUP (post-import flow): an
247+ * asset's target environments are its source publish_details, gated by the
248+ * environment uid-mapping (only environments actually imported into the target
249+ * are publishable) — avoids doomed publish calls to envs never created there.
250+ * - scan status comes from the LIVE target API (it is a runtime property of the
251+ * freshly-imported assets and cannot exist in the backup).
252+ *
253+ * Streaming: asset chunks are processed and released one at a time and scan-gated
254+ * per chunk, so memory does not scale with total asset count. The only structures
255+ * retained across chunks are bounded (partial publish batches + the in-queue
256+ * subset). The single in-memory floor is the asset uid-mapping file itself (same
257+ * as import's publish()); for very large stacks raise Node's --max-old-space-size.
258+ */
259+ async function getAssetsFromBackup ( stack , dataDir , bulkPublish , apiVersion , bulkPublishLimit ) {
260+ const assetsPath = path . join ( dataDir , 'assets' ) ;
261+ const assetsIndexPath = path . join ( assetsPath , 'assets.json' ) ;
262+ const assetUidMapperPath = path . join ( dataDir , 'mapper' , 'assets' , 'uid-mapping.json' ) ;
263+ const envUidMapperPath = path . join ( dataDir , 'mapper' , 'environments' , 'uid-mapping.json' ) ;
264+ const environmentsPath = path . join ( dataDir , 'environments' , 'environments.json' ) ;
265+
266+ // Fail fast with actionable errors when the backup is incomplete.
267+ if ( ! existsSync ( assetsPath ) || ! existsSync ( assetsIndexPath ) ) {
268+ throw new Error ( `No assets found in backup. Expected '${ assetsIndexPath } '. Check the --data-dir path.` ) ;
269+ }
270+ if ( ! existsSync ( assetUidMapperPath ) ) {
271+ throw new Error (
272+ `Asset UID mapping not found at '${ assetUidMapperPath } '. Run import against this data dir before publishing.` ,
273+ ) ;
274+ }
275+ if ( ! existsSync ( environmentsPath ) ) {
276+ throw new Error ( `Environments not found at '${ environmentsPath } '. Cannot resolve target environments.` ) ;
277+ }
278+
279+ const fsUtil = new FsUtility ( { basePath : assetsPath , indexFileName : 'assets.json' } ) ;
280+ const assetUidMap = fsUtil . readFile ( assetUidMapperPath , true ) || { } ;
281+ // environments.json: { [sourceEnvUid]: { name, ... } } — source env definitions.
282+ const environments = fsUtil . readFile ( environmentsPath , true ) || { } ;
283+ // uid-mapping.json: { [sourceEnvUid]: targetEnvUid } — only environments actually
284+ // imported into the target. Used as the "is publishable" gate. Optional: older
285+ // backups (or runs with no imported environments) may not have it.
286+ const envUidMapping = existsSync ( envUidMapperPath ) ? fsUtil . readFile ( envUidMapperPath , true ) || { } : null ;
287+ if ( ! envUidMapping ) {
288+ console . log (
289+ chalk . yellow (
290+ `Environment UID mapping not found at '${ envUidMapperPath } '. Falling back to environment names from ` +
291+ `environments.json — ensure the target stack has environments with matching names.` ,
292+ ) ,
293+ ) ;
294+ }
295+ const isEnvImported = ( sourceEnvUid ) =>
296+ ! envUidMapping || Object . prototype . hasOwnProperty . call ( envUidMapping , sourceEnvUid ) ;
297+
298+ // Resolve an asset's deduped, env-gated target (envName, locale) pairs from its
299+ // publish_details. Env name comes from the backup; only environments actually
300+ // imported into the target (per the env uid-mapping) are publishable.
301+ const resolvePairs = ( asset ) => {
302+ const seen = new Set ( ) ;
303+ const pairs = [ ] ;
304+ for ( const pd of asset . publish_details ) {
305+ const env = environments [ pd . environment ] ;
306+ if ( ! env || ! env . name ) continue ; // env not in the data dir — cannot resolve a name
307+ if ( ! isEnvImported ( pd . environment ) ) continue ; // env not imported into target — skip
308+ const key = `${ env . name } ||${ pd . locale } ` ;
309+ if ( seen . has ( key ) ) continue ;
310+ seen . add ( key ) ;
311+ pairs . push ( { envName : env . name , locale : pd . locale } ) ;
312+ }
313+ return pairs ;
314+ } ;
315+
316+ // Bounded cross-chunk state only — nothing scales with total asset count:
317+ // - `buffers`: partial publish batches, capped at envCount x localeCount x bulkPublishLimit.
318+ // - `pending`: the in-queue (scanning) subset awaiting retry.
319+ // The full asset universe is never held in memory; chunks are processed and
320+ // released one at a time (same streaming shape as contentstack-import's publish()).
321+ const buffers = new Map ( ) ; // "envName||locale" -> { envName, locale, uids: [] }
322+ const pending = [ ] ; // { targetUid, pairs } for assets whose scan is still in queue
323+ let skippedNoUidMapping = 0 ; // source asset was not imported (no asset uid mapping)
324+ let skippedNoMappableEnv = 0 ; // asset has publish details, but none of its envs were imported
325+ let publishableAssets = 0 ; // assets enqueued for publish (across all env/locale pairs)
326+
327+ const enqueueBatch = async ( envName , locale , uids ) => {
328+ if ( uids . length === 0 ) return ;
329+ if ( bulkPublish ) {
330+ const assets = uids . map ( ( uid ) => ( { uid, locale } ) ) ;
331+ await queue . Enqueue ( { assets, Type : 'asset' , environments : [ envName ] , locale, stack, apiVersion } ) ;
332+ } else {
333+ for ( const uid of uids ) {
334+ await queue . Enqueue ( { assetUid : uid , environments : [ envName ] , Type : 'asset' , locale, stack } ) ;
335+ }
336+ }
337+ } ;
338+
339+ // Add a publishable asset to its (env, locale) buffers, flushing any that fill up.
340+ // Grouping by the exact pair keeps the bulk API from publishing an asset to a
341+ // combo it was not published to in source.
342+ const bufferAsset = async ( targetUid , pairs ) => {
343+ publishableAssets ++ ;
344+ for ( const { envName, locale } of pairs ) {
345+ const key = `${ envName } ||${ locale } ` ;
346+ let buf = buffers . get ( key ) ;
347+ if ( ! buf ) {
348+ buf = { envName, locale, uids : [ ] } ;
349+ buffers . set ( key , buf ) ;
350+ }
351+ buf . uids . push ( targetUid ) ;
352+ if ( buf . uids . length >= bulkPublishLimit ) {
353+ await enqueueBatch ( envName , locale , buf . uids ) ;
354+ buf . uids = [ ] ;
355+ }
356+ }
357+ } ;
358+
359+ const indexer = fsUtil . indexFileContent ;
360+
361+ // NOTE: one readChunkFiles.next() call per index entry — the iteration count must
362+ // equal the number of chunk files (same contract as contentstack-import's publish()).
363+ for ( const _index in indexer ) {
364+ const chunk = await fsUtil . readChunkFiles . next ( ) ;
365+ const assetsArr = Object . values ( chunk || { } ) ;
366+
367+ // Resolve this chunk's assets to publish targets (bounded by chunk size).
368+ const resolved = [ ] ;
369+ for ( const asset of assetsArr ) {
370+ if ( ! asset || ! Array . isArray ( asset . publish_details ) || asset . publish_details . length === 0 ) {
371+ continue ;
372+ }
373+ const targetUid = assetUidMap [ asset . uid ] ;
374+ if ( ! targetUid ) {
375+ skippedNoUidMapping ++ ;
376+ continue ;
377+ }
378+ const pairs = resolvePairs ( asset ) ;
379+ if ( pairs . length === 0 ) {
380+ skippedNoMappableEnv ++ ;
381+ continue ;
382+ }
383+ resolved . push ( { targetUid, pairs } ) ;
384+ }
385+ if ( resolved . length === 0 ) continue ;
386+
387+ // Scan status is a target-stack property of the freshly-imported assets, so it
388+ // is fetched live (one batched read per chunk) — it is not in the backup.
389+ const statusMap = await fetchScanStatusBatch (
390+ stack ,
391+ resolved . map ( ( r ) => r . targetUid ) ,
392+ ) ;
393+
394+ for ( const { targetUid, pairs } of resolved ) {
395+ const status = statusMap . get ( targetUid ) ;
396+ if ( status === ASSET_SCAN_STATUS . QUARANTINE ) {
397+ scanSummary . quarantined ++ ;
398+ console . log ( chalk . yellow ( `Skipped (quarantined): Asset UID '${ targetUid } '` ) ) ;
399+ } else if ( status === ASSET_SCAN_STATUS . IN_QUEUE ) {
400+ scanSummary . inQueue ++ ;
401+ pending . push ( { targetUid, pairs } ) ;
402+ } else {
403+ if ( status === ASSET_SCAN_STATUS . READY ) scanSummary . clean ++ ;
404+ else scanSummary . noStatus ++ ;
405+ await bufferAsset ( targetUid , pairs ) ;
406+ }
407+ }
408+ }
409+
410+ // Resolve in-queue assets once (incremental backoff); publish those that turn clean.
411+ if ( pending . length > 0 ) {
412+ const resolvedUids = await resolveInQueueAssets (
413+ stack ,
414+ pending . map ( ( p ) => p . targetUid ) ,
415+ ) ;
416+ const resolvedSet = new Set ( resolvedUids ) ;
417+ for ( const { targetUid, pairs } of pending ) {
418+ if ( resolvedSet . has ( targetUid ) ) await bufferAsset ( targetUid , pairs ) ;
419+ }
420+ }
421+
422+ // Flush remaining partial (env, locale) batches.
423+ for ( const { envName, locale, uids } of buffers . values ( ) ) {
424+ await enqueueBatch ( envName , locale , uids ) ;
425+ }
426+
427+ if ( skippedNoUidMapping > 0 ) {
428+ console . log ( chalk . yellow ( `Skipped ${ skippedNoUidMapping } asset(s): no UID mapping (not imported into target).` ) ) ;
429+ }
430+ if ( skippedNoMappableEnv > 0 ) {
431+ console . log (
432+ chalk . yellow (
433+ `Skipped ${ skippedNoMappableEnv } asset(s): none of their published environments were imported into the target.` ,
434+ ) ,
435+ ) ;
436+ }
437+ if ( publishableAssets === 0 ) {
438+ console . log ( chalk . yellow ( 'No publishable assets found in backup (no mapped assets with publishable environments).' ) ) ;
439+ }
440+ }
441+
230442function setConfig ( conf , bp ) {
231443 if ( bp ) {
232444 queue . consumer = performBulkPublish ;
@@ -242,7 +454,7 @@ function setConfig(conf, bp) {
242454 scanSummary = { clean : 0 , quarantined : 0 , inQueue : 0 , noStatus : 0 } ;
243455}
244456
245- async function start ( { retryFailed, bulkPublish, environments, folderUid, locales, apiVersion } , stack , config ) {
457+ async function start ( { retryFailed, bulkPublish, environments, folderUid, locales, apiVersion, dataDir } , stack , config ) {
246458 process . on ( 'beforeExit' , async ( ) => {
247459 const isErrorLogEmpty = await isEmpty ( `${ filePath } .error` ) ;
248460 const isSuccessLogEmpty = await isEmpty ( `${ filePath } .success` ) ;
@@ -280,6 +492,13 @@ async function start({ retryFailed, bulkPublish, environments, folderUid, locale
280492 } else {
281493 await retryFailedLogs ( retryFailed , { assetQueue : queue } , 'publish' ) ;
282494 }
495+ } else if ( dataDir ) {
496+ // Post-import flow: publish each imported asset only to its original
497+ // environments/locales (from backup publish_details), scan-gated.
498+ setConfig ( config , bulkPublish ) ;
499+ const bulkPublishLimit = fetchBulkPublishLimit ( stack ?. org_uid ) ;
500+ await getAssetsFromBackup ( stack , dataDir , bulkPublish , apiVersion , bulkPublishLimit ) ;
501+ printScanSummary ( scanSummary ) ;
283502 } else if ( folderUid ) {
284503 setConfig ( config , bulkPublish ) ;
285504 const bulkPublishLimit = fetchBulkPublishLimit ( stack ?. org_uid ) ;
@@ -299,6 +518,7 @@ async function start({ retryFailed, bulkPublish, environments, folderUid, locale
299518
300519module . exports = {
301520 getAssets,
521+ getAssetsFromBackup,
302522 setConfig,
303523 start,
304524 processPendingAssets,
0 commit comments