-
Ideally there would be an API to control when each scheduled job is triggered. Currently the API only allows completely stopping the queue, which doesn't give us much control over when tasks are added and executed. Something like this could be added, for example:

```ts
myMethod(ctx: RequestContext) {
  const acfQueue = this.jobQueueService.getJobQueues().find(q => q.name === 'apply-collection-filters');
  acfQueue.stop(); // This stops registration of new jobs, but it also prevents execution of tasks

  // A method like this could be useful when doing imports
  acfQueue.pauseForContext(ctx);
  products.forEach(async productData => await this.productService.create(ctx, { ...productData }));
  acfQueue.resumeForContext(ctx);
}
```

We also want to be able to prevent the queue from taking new jobs while still being able to execute the ones already added:

```ts
onInit() {
  const acfQueue = this.jobQueueService.getJobQueues().find(q => q.name === 'apply-collection-filters');
  // Sleeping queues don't accept new tasks, but will still run
  // already-added ones (maybe a better name is needed)
  acfQueue.sleep();
}

addProducts(ctx: RequestContext) {
  products.forEach(async productData => await this.productService.create(ctx, { ...productData }));
  const acfQueue = this.jobQueueService.getJobQueues().find(q => q.name === 'apply-collection-filters');
  // This will "wake" the queue in order to add the job, then it will go back to "sleep"
  acfQueue.add({ ...data }, { wake: true });
}
```
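To make the proposed semantics concrete, here is a minimal self-contained sketch of how such a `sleep`/`wake` wrapper could behave. `InnerQueue`, `SleepableQueue`, and the defer-while-sleeping policy are all hypothetical illustrations for this proposal, not existing Vendure APIs:

```ts
// Hypothetical: the minimal surface of an underlying queue.
interface InnerQueue<T> {
  add(data: T): Promise<void>;
}

class SleepableQueue<T> {
  private sleeping = false;
  private deferred: T[] = [];

  constructor(private inner: InnerQueue<T>) {}

  /** Stop accepting new jobs; jobs already in the queue keep running. */
  sleep() {
    this.sleeping = true;
  }

  /** Accept new jobs again, enqueueing anything deferred in the meantime. */
  async wakeUp() {
    this.sleeping = false;
    for (const data of this.deferred.splice(0)) {
      await this.inner.add(data);
    }
  }

  /** `wake: true` bypasses the sleeping state for a single job. */
  async add(data: T, options?: { wake?: boolean }): Promise<void> {
    if (this.sleeping && !options?.wake) {
      this.deferred.push(data); // defer rather than drop; a design choice
      return;
    }
    return this.inner.add(data);
  }
}
```

Whether a sleeping queue should defer or silently drop new jobs is exactly the kind of semantic question the API design would need to settle.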
-
Thank you both @Izayda & @skid for your thoughts on this. Here are my initial thoughts:

If I understand you correctly, you are asking whether it is possible to calculate the Collections a given ProductVariant should be included in? This might be possible; I'll need to do some investigation.

So I see 2 kinda distinct issues/use-cases here:

1. Bulk operations (imports, mass updates) triggering a flood of redundant collection-filter and search-index jobs.
2. The more general need to control when queued jobs actually fire.

For the 1st issue, I agree with the idea of making it possible to turn off automatic triggering of these jobs during bulk operations. Ideally we should not need to run a full reindex after these tasks; that's overkill. So a possibility might be to have some kind of "buffer" for all modified Products/Variants. Once we finish all our changes, we "flush" this buffer, which will apply the collection filters and then update the search index for these affected products/variants. Re-reading @Izayda's suggestions, I think this workflow is very similar to what you are saying, but without the need to explicitly start a batch session and pass an ID around. It would need to work nicely with the Admin UI as well as programmatically.

For the 2nd issue, I think that @skid's suggestion is worth exploring too. The general ability to better control the firing of jobs could come in useful in many situations.

I'll spend a bit of time exploring these this week and note any further ideas or issues with what has been suggested.
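As a rough sketch of that buffer/flush workflow (all names here are hypothetical illustrations, not actual Vendure APIs): the buffer collects modified variant IDs, and flushing applies the collection filters before reindexing, touching each entity exactly once.

```ts
// Hypothetical sketch: collect modified variants, then do one consolidated pass.
class ModifiedEntityBuffer {
  private variantIds = new Set<string>();

  constructor(
    private applyCollectionFilters: (ids: string[]) => Promise<void>,
    private updateSearchIndex: (ids: string[]) => Promise<void>,
  ) {}

  /** Record a modified variant; the Set de-duplicates repeat modifications. */
  record(variantId: string) {
    this.variantIds.add(variantId);
  }

  /** Apply collection filters, then reindex each affected variant exactly once. */
  async flush() {
    const ids = [...this.variantIds];
    this.variantIds.clear();
    await this.applyCollectionFilters(ids);
    // Runs after the filters, so the index sees up-to-date collection data.
    await this.updateSearchIndex(ids);
  }
}
```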
-
Alright, I thought about this some more and here's a proposal: we introduce a new option to the DefaultSearchPlugin (the exact same solution can apply to the ElasticsearchPlugin too):

```ts
DefaultSearchPlugin.init({ useEventBatching: true }),
```

along with new GraphQL definitions:

```graphql
extend type Query {
  batchedSearchIndexEventCount: Int!
}

extend type Mutation {
  flushBatchedSearchIndexEvents: Boolean!
}
```

If we have `useEventBatching` set to `true`, the events that would normally create search index jobs get buffered instead. So running an import or just updating lots of products/variants/collections/assets will not trigger any jobs. All these events will just get buffered in the batch.

In the Admin UI, we can make a call to the `batchedSearchIndexEventCount` query to show how many events are pending, and expose a control for triggering the `flushBatchedSearchIndexEvents` mutation. When this mutation is triggered, the DefaultSearchPlugin will read all the batched events, and de-duplicate all the entities by ID so each is only re-indexed once. It would then run the resulting index-update work a single time for the whole batch.
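As a rough illustration of the de-duplication step the flush resolver could perform (the event shape and function name are assumptions made for this sketch, not the actual plugin internals):

```ts
// Assumed minimal shape of a buffered event, for illustration only.
interface BufferedIndexEvent {
  entityType: 'Product' | 'ProductVariant' | 'Collection';
  entityId: string;
}

function dedupeEvents(events: BufferedIndexEvent[]): BufferedIndexEvent[] {
  const seen = new Map<string, BufferedIndexEvent>();
  for (const event of events) {
    // Keying on type + id means each entity is re-indexed at most once,
    // no matter how many times it was touched during the bulk operation.
    seen.set(`${event.entityType}:${event.entityId}`, event);
  }
  return [...seen.values()];
}
```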
-
Thinking now of a solution which can sit transparently between the call to `JobQueue.add()` and the job actually being added to the queue:

```ts
/**
 * This class is used to control the buffering of jobs. It can be injected into your services.
 */
@Injectable()
export class JobQueueBuffer {
  private processors: JobQueueBufferProcessor[];

  addProcessor(processor: JobQueueBufferProcessor);
  removeProcessor(processor: JobQueueBufferProcessor);
  add(job: Job): Promise<Job>;
  bufferSize(forProcessors?: JobQueueBufferProcessor[]): Promise<number>;
  flush(forProcessors?: JobQueueBufferProcessor[]): Promise<void>;
}

// This is used to control which jobs get buffered, and how
// the buffered jobs then get batched before processing
export interface JobQueueBufferProcessor {
  collect(job: Job): Promise<boolean>;
  reduce(collectedJobs: Job[]): Promise<Job[]>;
}

// This defines how the buffered jobs get physically stored
export interface JobQueueBufferStorageStrategy {
  add(processorId: string, job: Job): Promise<Job>;
  bufferSize(processorIds?: string[]): Promise<number>;
  flush(processorIds?: string[]): Promise<void>;
}
```

Here's how it would work:
```ts
class SearchIndexBuffer implements JobQueueBufferProcessor {
  collect(job: Job) {
    return (
      job.queueName === 'apply-collection-filters' ||
      job.queueName === 'update-search-index'
    );
  }

  reduce(jobs: Job[]) {
    const updateSearchIndexJobs: Array<Job<UpdateIndexQueueJobData>> = jobs
      .filter(job => job.queueName === 'update-search-index');
    const updateVariantJobs = updateSearchIndexJobs.filter(job => job.data.type === 'update-variants-by-id');
    // Merge the variant ids of all the individual jobs into a single batched job
    const allVariantIds = unique(updateVariantJobs.reduce((ids, job) => [...ids, ...job.data.ids], []));
    const batchedUpdateVariantsJob = updateVariantJobs[0];
    batchedUpdateVariantsJob.data.ids = allVariantIds;
    // ...then logic for consolidating the other jobs in a similar way
    return consolidatedJobs;
  }
}

// in the DefaultSearchPlugin
onApplicationBootstrap() {
  this.jobQueueBuffer.addProcessor(new SearchIndexBuffer());
}
```
Here's another example that implements a pause/resume behaviour:

```ts
myMethod() {
  // MyProcessor is designed to collect all the jobs fired during product creation
  const myProcessor = new MyProcessor();
  this.jobQueueBuffer.addProcessor(myProcessor);

  products.forEach(async productData => await this.productService.create(ctx, { ...productData }));

  await this.jobQueueBuffer.flush([myProcessor]);
  this.jobQueueBuffer.removeProcessor(myProcessor);
}
```
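The `JobQueueBufferStorageStrategy` is only declared above, never shown with an implementation. Here is a minimal in-memory sketch against that proposed interface; a real implementation would presumably persist to the database so that buffered jobs survive a server restart:

```ts
// In-memory sketch of the proposed storage strategy, keyed by processor id.
class InMemoryBufferStorage implements JobQueueBufferStorageStrategy {
  private buffers = new Map<string, Job[]>();

  async add(processorId: string, job: Job): Promise<Job> {
    const buffer = this.buffers.get(processorId) ?? [];
    buffer.push(job);
    this.buffers.set(processorId, buffer);
    return job;
  }

  async bufferSize(processorIds?: string[]): Promise<number> {
    const ids = processorIds ?? [...this.buffers.keys()];
    return ids.reduce((sum, id) => sum + (this.buffers.get(id)?.length ?? 0), 0);
  }

  async flush(processorIds?: string[]): Promise<void> {
    const ids = processorIds ?? [...this.buffers.keys()];
    for (const id of ids) {
      // The JobQueueBuffer would first run each processor's reduce() over
      // these jobs and re-add the consolidated results to the real queues.
      this.buffers.delete(id);
    }
  }
}
```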
-
This thread became more interesting with each message :) The batching solution is great!
-
Currently, bulk import/update of products generates a huge amount of similar operations:

- `apply-collections-filters` job. Problems: one such job is queued per operation, so the queue fills up with near-identical jobs.
- `update-index-job`. Problems:
  - The update is needed at the `product variant` level, but is executed at the `product level`, because each job gets the product for the variant and reindexes all variants for that product. So there will be many equal jobs.
  - It can run before the `apply-collection-job`, so outdated collections data can be passed to the index.

The aim of this discussion: to determine the issues that should be resolved to eliminate these problems.
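To make the granularity problem concrete, here is a schematic sketch (the function and data shapes are illustrative, not actual Vendure internals) of how updating three variants of one product yields three identical product-level reindex jobs:

```ts
// Schematic only: each variant update enqueues a job keyed by the parent
// product, and each such job reindexes ALL variants of that product.
declare function enqueueUpdateIndexJob(productId: string): void;

const variants = [
  { id: 'v1', productId: 'p1' },
  { id: 'v2', productId: 'p1' },
  { id: 'v3', productId: 'p1' },
];

for (const variant of variants) {
  // Three equal jobs for product p1; each one reindexes v1, v2 and v3,
  // so the same work is done three times over.
  enqueueUpdateIndexJob(variant.productId);
}
```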
Original discussion in Slack by @skid: https://vendure-ecommerce.slack.com/archives/CKYMF0ZTJ/p1632747675049800
There are probably several possible ways to tackle this. Is it possible to implement the third way (to get the collection filters to update from the updated data of a variant)? If yes, I think a combination of the 2nd and 3rd ways can be implemented. If no, only the second one.

@michaelbromley, we need your input and decision.