refactoring
HansVRP committed Jan 15, 2025
1 parent f4c5dad commit 789b83c
Showing 1 changed file with 25 additions and 25 deletions.
openeo/extra/job_management/__init__.py (50 changes: 25 additions & 25 deletions)
@@ -21,7 +21,7 @@
    Optional,
    Union,
)
import os

import numpy
import pandas as pd
import requests
@@ -492,7 +492,7 @@ def run_jobs(
        # TODO: support user-provided `stats`
        stats = collections.defaultdict(int)

        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running", "downloading"]).values()) > 0:
        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
            self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
            stats["run_jobs loop"] += 1

@@ -523,7 +523,7 @@ def _job_update_loop(
        not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
        if len(not_started) > 0:
            # Check number of jobs running at each backend
            running = job_db.get_by_status(statuses=["created", "queued", "running"]) #TODO I believe we need to get downloading out?
            running = job_db.get_by_status(statuses=["created", "queued", "running"])
            stats["job_db get_by_status"] += 1
            per_backend = running.groupby("backend_name").size().to_dict()
            _log.info(f"Running per backend: {per_backend}")
@@ -617,21 +617,32 @@ def on_job_done(self, job: BatchJob, row):
        :param job: The job that has finished.
        :param row: DataFrame row containing the job's metadata.
        """
_log.info(f"Job {job.job_id} completed. Preparing to handle completion.")

job_metadata = job.describe()
job_dir = self.get_job_dir(job.job_id)
metadata_path = self.get_job_metadata_path(job.job_id)
self.ensure_job_dir_exists(job.job_id)

# Start download in a separate thread
downloader = Thread(target=lambda: (
self._job_download(job, job_dir, row) # Invoke the download logic directly
))
downloader.start()

# Write the job metadata to a file
# Save metadata
_log.info(f"Saving metadata for job {job.job_id} to {metadata_path}")
with metadata_path.open("w", encoding="utf-8") as f:
json.dump(job_metadata, f, ensure_ascii=False)

# Define download logic inline
def download_task():
try:
_log.info(f"Starting download for job {job.job_id} to directory {job_dir}")
job.get_results().download_files(target=job_dir)
_log.info(f"Successfully downloaded job {job.job_id} results to {job_dir}")
except Exception as e:
_log.error(f"Error downloading job {job.job_id}: {e}")

# Start the download in a separate thread
_log.info(f"Starting download thread for job {job.job_id}")
downloader = Thread(target=download_task, daemon=True)
downloader.start()

def _job_download(self, job, job_dir, row):
"""
Download the job's results and update the job status after the download completes.
@@ -713,15 +724,14 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
        if not job_dir.exists():
            job_dir.mkdir(parents=True)


    def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
        """
        Tracks status (and stats) of running jobs (in place).
        Optionally cancels jobs when running too long.
        """
        stats = stats if stats is not None else collections.defaultdict(int)

        active = job_db.get_by_status(statuses=["created", "queued", "running", "downloading"]).copy()
        active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
        for i in active.index:
            job_id = active.loc[i, "id"]
            backend_name = active.loc[i, "backend_name"]
@@ -738,19 +748,9 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
)


#---------------------------------------

if new_status == "finished" and previous_status != "downloading":
new_status = "downloading"
self.on_job_done(the_job, active.loc[i])

if previous_status == "downloading":
if self.get_job_metadata_path(job_id).exists():
new_status = "finished"
stats["job finished"] += 1
else:
new_status = "downloading"
if new_status == "finished":
stats["job finished"] += 1
self.on_job_done(the_job, active.loc[i])

if previous_status != "error" and new_status == "error":
stats["job failed"] += 1

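For readers skimming the diff: a minimal standalone sketch of the pattern the new on_job_done settles on — persist the job metadata synchronously, then run the results download on a daemon thread. The helper name handle_finished_job and its signature are hypothetical; only describe(), get_results().download_files(target=...), job.job_id and the daemon-thread setup come from the diff above.

import json
import logging
import threading
from pathlib import Path

_log = logging.getLogger(__name__)


def handle_finished_job(job, job_dir: Path, metadata_path: Path) -> threading.Thread:
    """Write job metadata to disk, then download the job's results on a daemon thread.

    `job` is assumed to behave like the openeo BatchJob used in the diff:
    it offers describe() and get_results().download_files(target=...).
    """
    job_dir.mkdir(parents=True, exist_ok=True)

    # Metadata is saved before the download starts, so a record of the finished
    # job exists even if the (potentially long) download fails midway.
    with metadata_path.open("w", encoding="utf-8") as f:
        json.dump(job.describe(), f, ensure_ascii=False)

    def download_task():
        try:
            _log.info(f"Downloading results of job {job.job_id} to {job_dir}")
            job.get_results().download_files(target=job_dir)
        except Exception as e:
            # The thread has no caller to propagate to, so errors are only logged.
            _log.error(f"Download of job {job.job_id} failed: {e}")

    # daemon=True keeps pending downloads from blocking interpreter shutdown.
    downloader = threading.Thread(target=download_task, daemon=True)
    downloader.start()
    return downloader

Returning the thread is an illustrative convenience so a caller (or a test) can join() it; the actual method in the diff does not return it.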