Skip to content

Commit

Permalink
Merge pull request #69 from Open-EO/58-threadpool
Browse files Browse the repository at this point in the history
ThreadPoolExecutor replacing job queue
  • Loading branch information
GriffinBabe authored Mar 20, 2024
2 parents 018fd0c + 59f61c3 commit 5a044cf
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 199 deletions.
219 changes: 57 additions & 162 deletions examples/extraction_pipelines/S2_extraction_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"source": [
"from pathlib import Path\n",
"import geopandas as gpd\n",
"from openeo_gfmap.manager.job_splitters import split_job_hex, split_job_s2grid, _append_h3_index\n",
"from openeo_gfmap.manager.job_splitters import split_job_s2grid, _append_h3_index\n",
"\n",
"base_df_path = \"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/DEMO_CROPTYPE.gpkg\"\n",
"base_df = gpd.read_file(base_df_path)\n",
Expand Down Expand Up @@ -648,160 +648,6 @@
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>epsg</th>\n",
" <th>tile</th>\n",
" <th>bounds</th>\n",
" <th>geometry</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>32601</td>\n",
" <td>01NAA</td>\n",
" <td>(99960.0, -9780.0, 209760.0, 100020.0)</td>\n",
" <td>MULTIPOLYGON (((-179.60773 0.90397, -179.60741...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>32601</td>\n",
" <td>01NAB</td>\n",
" <td>(99960.0, 90240.0, 209760.0, 200040.0)</td>\n",
" <td>MULTIPOLYGON (((179.40554 1.80624, 180.00000 1...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>32601</td>\n",
" <td>01NBA</td>\n",
" <td>(199980.0, -9780.0, 309780.0, 100020.0)</td>\n",
" <td>POLYGON ((-179.69554 0.90390, -178.70942 0.904...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>32601</td>\n",
" <td>01NFA</td>\n",
" <td>(600000.0, -9780.0, 709800.0, 100020.0)</td>\n",
" <td>POLYGON ((-176.10125 0.90480, -175.11469 0.904...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>32601</td>\n",
" <td>01NGA</td>\n",
" <td>(699960.0, -9780.0, 809760.0, 100020.0)</td>\n",
" <td>POLYGON ((-175.20308 0.90446, -174.21702 0.903...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>31092</th>\n",
" <td>32760</td>\n",
" <td>60MYD</td>\n",
" <td>(699960.0, 9790240.0, 809760.0, 9900040.0)</td>\n",
" <td>POLYGON ((178.79692 -0.90392, 179.78298 -0.903...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>31093</th>\n",
" <td>32760</td>\n",
" <td>60MYE</td>\n",
" <td>(699960.0, 9890200.0, 809760.0, 10000000.0)</td>\n",
" <td>POLYGON ((178.79669 0.00000, 179.78263 0.00000...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>31094</th>\n",
" <td>32760</td>\n",
" <td>60MYS</td>\n",
" <td>(699960.0, 9090220.0, 809760.0, 9200020.0)</td>\n",
" <td>POLYGON ((178.81102 -7.23368, 179.80480 -7.228...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>31095</th>\n",
" <td>32760</td>\n",
" <td>60MYT</td>\n",
" <td>(699960.0, 9190240.0, 809760.0, 9300040.0)</td>\n",
" <td>POLYGON ((178.80764 -6.32935, 179.79957 -6.324...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>31096</th>\n",
" <td>32760</td>\n",
" <td>60MYV</td>\n",
" <td>(699960.0, 9390220.0, 809760.0, 9500020.0)</td>\n",
" <td>POLYGON ((178.80227 -4.52114, 179.79126 -4.518...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>31097 rows × 4 columns</p>\n",
"</div>"
],
"text/plain": [
" epsg tile bounds \\\n",
"0 32601 01NAA (99960.0, -9780.0, 209760.0, 100020.0) \n",
"1 32601 01NAB (99960.0, 90240.0, 209760.0, 200040.0) \n",
"2 32601 01NBA (199980.0, -9780.0, 309780.0, 100020.0) \n",
"3 32601 01NFA (600000.0, -9780.0, 709800.0, 100020.0) \n",
"4 32601 01NGA (699960.0, -9780.0, 809760.0, 100020.0) \n",
"... ... ... ... \n",
"31092 32760 60MYD (699960.0, 9790240.0, 809760.0, 9900040.0) \n",
"31093 32760 60MYE (699960.0, 9890200.0, 809760.0, 10000000.0) \n",
"31094 32760 60MYS (699960.0, 9090220.0, 809760.0, 9200020.0) \n",
"31095 32760 60MYT (699960.0, 9190240.0, 809760.0, 9300040.0) \n",
"31096 32760 60MYV (699960.0, 9390220.0, 809760.0, 9500020.0) \n",
"\n",
" geometry \n",
"0 MULTIPOLYGON (((-179.60773 0.90397, -179.60741... \n",
"1 MULTIPOLYGON (((179.40554 1.80624, 180.00000 1... \n",
"2 POLYGON ((-179.69554 0.90390, -178.70942 0.904... \n",
"3 POLYGON ((-176.10125 0.90480, -175.11469 0.904... \n",
"4 POLYGON ((-175.20308 0.90446, -174.21702 0.903... \n",
"... ... \n",
"31092 POLYGON ((178.79692 -0.90392, 179.78298 -0.903... \n",
"31093 POLYGON ((178.79669 0.00000, 179.78263 0.00000... \n",
"31094 POLYGON ((178.81102 -7.23368, 179.80480 -7.228... \n",
"31095 POLYGON ((178.80764 -6.32935, 179.79957 -6.324... \n",
"31096 POLYGON ((178.80227 -4.52114, 179.79126 -4.518... \n",
"\n",
"[31097 rows x 4 columns]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"s2_grid"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
Expand Down Expand Up @@ -842,7 +688,7 @@
},
{
"cell_type": "code",
"execution_count": 17,
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -971,16 +817,25 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 39,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The autoreload extension is already loaded. To reload it, use:\n",
" %reload_ext autoreload\n"
]
}
],
"source": [
"%load_ext autoreload"
]
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -989,7 +844,7 @@
"from openeo_gfmap.backend import cdse_staging_connection\n",
"\n",
"\n",
"base_output_dir = Path('/data/users/Public/couchard/world_cereal/extractions_bys2tile_2/')\n",
"base_output_dir = Path('/data/users/Public/couchard/world_cereal/extractions_bys2tile_4/')\n",
"tracking_job_csv = base_output_dir / 'job_tracker.csv'\n",
"\n",
"manager = GFMAPJobManager(\n",
Expand All @@ -1012,9 +867,49 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 45,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-20 11:32:22,783|openeo_gfmap.manager|INFO: Starting ThreadPoolExecutor with 2 workers.\n",
"2024-03-20 11:32:22,784|openeo_gfmap.manager|INFO: Creating and running jobs.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-20 11:32:22,790|openeo_gfmap.manager|DEBUG: Normalizing dataframe. Columns: Index(['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date',\n",
" 's2_tile', 'h3index', 'geometry', 'nb_polygons', 'status', 'id',\n",
" 'start_time', 'cpu', 'memory', 'duration', 'description', 'costs'],\n",
" dtype='object')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Authenticated using refresh token.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-20 11:32:24,137|openeo_gfmap.manager|DEBUG: Status of job j-24032058198e49aebbde5f6041ffa3d7 is finished (on backend cdse-staging).\n",
"2024-03-20 11:32:24,138|openeo_gfmap.manager|INFO: Job j-24032058198e49aebbde5f6041ffa3d7 finished successfully, queueing on_job_done...\n",
"2024-03-20 11:32:24,662|openeo_gfmap.manager|DEBUG: Generating output path for asset openEO_0.nc from job j-24032058198e49aebbde5f6041ffa3d7...\n",
"2024-03-20 11:32:28,082|openeo_gfmap.manager|DEBUG: Downloaded openEO_0.nc from job j-24032058198e49aebbde5f6041ffa3d7 -> /data/users/Public/couchard/world_cereal/extractions_bys2tile_4/2021_EUR_DEMO_POLY_110/83194dfffffffff/2021_BE_Flanders_full_2195082011/S2-L2A-10m_2021_BE_Flanders_full_2195082011_32631_2020-08-30_2022-03-03.nc\n",
"2024-03-20 11:32:29,939|openeo_gfmap.manager|INFO: Parsed item j-24032058198e49aebbde5f6041ffa3d7_openEO_0.nc from job j-24032058198e49aebbde5f6041ffa3d7\n",
"2024-03-20 11:32:29,941|openeo_gfmap.manager|DEBUG: Calling post job action for job j-24032058198e49aebbde5f6041ffa3d7...\n",
"2024-03-20 11:32:30,806|openeo_gfmap.manager|INFO: Added 1 items to the STAC collection.\n",
"2024-03-20 11:32:30,807|openeo_gfmap.manager|INFO: Job j-24032058198e49aebbde5f6041ffa3d7 and post job action finished successfully.\n"
]
}
],
"source": [
"# Run the jobs and create the STAC catalogue\n",
"manager.run_jobs(job_df, create_datacube_s2, tracking_job_csv)\n",
Expand Down
58 changes: 21 additions & 37 deletions src/openeo_gfmap/manager/job_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from pathlib import Path
from typing import Callable, Optional, Union
Expand Down Expand Up @@ -39,10 +38,9 @@ def __init__(
self._output_dir = output_dir

# Setup the threads to work on the on_job_done and on_job_error methods
self._finished_job_queue = queue.Queue()
self._n_threads = n_threads

self._threads = []
self._executor = None # Will be set in run_jobs, is a threadpool executor
self._futures = []

self._output_path_gen = output_path_generator
self._post_job_action = post_job_action
Expand All @@ -60,28 +58,6 @@ def __init__(
extent=None,
)

def _post_job_worker(self):
    """Checks which jobs are finished or failed and calls the `on_job_done` or `on_job_error`
    methods.

    Runs as the target of a worker thread: it loops forever, pulling
    ``(status, job, row)`` tuples from ``self._finished_job_queue`` and
    dispatching them to the matching post-job callback. The loop only exits
    on ``KeyboardInterrupt``.
    """
    while True:
        try:
            # Block for at most 1 second so the loop stays responsive to
            # KeyboardInterrupt instead of waiting indefinitely on the queue.
            status, job, row = self._finished_job_queue.get(timeout=1)
            _log.debug(
                f"Worker thread {threading.current_thread().name}: polled finished job with status {status}."
            )
            if status == PostJobStatus.ERROR:
                self.on_job_error(job, row)
            elif status == PostJobStatus.FINISHED:
                self.on_job_done(job, row)
            else:
                # Defensive: the producer should only ever enqueue the two
                # statuses above, so anything else is a programming error.
                raise ValueError(f"Unknown status: {status}")
            # Mark the queue item as processed so queue.join() can unblock.
            # NOTE(review): task_done() is skipped if on_job_done/on_job_error
            # raises — confirm whether anything relies on queue.join().
            self._finished_job_queue.task_done()
        except queue.Empty:
            # Timeout with no work available — keep polling.
            continue
        except KeyboardInterrupt:
            _log.debug(f"Worker thread {threading.current_thread().name} interrupted.")
            return

def _update_statuses(self, df: pd.DataFrame):
"""Updates the statues of the jobs in the dataframe from the backend. If a job is finished
or failed, it will be queued to the `on_job_done` or `on_job_error` methods.
Expand All @@ -105,17 +81,27 @@ def _update_statuses(self, df: pd.DataFrame):
job_metadata["status"] == "finished"
):
_log.info(f"Job {job.job_id} finished successfully, queueing on_job_done...")
self._finished_job_queue.put((PostJobStatus.FINISHED, job, row))
self._futures.append(self._executor.submit(self.on_job_done, job, row))
df.loc[idx, "costs"] = job_metadata["costs"]

# Case in which it failed
if (df.loc[idx, "status"] != "error") and (job_metadata["status"] == "error"):
_log.info(f"Job {job.job_id} finished with error, queueing on_job_error...")
self._finished_job_queue.put((PostJobStatus.ERROR, job, row))
self._futures.append(self._executor.submit(self.on_job_error, self, job, row))
df.loc[idx, "costs"] = job_metadata["costs"]

df.loc[idx, "status"] = job_status

futures_to_clear = []
for future in self._futures:
if future.done():
exception = future.exception(timeout=1.0)
if exception:
raise exception
futures_to_clear.append(future)
for future in futures_to_clear:
self._futures.remove(future)

def on_job_error(self, job: BatchJob, row: pd.Series):
"""Method called when a job finishes with an error.
Expand Down Expand Up @@ -255,15 +241,13 @@ def run_jobs(self, df: pd.DataFrame, start_job: Callable, output_file: Union[str
output_file: Union[str, Path]
The file to track the results of the jobs.
"""
_log.info(f"Starting job manager using {self._n_threads} worker threads.")
# Starts the thread pool to work on the on_job_done and on_job_error methods
for _ in range(self._n_threads):
thread = threading.Thread(target=self._post_job_worker)
thread.start()
self._threads.append(thread)

_log.info("Workers started, creating and running jobs.")
super().run_jobs(df, start_job, output_file)
_log.info(f"Starting ThreadPoolExecutor with {self._n_threads} workers.")
with ThreadPoolExecutor(max_workers=self._n_threads) as executor:
_log.info("Creating and running jobs.")
self._executor = executor
super().run_jobs(df, start_job, output_file)
self._executor = None

def create_stac(self, output_path: Optional[Union[str, Path]] = None):
"""Method to be called after run_jobs to create a STAC catalog
Expand Down

0 comments on commit 5a044cf

Please sign in to comment.