From e1ba768090b060a99b466dfce57f25d4ba319e5e Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Mon, 10 Feb 2025 00:32:50 -0800
Subject: [PATCH] fix page quotas:

- add ensure_quota_page_limit() which sets the limit to the quota
- set the page limit when creating a new crawl, creating the configmap, and
  returning the crawl workflow
- don't set the quota page limit on new or existing crawl workflows (remove
  setting it on new workflows) to allow updated quotas to take effect for
  the next crawl
- fixes #2369
---
 backend/btrixcloud/crawlconfigs.py      | 28 ++++++++++++++++++-------
 backend/btrixcloud/operator/crawls.py   | 26 +++++++++--------------
 backend/btrixcloud/operator/cronjobs.py |  1 +
 backend/btrixcloud/operator/models.py   |  3 ++-
 4 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 86e0e0c2b8..af440656a9 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -251,11 +251,6 @@ async def add_crawl_config(
             crawlconfig.lastStartedBy = user.id
             crawlconfig.lastStartedByName = user.name
 
-        # Ensure page limit is below org maxPagesPerCall if set
-        max_pages = org.quotas.maxPagesPerCrawl or 0
-        if max_pages > 0:
-            crawlconfig.config.limit = max_pages
-
         # add CrawlConfig to DB here
         result = await self.crawl_configs.insert_one(crawlconfig.to_dict())
 
@@ -286,13 +281,30 @@ async def add_crawl_config(
             execMinutesQuotaReached=exec_mins_quota_reached,
         )
 
+    def ensure_quota_page_limit(self, crawlconfig: CrawlConfig, org: Organization):
+        """ensure page limit is the smaller of the existing limit and the quota limit"""
+        if org.quotas.maxPagesPerCrawl and org.quotas.maxPagesPerCrawl > 0:
+            if crawlconfig.config.limit and crawlconfig.config.limit > 0:
+                crawlconfig.config.limit = min(
+                    org.quotas.maxPagesPerCrawl, crawlconfig.config.limit
+                )
+            else:
+                crawlconfig.config.limit = org.quotas.maxPagesPerCrawl
+
     async def add_new_crawl(
-        self, crawl_id: str, crawlconfig: CrawlConfig, user: User, manual: bool
+        self,
+        crawl_id: str,
+        crawlconfig: CrawlConfig,
+        user: User,
+        org: Organization,
+        manual: bool,
     ) -> None:
         """increments crawl count for this config and adds new crawl"""
         started = dt_now()
 
+        self.ensure_quota_page_limit(crawlconfig, org)
+
         inc = self.inc_crawl_count(crawlconfig.id)
         add = self.crawl_ops.add_new_crawl(
             crawl_id, crawlconfig, user.id, started, manual
         )
@@ -695,6 +707,8 @@ async def get_crawl_config_out(self, cid: UUID, org: Organization):
 
             crawlconfig.config.seeds = None
 
+        self.ensure_quota_page_limit(crawlconfig, org)
+
         return crawlconfig
 
     async def get_crawl_config_seed_count(self, cid: UUID, org: Organization):
@@ -892,7 +906,7 @@ async def run_now_internal(
                 storage_filename=storage_filename,
                 profile_filename=profile_filename or "",
             )
-            await self.add_new_crawl(crawl_id, crawlconfig, user, manual=True)
+            await self.add_new_crawl(crawl_id, crawlconfig, user, org, manual=True)
             return crawl_id
 
         except Exception as exc:
diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index 41e8f53ec1..91976cad42 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -6,6 +6,7 @@
 from pprint import pprint
 from typing import Optional, Any, Sequence
 from datetime import datetime
+from uuid import UUID
 
 import json
 
@@ -29,7 +30,6 @@
     CrawlFile,
     CrawlCompleteIn,
     StorageRef,
-    Organization,
 )
 
 from btrixcloud.utils import str_to_date, date_to_str, dt_now
@@ -145,11 +145,13 @@ async def sync_crawls(self, data: MCSyncData):
         params["userid"] = spec.get("userid", "")
 
         pods = data.children[POD]
+        org = await self.org_ops.get_org_by_id(UUID(oid))
 
         crawl = CrawlSpec(
             id=crawl_id,
             cid=cid,
             oid=oid,
+            org=org,
             storage=StorageRef(spec["storageName"]),
             crawler_channel=spec.get("crawlerChannel"),
             proxy_id=spec.get("proxyId"),
@@ -204,8 +206,6 @@ async def sync_crawls(self, data: MCSyncData):
                 await self.k8s.delete_crawl_job(crawl.id)
                 return {"status": status.dict(exclude_none=True), "children": []}
 
-        org = None
-
         # first, check storage quota, and fail immediately if quota reached
         if status.state in (
             "starting",
@@ -215,7 +215,6 @@ async def sync_crawls(self, data: MCSyncData):
             # only check on very first run, before any pods/pvcs created
             # for now, allow if crawl has already started (pods/pvcs created)
             if not pods and not data.children[PVC]:
-                org = await self.org_ops.get_org_by_id(crawl.oid)
                 if self.org_ops.storage_quota_reached(org):
                     await self.mark_finished(
                         crawl, status, "skipped_storage_quota_reached"
@@ -229,7 +228,7 @@ async def sync_crawls(self, data: MCSyncData):
                 return self._empty_response(status)
 
         if status.state in ("starting", "waiting_org_limit"):
-            if not await self.can_start_new(crawl, data, status, org):
+            if not await self.can_start_new(crawl, data, status):
                 return self._empty_response(status)
 
             await self.set_state(
@@ -382,8 +381,9 @@ async def _load_crawl_configmap(self, crawl: CrawlSpec, children, params):
 
         crawlconfig = await self.crawl_config_ops.get_crawl_config(crawl.cid, crawl.oid)
 
-        raw_config = crawlconfig.get_raw_config()
+        self.crawl_config_ops.ensure_quota_page_limit(crawlconfig, crawl.org)
 
+        raw_config = crawlconfig.get_raw_config()
         raw_config["behaviors"] = self._filter_autoclick_behavior(
             raw_config["behaviors"], params["crawler_image"]
         )
@@ -637,14 +637,10 @@ async def can_start_new(
         crawl: CrawlSpec,
         data: MCSyncData,
         status: CrawlStatus,
-        org: Optional[Organization] = None,
     ):
         """return true if crawl can start, otherwise set crawl to 'queued' state
         until more crawls for org finish"""
-        if not org:
-            org = await self.org_ops.get_org_by_id(crawl.oid)
-
-        max_crawls = org.quotas.maxConcurrentCrawls or 0
+        max_crawls = crawl.org.quotas.maxConcurrentCrawls or 0
         if not max_crawls:
             return True
 
@@ -1238,15 +1234,13 @@ def get_log_line(self, message, details):
         }
         return json.dumps(err)
 
-    async def add_file_to_crawl(self, cc_data, crawl, redis):
+    async def add_file_to_crawl(self, cc_data, crawl: CrawlSpec, redis):
         """Handle finished CrawlFile to db"""
 
         filecomplete = CrawlCompleteIn(**cc_data)
 
-        org = await self.org_ops.get_org_by_id(crawl.oid)
-
         filename = self.storage_ops.get_org_relative_path(
-            org, crawl.storage, filecomplete.filename
+            crawl.org, crawl.storage, filecomplete.filename
         )
 
         crawl_file = CrawlFile(
@@ -1299,7 +1293,7 @@ async def is_crawl_stopping(
             return "size-limit"
 
         # gracefully stop crawl if current running crawl sizes reach storage quota
-        org = await self.org_ops.get_org_by_id(crawl.oid)
+        org = crawl.org
 
         if org.readOnly:
             return "stopped_org_readonly"
diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py
index 018fae9af8..929d7920fb 100644
--- a/backend/btrixcloud/operator/cronjobs.py
+++ b/backend/btrixcloud/operator/cronjobs.py
@@ -112,6 +112,7 @@ async def make_new_crawljob(
                 crawl_id,
                 crawlconfig,
                 user,
+                org,
                 manual=False,
             )
             print("Scheduled Crawl Created: " + crawl_id)
diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py
index 439eeb262c..9f511ee75e 100644
--- a/backend/btrixcloud/operator/models.py
+++ b/backend/btrixcloud/operator/models.py
@@ -5,7 +5,7 @@
 from typing import Optional, DefaultDict, Literal, Annotated, Any
 from pydantic import BaseModel, Field
 from kubernetes.utils import parse_quantity
-from btrixcloud.models import StorageRef, TYPE_ALL_CRAWL_STATES
+from btrixcloud.models import StorageRef, TYPE_ALL_CRAWL_STATES, Organization
 
 BTRIX_API = "btrix.cloud/v1"
 
@@ -70,6 +70,7 @@ class CrawlSpec(BaseModel):
     id: str
     cid: UUID
     oid: UUID
+    org: Organization
     scale: int = 1
     storage: StorageRef
     started: str
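
For reference, below is a minimal, self-contained sketch of the clamping rule that
ensure_quota_page_limit() is intended to apply. It is not part of the patch: the
_Config stand-in and the bare max_pages argument are hypothetical simplifications of
the real CrawlConfig.config.limit and Organization.quotas.maxPagesPerCrawl fields.
The sketch only illustrates that the quota caps an existing workflow limit and fills
in the limit when the workflow has none.

# Standalone sketch (hypothetical types, not from the patch): shows the
# page-limit clamping behavior mirrored from ensure_quota_page_limit().
from dataclasses import dataclass
from typing import Optional


@dataclass
class _Config:
    """Stand-in for the page-limit portion of a crawl workflow config."""

    limit: Optional[int] = None


def ensure_quota_page_limit(config: _Config, max_pages: Optional[int]) -> None:
    """Clamp config.limit to max_pages (the org quota), mirroring the patched logic."""
    if max_pages and max_pages > 0:
        if config.limit and config.limit > 0:
            # Both set: keep whichever is smaller.
            config.limit = min(max_pages, config.limit)
        else:
            # No workflow limit: the quota becomes the limit.
            config.limit = max_pages


if __name__ == "__main__":
    c = _Config(limit=5000)
    ensure_quota_page_limit(c, 1000)
    assert c.limit == 1000  # workflow limit above quota -> clamped to quota

    c = _Config(limit=200)
    ensure_quota_page_limit(c, 1000)
    assert c.limit == 200  # workflow limit below quota -> unchanged

    c = _Config()
    ensure_quota_page_limit(c, 1000)
    assert c.limit == 1000  # no workflow limit -> quota applied

    c = _Config(limit=200)
    ensure_quota_page_limit(c, None)
    assert c.limit == 200  # no quota -> workflow limit untouched

    print("all checks passed")

Because the quota is now applied at crawl start and configmap creation rather than
baked into the stored workflow, raising or lowering maxPagesPerCrawl takes effect on
the next crawl without editing each workflow.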