Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Try to recover locks after migrations on Apify #2883

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { BatchAddRequestsResult, Dictionary } from '@crawlee/types';
import type { BatchAddRequestsResult, Dictionary, RequestQueueHeadItem } from '@crawlee/types';

import { checkStorageAccess } from './access_checking';
import type {
Expand All @@ -8,7 +8,7 @@
} from './request_provider';
import { RequestProvider } from './request_provider';
import { getRequestId } from './utils';
import { Configuration } from '../configuration';
import { Configuration, ConfigurationOptions } from '../configuration';

Check failure on line 11 in packages/core/src/storages/request_queue_v2.ts

View workflow job for this annotation

GitHub Actions / Lint

Imports "ConfigurationOptions" are only used as type
import { EventType } from '../events';
import type { Request, Source } from '../request';

Expand Down Expand Up @@ -61,6 +61,8 @@
private queueHasLockedRequests: boolean | undefined = undefined;
private shouldCheckForForefrontRequests = false;
private dequeuedRequestCount = 0;
private initialized = false;
private runId?: string;

constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) {
super(
Expand All @@ -73,6 +75,9 @@
config,
);

// HACK - runId is only used when running on Apify, and in that case, the Apify SDK will monkeypatch the Configuration class to provide it
this.runId = config.get('actorRunId' as keyof ConfigurationOptions, undefined as string | undefined);

const eventManager = config.getEventManager();

eventManager.on(EventType.MIGRATING, async () => {
Expand Down Expand Up @@ -537,10 +542,45 @@
}
}

private async initialize(): Promise<void> {
// When opening this particular storage for the first time with the Apify request queue implementation,
// check for requests locked by the same run and try to re-lock and enqueue them locally.
// This prevents us from waiting for locks made by the same run (e.g., before a migration) to time out.

if (!this.initialized && this.runId !== undefined) {
this.initialized = true;

let items: RequestQueueHeadItem[] = [];

try {
const head = await this.client.listHead({ limit: 1_000_000, lockedByRunId: this.runId }); // Might be overkill, but who knows
items = head.items;
} catch {
this.log.warning(
'Could not check for locked requests from our run - make sure to update the apify-client package',
);
return;
}

for (const item of items) {
const lockResult = await this._prolongRequestLock(item.id);
if (lockResult !== null) {
this.queueHeadIds.add(item.id, item.id, false);
}
}

this.log.info(
`Found ${items.length} requests that the current run locked in the past and recovered ${this.queueHeadIds.length()} of those`,
);
}
}

/**
* @inheritDoc
*/
static override async open(...args: Parameters<typeof RequestProvider.open>): Promise<RequestQueue> {
return super.open(...args) as Promise<RequestQueue>;
const rq = (await super.open(...args)) as RequestQueue;
await rq.initialize();
return rq;
}
}
6 changes: 6 additions & 0 deletions packages/types/src/storages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ export interface ListOptions {
* @default 100
*/
limit?: number;

locked?: boolean;

pending?: boolean;

lockedByRunId?: string;
}

export interface ListAndLockOptions extends ListOptions {
Expand Down
7 changes: 7 additions & 0 deletions test/core/storages/request_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1009,4 +1009,11 @@ describe('RequestQueue v2', () => {

expect(retrievedUrls.map((x) => new URL(x).pathname)).toEqual(Array.from({ length: 5 }, (_, i) => `/${i + 1}`));
});

test('repeated `open` calls should return the same instance', async () => {
const queue1 = await RequestQueueV2.open('repeated-open-call');
const queue2 = await RequestQueueV2.open('repeated-open-call');

expect(queue1).toBe(queue2);
});
});
Loading