
Pause / Resume Crawls Initial Implementation #2572

Open: wants to merge 7 commits into base: main
15 changes: 14 additions & 1 deletion backend/btrixcloud/crawlconfigs.py
@@ -11,7 +11,7 @@
import re
import os
import traceback
from datetime import datetime
from datetime import datetime, timedelta
from uuid import UUID, uuid4
import urllib.parse

@@ -89,6 +89,8 @@ class CrawlConfigOps:
crawler_images_map: dict[str, str]
crawler_image_pull_policy_map: dict[str, str]

paused_expiry_delta: timedelta

def __init__(
self,
dbclient,
@@ -115,6 +117,10 @@ def __init__(
"DEFAULT_CRAWLER_IMAGE_PULL_POLICY", "IfNotPresent"
)

self.paused_expiry_delta = timedelta(
minutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080"))
)

self.router = APIRouter(
prefix="/crawlconfigs",
tags=["crawlconfigs"],
@@ -749,6 +755,13 @@ async def _add_running_curr_crawl_stats(self, crawlconfig: CrawlConfigOut):
crawlconfig.lastCrawlState = crawl.state
crawlconfig.lastCrawlSize = crawl.stats.size if crawl.stats else 0
crawlconfig.lastCrawlStopping = crawl.stopping
crawlconfig.lastCrawlPausing = crawl.pausing
crawlconfig.lastCrawlPausedAt = crawl.pausedAt
crawlconfig.lastCrawlPausedExpiry = None
if crawl.pausedAt:
crawlconfig.lastCrawlPausedExpiry = (
crawl.pausedAt + self.paused_expiry_delta
)
crawlconfig.isCrawlRunning = True

async def get_crawl_config_out(self, cid: UUID, org: Organization):
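
As an illustration (not part of the diff): the lastCrawlPausedExpiry value computed above is simply pausedAt plus the limit configured via PAUSED_CRAWL_LIMIT_MINUTES, which defaults to 10080 minutes (7 days). A minimal standalone sketch of that calculation; the helper name is hypothetical:

import os
from datetime import datetime, timedelta, timezone

# Same env var and 7-day default used in the diff above
PAUSED_EXPIRY_DELTA = timedelta(
    minutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080"))
)

def paused_expiry(paused_at: datetime) -> datetime:
    """Hypothetical helper: when a crawl paused at `paused_at` will be stopped."""
    return paused_at + PAUSED_EXPIRY_DELTA

# A crawl paused right now would expire one week from now by default
print(paused_expiry(datetime.now(timezone.utc)))
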
10 changes: 9 additions & 1 deletion backend/btrixcloud/crawlmanager.py
@@ -4,7 +4,7 @@
import secrets

from typing import Optional, Dict, Tuple
from datetime import timedelta
from datetime import datetime, timedelta

from fastapi import HTTPException

@@ -386,6 +386,14 @@ async def shutdown_crawl(self, crawl_id: str, graceful=True) -> dict:

return await self.delete_crawl_job(crawl_id)

async def pause_resume_crawl(
self, crawl_id: str, paused_at: Optional[datetime] = None
) -> dict:
"""pause or unpause a crawl"""
return await self._patch_job(
crawl_id, {"pausedAt": date_to_str(paused_at) if paused_at else ""}
)

async def delete_crawl_configs_for_org(self, org: str) -> None:
"""Delete all crawl configs for given org"""
await self._delete_crawl_configs(f"btrix.org={org}")
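
As an illustration (not part of the diff): pause_resume_crawl above just patches the crawl job spec, writing a timestamp string to pausedAt to pause and an empty string to clear it on resume. A minimal sketch of that patch body, assuming ISO-8601 formatting for date_to_str and using a hypothetical helper name:

from datetime import datetime, timezone
from typing import Optional

def pause_patch_body(paused_at: Optional[datetime]) -> dict:
    """Hypothetical helper: build the spec patch sent to the crawl job."""
    return {"pausedAt": paused_at.isoformat() if paused_at else ""}

print(pause_patch_body(datetime.now(timezone.utc)))  # pause now
print(pause_patch_body(None))                        # resume
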
49 changes: 49 additions & 0 deletions backend/btrixcloud/crawls.py
@@ -769,6 +769,39 @@ async def get_crawl_stats(

return crawls_data

async def pause_crawl(
self, crawl_id: str, org: Organization, pause: bool
) -> Dict[str, bool]:
"""pause or unpause a crawl temporarily"""
crawl = await self.get_base_crawl(crawl_id, org)
if crawl and crawl.type != "crawl":
raise HTTPException(status_code=400, detail="not_a_crawl")

result = None

if pause:
paused_at = dt_now()
else:
paused_at = None

try:
result = await self.crawl_manager.pause_resume_crawl(
crawl_id, paused_at=paused_at
)

if result.get("success"):
await self.crawls.find_one_and_update(
{"_id": crawl_id, "type": "crawl", "oid": org.id},
{"$set": {"pausing": pause, "pausedAt": paused_at}},
)

return {"success": True}
# pylint: disable=bare-except
except:
pass

raise HTTPException(status_code=404, detail="crawl_not_found")

async def shutdown_crawl(
self, crawl_id: str, org: Organization, graceful: bool
) -> Dict[str, bool]:
@@ -1242,6 +1275,22 @@ async def crawl_cancel_immediately(
async def crawl_graceful_stop(crawl_id, org: Organization = Depends(org_crawl_dep)):
return await ops.shutdown_crawl(crawl_id, org, graceful=True)

@app.post(
"/orgs/{oid}/crawls/{crawl_id}/pause",
tags=["crawls"],
response_model=SuccessResponse,
)
async def pause_crawl(crawl_id, org: Organization = Depends(org_crawl_dep)):
return await ops.pause_crawl(crawl_id, org, pause=True)

@app.post(
"/orgs/{oid}/crawls/{crawl_id}/unpause",
tags=["crawls"],
response_model=SuccessResponse,
)
async def unpause_crawl(crawl_id, org: Organization = Depends(org_crawl_dep)):
return await ops.pause_crawl(crawl_id, org, pause=False)

@app.post(
"/orgs/{oid}/crawls/delete",
tags=["crawls"],
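
As a usage illustration (not part of the diff): the new pause and unpause routes above give API clients a simple per-crawl toggle. A hedged sketch of calling them; the base URL and bearer token are placeholders, and requests is just one possible HTTP client:

import requests

BASE = "https://example.com/api"               # placeholder; adjust to your deployment's API root
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth token

def set_paused(oid: str, crawl_id: str, pause: bool) -> bool:
    """POST to /orgs/{oid}/crawls/{crawl_id}/pause or /unpause and return the success flag."""
    action = "pause" if pause else "unpause"
    resp = requests.post(
        f"{BASE}/orgs/{oid}/crawls/{crawl_id}/{action}", headers=HEADERS
    )
    resp.raise_for_status()
    return resp.json().get("success", False)
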
11 changes: 10 additions & 1 deletion backend/btrixcloud/models.py
@@ -222,7 +222,9 @@ class UserOrgInfoOut(BaseModel):
]
RUNNING_STATES = get_args(TYPE_RUNNING_STATES)

TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
TYPE_WAITING_STATES = Literal[
"starting", "waiting_capacity", "waiting_org_limit", "paused"
]
WAITING_STATES = get_args(TYPE_WAITING_STATES)

TYPE_FAILED_STATES = Literal[
Expand All @@ -236,6 +238,7 @@ class UserOrgInfoOut(BaseModel):
TYPE_SUCCESSFUL_STATES = Literal[
"complete",
"stopped_by_user",
"stopped_pause_expired",
"stopped_storage_quota_reached",
"stopped_time_quota_reached",
"stopped_org_readonly",
@@ -478,6 +481,9 @@ class CrawlConfigOut(CrawlConfigCore, CrawlConfigAdditional):
id: UUID

lastCrawlStopping: Optional[bool] = False
lastCrawlPausing: Optional[bool] = False
lastCrawlPausedAt: Optional[datetime] = None
lastCrawlPausedExpiry: Optional[datetime] = None
profileName: Optional[str] = None
firstSeed: Optional[str] = None
seedCount: int = 0
@@ -863,6 +869,8 @@ class CrawlOut(BaseMongoModel):
seedCount: Optional[int] = None
profileName: Optional[str] = None
stopping: Optional[bool] = False
pausing: Optional[bool] = False
pausedAt: Optional[datetime] = None
manual: bool = False
cid_rev: Optional[int] = None
scale: Scale = 1
@@ -1017,6 +1025,7 @@ class Crawl(BaseCrawl, CrawlConfigCore):
manual: bool = False

stopping: Optional[bool] = False
pausing: Optional[bool] = False

qaCrawlExecSeconds: int = 0

98 changes: 88 additions & 10 deletions backend/btrixcloud/operator/crawls.py
@@ -5,7 +5,7 @@
import math
from pprint import pprint
from typing import Optional, Any, Sequence
from datetime import datetime
from datetime import datetime, timedelta
from uuid import UUID

import json
@@ -79,6 +79,7 @@

# pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
# pylint: disable=invalid-name, too-many-lines, too-many-return-statements
# pylint: disable=too-many-instance-attributes
# ============================================================================
class CrawlOperator(BaseOperator):
"""CrawlOperator Handler"""
@@ -93,6 +94,8 @@ class CrawlOperator(BaseOperator):

min_avail_storage_ratio: float

paused_expires_delta: timedelta

def __init__(self, *args):
super().__init__(*args)

Expand All @@ -110,6 +113,13 @@ def __init__(self, *args):
os.environ.get("CRAWLER_MIN_AVAIL_STORAGE_RATIO") or 0
)

# time in minutes before paused crawl is stopped - default is 7 days
paused_crawl_limit_minutes = int(
os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080")
)

self.paused_expires_delta = timedelta(minutes=paused_crawl_limit_minutes)

def init_routes(self, app):
"""init routes for this operator"""

@@ -160,6 +170,7 @@ async def sync_crawls(self, data: MCSyncData):
scale=spec.get("scale", 1),
started=data.parent["metadata"]["creationTimestamp"],
stopping=spec.get("stopping", False),
paused_at=str_to_date(spec.get("pausedAt")),
timeout=spec.get("timeout") or 0,
max_crawl_size=int(spec.get("maxCrawlSize") or 0),
scheduled=spec.get("manual") != "1",
@@ -263,6 +274,27 @@ async def sync_crawls(self, data: MCSyncData):
else:
status.scale = 1

# stopping paused crawls
if crawl.paused_at:
stop_reason: Optional[StopReason] = None
state: Optional[TYPE_NON_RUNNING_STATES] = None
# Check if pause expiry limit is reached and if so, stop crawl
if dt_now() >= (crawl.paused_at + self.paused_expires_delta):
print(f"Paused crawl expiry reached, stopping crawl, id: {crawl.id}")
stop_reason = "stopped_pause_expired"
state = "stopped_pause_expired"

# Check if paused crawl was stopped manually
elif crawl.stopping:
print(f"Paused crawl stopped by user, id: {crawl.id}")
stop_reason = "stopped_by_user"
state = "stopped_by_user"

if stop_reason and state:
status.stopping = True
status.stopReason = stop_reason
await self.mark_finished(crawl, status, state)

children = self._load_redis(params, status, data.children)

storage_path = crawl.storage.get_storage_extra_path(oid)
@@ -326,7 +358,11 @@ async def sync_crawls(self, data: MCSyncData):
children.extend(await self._load_qa_configmap(params, data.children))

for i in range(0, status.scale):
children.extend(self._load_crawler(params, i, status, data.children))
children.extend(
self._load_crawler(
params, i, status, data.children, bool(crawl.paused_at)
)
)

return {
"status": status.dict(exclude_none=True),
@@ -430,7 +466,8 @@ async def _load_qa_configmap(self, params, children):
params["qa_source_replay_json"] = crawl_replay.json(include={"resources"})
return self.load_from_yaml("qa_configmap.yaml", params)

def _load_crawler(self, params, i, status: CrawlStatus, children):
# pylint: disable=too-many-arguments
def _load_crawler(self, params, i, status: CrawlStatus, children, paused: bool):
name = f"crawl-{params['id']}-{i}"
has_pod = name in children[POD]

@@ -456,12 +493,12 @@ def _load_crawler(self, params, i, status: CrawlStatus, children):
params["memory_limit"] = self.k8s.max_crawler_memory_size
params["storage"] = pod_info.newStorage or params.get("crawler_storage")
params["workers"] = params.get(worker_field) or 1
params["do_restart"] = False
if has_pod:
params["init_crawler"] = not paused
if has_pod and not paused:
restart_reason = pod_info.should_restart_pod(params.get("force_restart"))
if restart_reason:
print(f"Restarting {name}, reason: {restart_reason}")
params["do_restart"] = True
params["init_crawler"] = False

return self.load_from_yaml("crawler.yaml", params)

@@ -840,17 +877,43 @@ async def sync_crawl_state(
if status.anyCrawlPodNewExit:
await self.log_crashes(crawl.id, status.podStatus, redis)

if crawl.paused_at and redis:
for name in pods.keys():
pod_status = status.podStatus[name]
if (
pod_status.isNewExit
and pod_status.exitTime
and pod_status.reason == "done"
):
await redis.hset(f"{crawl.id}:status", name, "interrupted")

# remove stopping key (used for pause) unless actually stopping if:
# 1. no more crawler pods running (to allow easily to resume)
# 2. crawl has already been resumed, to allow pods to resume instantly
if (
not crawl.stopping
and redis
and status.stopReason == "paused"
and (not crawler_running or not crawl.paused_at)
):
await redis.delete(f"{crawl.id}:stopping")
await redis.delete("crawl-stop")

if not crawler_running or not redis:
# if either crawler is not running or redis is inaccessible
if not pod_done_count and self.should_mark_waiting(
status.state, crawl.started
):
# mark as waiting (if already running)
await self.set_state(
"waiting_capacity",
"waiting_capacity" if not crawl.paused_at else "paused",
status,
crawl,
allowed_from=RUNNING_AND_STARTING_ONLY,
allowed_from=(
RUNNING_AND_STARTING_ONLY
if not crawl.paused_at
else RUNNING_AND_WAITING_STATES
),
)

if not crawler_running and redis:
@@ -866,6 +929,7 @@ async def sync_crawl_state(
f"Pausing redis, no running crawler pods for >{REDIS_TTL} secs"
)
status.initRedis = False

elif crawler_running and not redis:
# if crawler is running, but no redis, init redis
status.initRedis = True
Expand All @@ -874,6 +938,8 @@ async def sync_crawl_state(
status.resync_after = self.fast_retry_secs
return status

# only get here if at least one crawler pod is running

# update lastActiveTime if crawler is running
if crawler_running:
status.lastActiveTime = date_to_str(dt_now())
@@ -1303,6 +1369,9 @@ async def is_crawl_stopping(
if crawl.stopping:
return "stopped_by_user"

if crawl.paused_at:
return "paused"

# check timeout if timeout time exceeds elapsed time
if crawl.timeout:
elapsed = status.elapsedCrawlTime
@@ -1424,18 +1493,27 @@ async def update_crawl_state(
f"Attempting to adjust storage to {pod_info.newStorage} for {key}"
)

# check if no longer paused, clear paused stopping state
if status.stopReason == "paused" and not crawl.paused_at:
status.stopReason = None
status.stopping = False

if not status.stopReason:
status.stopReason = await self.is_crawl_stopping(crawl, status, data)
status.stopping = status.stopReason is not None
if status.stopping:
print("Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")

# mark crawl as stopping
if status.stopping:
await redis.set(f"{crawl.id}:stopping", "1")
# backwards compatibility with older crawler
await redis.set("crawl-stop", "1")

if status.stopReason == "paused":
print(f"Crawl paused, id: {crawl.id}")
return status

print(f"Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")

# resolve scale
if crawl.scale != status.scale:
status.scale = await self._resolve_scale(
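
To summarize the operator behavior shown above (not part of the diff): once a crawl job carries a pausedAt timestamp, each reconcile either keeps the crawl paused, finishes it as stopped_by_user, or finishes it as stopped_pause_expired once the limit elapses. A simplified sketch of that decision with the same 7-day default; the function itself is hypothetical:

from datetime import datetime, timedelta, timezone
from typing import Optional

PAUSED_EXPIRES_DELTA = timedelta(minutes=10080)  # 7-day default, as in the diff

def paused_crawl_outcome(
    paused_at: Optional[datetime],
    stopping: bool,
    now: Optional[datetime] = None,
) -> Optional[str]:
    """Return a finished state for a paused crawl, or None to stay paused."""
    if not paused_at:
        return None
    now = now or datetime.now(timezone.utc)
    if now >= paused_at + PAUSED_EXPIRES_DELTA:
        return "stopped_pause_expired"
    if stopping:
        return "stopped_by_user"
    return None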