Skip to content

Commit 1b6fb69

Browse files
author
naman-contentstack
committed
feat: add asset scanning support in bulk publish v1
1 parent cbffcfa commit 1b6fb69

4 files changed

Lines changed: 434 additions & 20 deletions

File tree

packages/contentstack-bulk-publish/.mocharc.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
"test/unit/commands/assets/unpublish.test.js",
66
"test/unit/commands/bulk-publish/cross-publish.test.js",
77
"test/unit/commands/entries/publish.test.js",
8-
"test/unit/commands/entries/unpublish.test.js"
8+
"test/unit/commands/entries/unpublish.test.js",
9+
"test/unit/util/asset-scan.test.js"
910
],
1011
"reporter": "dot",
1112
"timeout": 60000,

packages/contentstack-bulk-publish/src/producer/publish-assets.js

Lines changed: 149 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* eslint-disable no-console */
22
/* eslint-disable new-cap */
33
/* eslint-disable camelcase */
4+
const chalk = require('chalk');
45
const { cliux } = require('@contentstack/cli-utilities');
56
const { getQueue } = require('../util/queue');
67
const { performBulkPublish, publishAsset, initializeLogger } = require('../consumer/publish');
@@ -9,14 +10,27 @@ const { validateFile } = require('../util/fs');
910
const { isEmpty } = require('../util');
1011
const { fetchBulkPublishLimit } = require('../util/common-utility');
1112
const { generateBulkPublishStatusUrl } = require('../util/generate-bulk-publish-url');
13+
const { resolveInQueueAssets, ASSET_SCAN_STATUS } = require('../util/asset-scan');
1214

1315
const queue = getQueue();
1416
let logFileName;
1517
let bulkPublishSet = [];
18+
let pendingAssetsForRetry = [];
19+
let scanSummary = { clean: 0, quarantined: 0, inQueue: 0, noStatus: 0 };
1620
let filePath;
1721

1822
/* eslint-disable no-param-reassign */
1923

24+
function printScanSummary({ clean, noStatus, inQueue, quarantined }) {
25+
const total = clean + noStatus + inQueue + quarantined;
26+
if (total === 0) return;
27+
console.log(chalk.bold(`\nAsset scan summary (${total} total):`));
28+
console.log(chalk.green(` ✓ Clean (publishing): ${clean}`));
29+
if (noStatus > 0) console.log(chalk.green(` ✓ No scan status (publishing): ${noStatus}`));
30+
if (inQueue > 0) console.log(chalk.yellow(` ⧖ In queue (retrying): ${inQueue}`));
31+
if (quarantined > 0) console.log(chalk.red(` ✗ Quarantined (skipped): ${quarantined}`));
32+
}
33+
2034
async function getAssets(stack, folder, bulkPublish, environments, locale, apiVersion, bulkPublishLimit, skip = 0) {
2135
return new Promise((resolve, reject) => {
2236
let queryParams = {
@@ -25,6 +39,7 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
2539
include_count: true,
2640
include_folders: true,
2741
include_publish_details: true,
42+
include_asset_scan_status: true,
2843
};
2944
stack
3045
.asset()
@@ -34,7 +49,8 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
3449
if (assetResponse && assetResponse.items.length > 0) {
3550
skip += assetResponse.items.length;
3651
let assets = assetResponse.items;
37-
for (let index = 0; index < assetResponse.items.length; index++) {
52+
53+
for (let index = 0; index < assets.length; index++) {
3854
if (assets[index].is_dir === true) {
3955
await getAssets(
4056
stack,
@@ -48,6 +64,35 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
4864
);
4965
continue;
5066
}
67+
68+
const scanStatus = assets[index]._asset_scan_status;
69+
70+
// Quarantined assets are skipped permanently
71+
if (scanStatus === ASSET_SCAN_STATUS.QUARANTINE) {
72+
scanSummary.quarantined++;
73+
console.log(chalk.yellow(`Skipped (quarantined): Asset UID '${assets[index].uid}'`));
74+
continue;
75+
}
76+
77+
// In-queue assets are deferred for retry after all pages are processed
78+
if (scanStatus === ASSET_SCAN_STATUS.IN_QUEUE) {
79+
scanSummary.inQueue++;
80+
pendingAssetsForRetry.push({
81+
uid: assets[index].uid,
82+
locale,
83+
publish_details: assets[index].publish_details || [],
84+
environments,
85+
});
86+
continue;
87+
}
88+
89+
// Ready (clean) or no scan status — enqueue for publish
90+
if (scanStatus === ASSET_SCAN_STATUS.READY) {
91+
scanSummary.clean++;
92+
} else {
93+
scanSummary.noStatus++;
94+
}
95+
5196
if (bulkPublish) {
5297
if (bulkPublishSet.length < bulkPublishLimit) {
5398
bulkPublishSet.push({
@@ -67,22 +112,6 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
67112
});
68113
bulkPublishSet = [];
69114
}
70-
71-
if (
72-
assetResponse.items.length - 1 === index &&
73-
bulkPublishSet.length > 0 &&
74-
bulkPublishSet.length < bulkPublishLimit
75-
) {
76-
await queue.Enqueue({
77-
assets: bulkPublishSet,
78-
Type: 'asset',
79-
environments: environments,
80-
locale,
81-
stack: stack,
82-
apiVersion,
83-
});
84-
bulkPublishSet = [];
85-
}
86115
} else {
87116
await queue.Enqueue({
88117
assetUid: assets[index].uid,
@@ -94,6 +123,23 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
94123
});
95124
}
96125
}
126+
127+
// Flush any partial bulk batch at the end of the page.
128+
// Done outside the for-loop so it fires correctly even when some assets
129+
// were skipped (quarantined/in-queue) and the last non-skipped asset is
130+
// not at the final array index.
131+
if (bulkPublish && bulkPublishSet.length > 0) {
132+
await queue.Enqueue({
133+
assets: bulkPublishSet,
134+
Type: 'asset',
135+
environments: environments,
136+
locale,
137+
stack: stack,
138+
apiVersion,
139+
});
140+
bulkPublishSet = [];
141+
}
142+
97143
if (skip === assetResponse.count) {
98144
return resolve(true);
99145
}
@@ -109,6 +155,78 @@ async function getAssets(stack, folder, bulkPublish, environments, locale, apiVe
109155
});
110156
}
111157

158+
/**
159+
* After all pages/locales are scanned, retry any assets that were in-queue.
160+
* Takes pendingItems explicitly — does not read from module-level state.
161+
* Uses incremental backoff (see asset-scan.js SCAN_RETRY config).
162+
*/
163+
async function processPendingAssets(pendingItems, stack, bulkPublish, environments, apiVersion, bulkPublishLimit) {
164+
if (pendingItems.length === 0) return;
165+
166+
// Deduplicate UIDs across locales — scan status is per-asset, not per-locale.
167+
// Resolving once avoids redundant retry loops for multi-locale runs.
168+
const allUids = [...new Set(pendingItems.map((a) => a.uid))];
169+
const resolvedUids = await resolveInQueueAssets(stack, allUids);
170+
171+
if (resolvedUids.length === 0) {
172+
console.log(chalk.yellow('No in-queue assets resolved after retries.'));
173+
return;
174+
}
175+
176+
const resolvedSet = new Set(resolvedUids);
177+
178+
// Group resolved items by locale for correct enqueue context
179+
const byLocale = {};
180+
for (const item of pendingItems) {
181+
if (!resolvedSet.has(item.uid)) continue;
182+
if (!byLocale[item.locale]) byLocale[item.locale] = [];
183+
byLocale[item.locale].push(item);
184+
}
185+
186+
for (const locale of Object.keys(byLocale)) {
187+
const resolvedItems = byLocale[locale];
188+
189+
if (bulkPublish) {
190+
let batchSet = [];
191+
for (const item of resolvedItems) {
192+
batchSet.push({ uid: item.uid, locale, publish_details: item.publish_details });
193+
if (batchSet.length === bulkPublishLimit) {
194+
await queue.Enqueue({
195+
assets: batchSet,
196+
Type: 'asset',
197+
environments,
198+
locale,
199+
stack,
200+
apiVersion,
201+
});
202+
batchSet = [];
203+
}
204+
}
205+
if (batchSet.length > 0) {
206+
await queue.Enqueue({
207+
assets: batchSet,
208+
Type: 'asset',
209+
environments,
210+
locale,
211+
stack,
212+
apiVersion,
213+
});
214+
}
215+
} else {
216+
for (const item of resolvedItems) {
217+
await queue.Enqueue({
218+
assetUid: item.uid,
219+
publish_details: item.publish_details,
220+
environments,
221+
Type: 'asset',
222+
locale,
223+
stack,
224+
});
225+
}
226+
}
227+
}
228+
}
229+
112230
function setConfig(conf, bp) {
113231
if (bp) {
114232
queue.consumer = performBulkPublish;
@@ -120,6 +238,8 @@ function setConfig(conf, bp) {
120238
config = conf;
121239
queue.config = conf;
122240
filePath = initializeLogger(logFileName);
241+
pendingAssetsForRetry = [];
242+
scanSummary = { clean: 0, quarantined: 0, inQueue: 0, noStatus: 0 };
123243
}
124244

125245
async function start({ retryFailed, bulkPublish, environments, folderUid, locales, apiVersion }, stack, config) {
@@ -131,7 +251,7 @@ async function start({ retryFailed, bulkPublish, environments, folderUid, locale
131251
} else if (!isSuccessLogEmpty) {
132252
console.log(`The success log for this session is stored at ${filePath}.success`);
133253
}
134-
254+
135255
// Generate and display the bulk publish status link
136256
if (bulkPublish && stack && config) {
137257
const statusUrl = generateBulkPublishStatusUrl(stack, config);
@@ -142,11 +262,12 @@ async function start({ retryFailed, bulkPublish, environments, folderUid, locale
142262
process.stdout.write('\n');
143263
}
144264
}
145-
265+
146266
process.exit(0);
147267
});
148268

149269
if (retryFailed) {
270+
console.log(chalk.yellow('Note: --retry-failed replays from log and skips asset scan status checks.'));
150271
if (!validateFile(retryFailed, ['publish-assets', 'bulk-publish-assets'])) {
151272
return false;
152273
}
@@ -165,11 +286,20 @@ async function start({ retryFailed, bulkPublish, environments, folderUid, locale
165286
for (const locale of locales) {
166287
await getAssets(stack, folderUid, bulkPublish, environments, locale, apiVersion, bulkPublishLimit);
167288
}
289+
290+
printScanSummary(scanSummary);
291+
292+
// Resolve in-queue assets with incremental retry; pass pendingAssetsForRetry explicitly
293+
if (pendingAssetsForRetry.length > 0) {
294+
await processPendingAssets(pendingAssetsForRetry, stack, bulkPublish, environments, apiVersion, bulkPublishLimit);
295+
pendingAssetsForRetry = [];
296+
}
168297
}
169298
}
170299

171300
module.exports = {
172301
getAssets,
173302
setConfig,
174303
start,
304+
processPendingAssets,
175305
};

0 commit comments

Comments
 (0)