Implement custom storage for orgs #2093

Open · wants to merge 69 commits into main

Commits
39f5acb
Add back custom storage endpoints
tw4l Sep 20, 2024
814f4e4
Flush out tests for setting custom storage
tw4l Sep 20, 2024
da40394
Fix test issue with bucket not existing for now
tw4l Sep 20, 2024
fb13d7e
Add additional tests
tw4l Sep 23, 2024
f940601
Fix custom storage so it works as expected
tw4l Sep 24, 2024
8f8cb68
Actually unset custom replica storage before deleting
tw4l Sep 24, 2024
33393f6
Add TODO where custom storage deletion is failing
tw4l Sep 24, 2024
bb86c49
Fix check for whether storage label is in use
tw4l Sep 24, 2024
c45b209
Remove todo on endpoint that's fine
tw4l Sep 24, 2024
21b4867
Add todos re: tasks necessary to change storage
tw4l Sep 24, 2024
b60d7cc
Check that no crawls are running before updating storage
tw4l Sep 25, 2024
bd91ce4
Start adding post-storage update logic
tw4l Sep 25, 2024
b7feded
WIP: Add background job to copy old s3 bucket to new
tw4l Sep 25, 2024
b15c329
WIP: Start adding logic to handle replica location updates
tw4l Sep 25, 2024
311c011
Add additional note
tw4l Sep 25, 2024
d991900
Fix argument
tw4l Sep 25, 2024
933c043
Fix another argument
tw4l Sep 25, 2024
e77bfe6
Fixups
tw4l Sep 25, 2024
b4115a2
Fix linting
tw4l Sep 25, 2024
e1007ca
More linting fixes
tw4l Sep 25, 2024
eeaff5b
Refactor, seperate storage and replicas updates
tw4l Sep 26, 2024
84a5811
More refactoring
tw4l Sep 26, 2024
aede438
Make post-update task methods private
tw4l Sep 26, 2024
a0e6e2e
Check if any bg jobs running before changing storage
tw4l Sep 26, 2024
2826ee7
Check bg job finished as well
tw4l Sep 26, 2024
271f956
Fixups
tw4l Sep 26, 2024
66ed5db
Storage update improvements
tw4l Sep 26, 2024
c4772f7
Fixup
tw4l Sep 26, 2024
eef9951
Remove another todo
tw4l Sep 26, 2024
d3d0f49
More fixups
tw4l Sep 26, 2024
8cbf976
Add provider to s3storage for rclone
tw4l Sep 26, 2024
6625345
Fix typo
tw4l Sep 26, 2024
703e7d8
Make API endpoints that change storage superuser-only for now
tw4l Sep 30, 2024
11f6b8e
Add typing for init_storages_api, import Callable
tw4l Sep 30, 2024
3f12cb3
Fix StorageOps in operator main
tw4l Oct 1, 2024
94df418
Always use oid prefix in s3 storage
tw4l Oct 1, 2024
75f064a
Post-rebase fixups and remove create bucket fallback
tw4l Oct 10, 2024
989144b
Create extra test buckets in CI
tw4l Oct 15, 2024
c1e5f37
Add test for non-verified custom storage
tw4l Oct 15, 2024
ad6e061
Refactor to move updates to FastAPI background tasks
tw4l Oct 15, 2024
958a721
Include default replicas in /storage response if no org replicas
tw4l Oct 15, 2024
e8df4e4
Fix unsetting of presigned URLs
tw4l Oct 16, 2024
82c4ef5
Add --progress flag to rclone copy command
tw4l Oct 16, 2024
7946d1a
Increase ttl seconds after finished for testing on dev
tw4l Oct 17, 2024
cc93be6
Ensure there are no double slashes between bucket name and oid
tw4l Oct 17, 2024
da320f2
Increase memory limit/request for copy job to 500Mi
tw4l Oct 17, 2024
e1f3ebd
Reduce copy job ttlSecondsAfterFinished to 60
tw4l Oct 17, 2024
e7ae330
Add storage tag to API endpoints
tw4l Oct 17, 2024
7e074d0
Add flags to rclone to reduce memory usage, set limit to 350Mi
tw4l Oct 17, 2024
7497893
Fix positional operator in storage ref update
tw4l Oct 17, 2024
0c861e1
One more positional operator fix
tw4l Oct 17, 2024
12eb105
Update docstrings and comments
tw4l Oct 17, 2024
8d50dd0
Make all-storages response valid JSON with response model
tw4l Oct 17, 2024
9c391db
Add admin docs for storage
tw4l Oct 17, 2024
b824bab
Fix API endpoint path in docs example
tw4l Oct 17, 2024
f986efb
Docs typo fix
tw4l Oct 17, 2024
8c7133e
Add provider field note
tw4l Oct 17, 2024
23f3887
Docs language cleanup
tw4l Oct 17, 2024
a695d83
Check /all-storages in backend tests
tw4l Oct 17, 2024
e7958d3
Add API endpoint for background job progress
tw4l Oct 18, 2024
da14020
Fix linting
tw4l Oct 18, 2024
d17075f
Format post-rebase with Black
tw4l Dec 3, 2024
46172e3
Format with Black
tw4l Jan 24, 2025
698b692
Fix linting
tw4l Jan 24, 2025
b74017c
Fix linking
tw4l Feb 24, 2025
0d44854
Cast job to CopyBucketJob
tw4l Feb 24, 2025
d220318
Fix type errors from rebase
tw4l Feb 24, 2025
6eafcbf
Reformat
tw4l Apr 22, 2025
3a65fee
Fix linting
tw4l Apr 22, 2025
Files changed
5 changes: 5 additions & 0 deletions .github/workflows/k3d-ci.yaml
@@ -116,6 +116,11 @@ jobs:
- name: Wait for all pods to be ready
run: kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
timeout-minutes: 30
run: pytest -vv ./backend/test/test_*.py
5 changes: 5 additions & 0 deletions .github/workflows/microk8s-ci.yaml
@@ -68,6 +68,11 @@ jobs:
- name: Wait for all pods to be ready
run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
run: pytest -vv ./backend/test/test_*.py

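Both CI workflows create the extra buckets with a plain `mkdir` inside the MinIO container, which works because MinIO's filesystem backend treats top-level directories under the data root as buckets (presumably why no `mc` client is needed here). A quick sanity check that the buckets are visible over the S3 API — a sketch only: the endpoint and credentials are placeholders for the local test deployment's values, and boto3 is assumed to be installed:

```python
import boto3  # assumed available in the test environment

# Placeholder endpoint/credentials; substitute the values from the
# local test deployment's storage configuration.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:30090",
    aws_access_key_id="<access-key>",
    aws_secret_access_key="<secret-key>",
)

bucket_names = {b["Name"] for b in s3.list_buckets()["Buckets"]}
assert {"custom-primary", "custom-replica"} <= bucket_names
```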
181 changes: 169 additions & 12 deletions backend/btrixcloud/background_jobs.py
@@ -16,20 +16,22 @@
from .models import (
BaseFile,
Organization,
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
CopyBucketJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
User,
SuccessResponse,
SuccessResponseId,
JobProgress,
BackgroundJob,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now
@@ -43,7 +45,7 @@


# ============================================================================
# pylint: disable=too-many-instance-attributes, too-many-lines, too-many-return-statements, too-many-public-methods
class BackgroundJobOps:
"""k8s background job management"""

@@ -56,7 +58,7 @@ class BackgroundJobOps:

migration_jobs_scale: int

# pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments, invalid-name

def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
self.jobs = mdb["jobs"]
@@ -302,7 +304,7 @@ async def create_delete_org_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
) -> str:
"""Create background job to delete org and its data"""

try:
@@ -339,7 +341,7 @@ async def create_delete_org_job(
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: delete org job could not be started: {exc}")
return ""

async def create_recalculate_org_stats_job(
self,
@@ -473,6 +475,73 @@ async def create_optimize_crawl_pages_job(
print(f"warning: optimize pages job could not be started: {exc}")
return None

async def create_copy_bucket_job(
self,
org: Organization,
prev_storage_ref: StorageRef,
new_storage_ref: StorageRef,
existing_job_id: Optional[str] = None,
) -> str:
"""Start background job to copy entire s3 bucket and return job id"""
prev_storage = self.storage_ops.get_org_storage_by_ref(org, prev_storage_ref)
prev_endpoint, prev_bucket = self.strip_bucket(prev_storage.endpoint_url)

new_storage = self.storage_ops.get_org_storage_by_ref(org, new_storage_ref)
new_endpoint, new_bucket = self.strip_bucket(new_storage.endpoint_url)

# Ensure buckets terminate with trailing slash
prev_bucket = os.path.join(prev_bucket, "")
new_bucket = os.path.join(new_bucket, "")

job_type = BgJobType.COPY_BUCKET.value

try:
job_id = await self.crawl_manager.run_copy_bucket_job(
oid=str(org.id),
job_type=job_type,
prev_storage=prev_storage_ref,
prev_endpoint=prev_endpoint,
prev_bucket=prev_bucket,
new_storage=new_storage_ref,
new_endpoint=new_endpoint,
new_bucket=new_bucket,
job_id_prefix=f"{job_type}-{org.id}",
existing_job_id=existing_job_id,
)
if existing_job_id:
copy_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": copy_job.started,
"finished": copy_job.finished,
}
if copy_job.previousAttempts:
copy_job.previousAttempts.append(previous_attempt)
else:
copy_job.previousAttempts = [previous_attempt]
copy_job.started = dt_now()
copy_job.finished = None
copy_job.success = None
else:
copy_job = CopyBucketJob(
id=job_id,
oid=org.id,
started=dt_now(),
prev_storage=prev_storage_ref,
new_storage=new_storage_ref,
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": copy_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
print(
f"warning: copy bucket job could not be started for org {org.id}: {exc}"
)
return ""

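The job launched by `run_copy_bucket_job` ultimately shells out to rclone; the actual command lives in the copy-job template, which is not part of this diff. Judging from the commit history (`--progress` for the stats lines parsed by `get_job_progress` below, plus flags added to reduce memory usage), the invocation is roughly of this shape — a hypothetical reconstruction, with illustrative flag values only:

```python
# Hypothetical reconstruction of the copy-job command; the real template and
# flag values are defined in the Helm chart, not in this file.
rclone_cmd = [
    "rclone", "copy",
    "--progress",        # emits the percentage/ETA stats parsed below
    "--transfers", "2",  # fewer parallel transfers, lower memory use
    "--checkers", "4",
    "prev:<prev-bucket>/<oid>/",  # trailing slashes ensured above
    "new:<new-bucket>/<oid>/",
]
```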
async def job_finished(
self,
job_id: str,
@@ -498,6 +567,9 @@ async def job_finished(
await self.handle_delete_replica_job_finished(
cast(DeleteReplicaJob, job)
)
if job_type == BgJobType.COPY_BUCKET and job.oid:
org = await self.org_ops.get_org_by_id(job.oid)
await self.org_ops.update_read_only(org, False)
else:
print(
f"Background job {job.id} failed, sending email to superuser",
@@ -528,6 +600,9 @@ async def get_background_job(
DeleteReplicaJob,
CopyBucketJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
]:
@@ -544,33 +619,84 @@

def _get_job_by_type_from_data(self, data: dict[str, object]):
"""convert dict to propert background job type"""
if data["type"] == BgJobType.CREATE_REPLICA:
if data["type"] == BgJobType.CREATE_REPLICA.value:
return CreateReplicaJob.from_dict(data)

if data["type"] == BgJobType.DELETE_REPLICA:
if data["type"] == BgJobType.DELETE_REPLICA.value:
return DeleteReplicaJob.from_dict(data)

if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
if data["type"] == BgJobType.RECALCULATE_ORG_STATS.value:
return RecalculateOrgStatsJob.from_dict(data)

if data["type"] == BgJobType.READD_ORG_PAGES:
if data["type"] == BgJobType.READD_ORG_PAGES.value:
return ReAddOrgPagesJob.from_dict(data)

if data["type"] == BgJobType.OPTIMIZE_PAGES:
if data["type"] == BgJobType.OPTIMIZE_PAGES.value:
return OptimizePagesJob.from_dict(data)

if data["type"] == BgJobType.COPY_BUCKET.value:
return CopyBucketJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

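Note the switch to comparing against `.value` above: `data["type"]` comes out of MongoDB as a plain string, and if `BgJobType` is a plain `Enum` rather than a `str`/`Enum` mixin (an assumption here, not confirmed by this diff), a string never compares equal to an enum member, so every lookup previously fell through to `DeleteOrgJob.from_dict`. A minimal illustration:

```python
from enum import Enum

class BgJobType(Enum):  # simplified stand-in for the real model
    CREATE_REPLICA = "create-replica"

stored = "create-replica"  # what MongoDB hands back
print(stored == BgJobType.CREATE_REPLICA)        # False: str vs. enum member
print(stored == BgJobType.CREATE_REPLICA.value)  # True
```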
async def get_job_progress(self, job_id: str) -> JobProgress:
"""Return progress of background job for supported types"""
job = await self.get_background_job(job_id)

if job.type != BgJobType.COPY_BUCKET:
raise HTTPException(status_code=403, detail="job_type_not_supported")

if job.success is False:
raise HTTPException(status_code=400, detail="job_failed")

if job.finished:
return JobProgress(percentage=1.0)

log_tail = await self.crawl_manager.tail_background_job(job_id)
if not log_tail:
raise HTTPException(status_code=400, detail="job_log_not_available")

lines = log_tail.splitlines()
reversed_lines = list(reversed(lines))

progress = JobProgress(percentage=0.0)

# Parse lines in reverse order until we find one with latest stats
for line in reversed_lines:
try:
if "ETA" not in line:
continue

stats_groups = line.split(",")
for group in stats_groups:
group = group.strip()
if "%" in group:
progress.percentage = float(group.strip("%")) / 100
if "ETA" in group:
eta_str = group.strip("ETA ")
# Split on white space to remove byte mark rclone sometimes
# adds to end of stats line
eta_list = eta_str.split(" ")
progress.eta = eta_list[0]

break
# pylint: disable=bare-except
except:
continue

return progress

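With `--progress`, rclone periodically prints a one-line stats summary, and the parser above walks the log tail in reverse to find the newest one. A stats line looks roughly like the sample below (the exact format can vary between rclone versions), and the extraction logic can be checked in isolation:

```python
# Sample rclone stats line (illustrative; real output varies by version)
line = "Transferred:   3.214 GiB / 6.500 GiB, 49%, 11.234 MiB/s, ETA 4m32s"

percentage, eta = 0.0, None
for group in line.split(","):
    group = group.strip()
    if "%" in group:
        percentage = float(group.strip("%")) / 100
    if "ETA" in group:
        eta = group.strip("ETA ").split(" ")[0]

print(percentage, eta)  # 0.49 4m32s
```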
async def list_background_jobs(
self,
org: Optional[Organization] = None,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
job_type: Optional[str] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[int] = -1,
) -> Tuple[List[Union[CreateReplicaJob, DeleteReplicaJob, CopyBucketJob]], int]:
"""List all background jobs"""
# pylint: disable=duplicate-code
# Zero-index page for query
@@ -585,6 +711,12 @@ async def list_background_jobs(
if success in (True, False):
query["success"] = success

if running:
query["success"] = None

if running is False:
query["success"] = {"$in": [True, False]}

if job_type:
query["type"] = job_type

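The new `running` filter relies on the convention that `success` stays unset until `job_finished` records an outcome, so in-flight jobs are exactly those with `success: None`. Since `success` and `running` target the same field, supplying both lets the `running` clause overwrite the `success` one. The mapping, mirrored as a standalone sketch:

```python
from typing import Optional

def running_clause(running: Optional[bool]) -> dict:
    """Mirror of the filter logic above (illustrative only)."""
    if running:
        return {"success": None}                    # no outcome recorded yet
    if running is False:
        return {"success": {"$in": [True, False]}}  # finished, either outcome
    return {}                                       # running=None: no constraint
```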
@@ -676,6 +808,7 @@ async def retry_org_background_job(
self, job: BackgroundJob, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry background job specific to one org"""
# pylint: disable=too-many-return-statements
if job.type == BgJobType.CREATE_REPLICA:
job = cast(CreateReplicaJob, job)
file = await self.get_replica_job_file(job, org)
@@ -736,6 +869,16 @@ async def retry_org_background_job(
)
return {"success": True}

if job.type == BgJobType.COPY_BUCKET:
job = cast(CopyBucketJob, job)
await self.create_copy_bucket_job(
org,
job.prev_storage,
job.new_storage,
existing_job_id=job.id,
)
return {"success": True}

return {"success": False}

async def retry_failed_org_background_jobs(
Expand Down Expand Up @@ -773,7 +916,7 @@ async def retry_all_failed_background_jobs(


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, too-many-positional-arguments
def init_background_jobs_api(
app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep
):
@@ -800,6 +943,18 @@ async def get_org_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@router.get(
"/{job_id}/progress",
response_model=JobProgress,
)
async def get_job_progress(
job_id: str,
# pylint: disable=unused-argument
org: Organization = Depends(org_crawl_dep),
):
"""Return progress information for background job"""
return await ops.get_job_progress(job_id)

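A client can poll this endpoint until the copy finishes. A sketch using `requests`; the base URL, `/api` prefix, bearer-token auth, and the placeholder IDs are assumptions about the surrounding deployment rather than anything defined in this diff:

```python
import time
import requests  # assumes the requests package is installed

api = "https://btrix.example.com/api"          # placeholder base URL
headers = {"Authorization": "Bearer <token>"}  # placeholder credentials
oid, job_id = "<org-id>", "<copy-bucket-job-id>"

while True:
    resp = requests.get(f"{api}/orgs/{oid}/jobs/{job_id}/progress", headers=headers)
    resp.raise_for_status()
    progress = resp.json()
    print(f"{progress['percentage']:.0%} done, ETA {progress.get('eta')}")
    if progress["percentage"] >= 1.0:
        break
    time.sleep(10)
```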
@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
@@ -894,6 +1049,7 @@ async def list_background_jobs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
jobType: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
Expand All @@ -904,6 +1060,7 @@ async def list_background_jobs(
page_size=pageSize,
page=page,
success=success,
running=running,
job_type=jobType,
sort_by=sortBy,
sort_direction=sortDirection,
2 changes: 1 addition & 1 deletion backend/btrixcloud/basecrawls.py
@@ -49,7 +49,7 @@


# ============================================================================
# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-branches, too-many-positional-arguments
class BaseCrawlOps:
"""operations that apply to all crawls"""

1 change: 1 addition & 0 deletions backend/btrixcloud/colls.py
@@ -66,6 +66,7 @@


# ============================================================================
# pylint: disable=too-many-positional-arguments
class CollectionOps:
"""ops for working with named collections of crawls"""

4 changes: 2 additions & 2 deletions backend/btrixcloud/crawlconfigs.py
@@ -76,7 +76,7 @@
class CrawlConfigOps:
"""Crawl Config Operations"""

# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-positional-arguments

user_manager: UserManager
org_ops: OrgOps
@@ -1265,7 +1265,7 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID):


# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments,too-many-positional-arguments
def init_crawl_config_api(
app,
dbclient,