Skip to content

Commit

Permalink
Fix InvocationClient.get_invocation_biocompute_object() method on…
Browse files Browse the repository at this point in the history
… upcoming Galaxy 24.1

xref: galaxyproject/galaxy#16645

Also:
- Set `enable_celery_tasks` for tests (needed for BioCompute objects export
  in Galaxy 24.1)
- Add ``wait`` parameter to ``HistoryClient.delete_dataset()`` and
  ``HistoryDatasetAssociation.delete()`` methods (needed for testing purged
  datasets when Celery tasks are enabled).
- Refactor methods that wait for something to make use of a new generic
  ``bioblend.wait_on()`` function. They now all raise ``TimeoutException``.
  • Loading branch information
nsoranzo committed May 1, 2024
1 parent 015f229 commit 7d46e88
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 126 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
* Dropped support for Python 3.7. Added support for Python 3.12. Added support
for Galaxy releases 23.2 and 24.0.

* Added ``wait`` parameter to ``HistoryClient.delete_dataset()`` and
``HistoryDatasetAssociation.delete()`` methods.

* Dropped broken ``deleted`` parameter of ``DatasetClient.show_dataset()``.

* Parameters after ``password`` in the ``__init__()`` method of the
Expand All @@ -12,8 +15,14 @@
* Classes defined in ``bioblend.galaxy.objects.wrappers`` are no more
re-exported by ``bioblend.galaxy.objects``.

* ``DatasetTimeoutException`` and ``DatasetCollectionTimeoutException`` are now
aliases for ``TimeoutException`` instead of subclasses.

* Added support for the new "cancelling" invocation state.

* Fixed ``InvocationClient.get_invocation_biocompute_object()`` method on
upcoming Galaxy 24.1.

### BioBlend v1.2.0 - 2023-06-30

* Dropped support for Galaxy releases 17.09-19.01. Added support for Galaxy
Expand Down
38 changes: 38 additions & 0 deletions bioblend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import logging
import logging.config
import os
import time
from typing import (
Callable,
Optional,
TypeVar,
Union,
)

Expand Down Expand Up @@ -116,3 +119,38 @@ def __str__(self) -> str:

class TimeoutException(Exception):
    """Raised when a waited-on operation does not complete within the
    allowed time (e.g. by ``wait_on()`` after ``maxwait`` seconds)."""


class NotReady(Exception):
    """Raised by a function passed to ``wait_on()`` to signal that the
    awaited condition is not yet satisfied and the call should be retried."""


T = TypeVar("T")


def wait_on(func: Callable[[], T], maxwait: float = 60, interval: float = 3) -> T:
    """
    Repeatedly call ``func`` until it returns without raising ``NotReady``.

    :param func: function to wait on. It should accept no parameters.

    :param maxwait: Total time (in seconds) to keep retrying before giving up.
      After this time, a ``TimeoutException`` will be raised.

    :param interval: Time (in seconds) to wait between 2 consecutive checks.

    :return: the value returned by the first successful call of ``func``.
    """
    assert maxwait >= 0
    assert interval > 0

    remaining = maxwait
    while True:
        try:
            return func()
        except NotReady as e:
            if remaining <= 0:
                # Out of time: surface the last NotReady reason to the caller.
                raise TimeoutException(f"{e} after {maxwait} s")
            log.info("%s. Will wait %s more s", e, remaining)
            time.sleep(min(remaining, interval))
            remaining -= interval
11 changes: 1 addition & 10 deletions bioblend/_tests/TestGalaxyDatasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def setUp(self):
def tearDown(self):
self.gi.histories.delete_history(self.history_id, purge=True)

@test_util.skip_unless_galaxy("release_19.05")
def test_show_nonexistent_dataset(self):
with pytest.raises(ConnectionError):
self.gi.datasets.show_dataset("nonexistent_id")
Expand Down Expand Up @@ -65,25 +64,21 @@ def test_download_dataset(self):
f.flush()
assert f.read() == expected_contents

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets(self):
datasets = self.gi.datasets.get_datasets()
dataset_ids = [dataset["id"] for dataset in datasets]
assert self.dataset_id in dataset_ids

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_history(self):
datasets = self.gi.datasets.get_datasets(history_id=self.history_id)
assert len(datasets) == 1

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_limit_offset(self):
datasets = self.gi.datasets.get_datasets(limit=1)
assert len(datasets) == 1
datasets = self.gi.datasets.get_datasets(history_id=self.history_id, offset=1)
assert datasets == []

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_name(self):
datasets = self.gi.datasets.get_datasets(history_id=self.history_id, name="Pasted Entry")
assert len(datasets) == 1
Expand Down Expand Up @@ -143,7 +138,6 @@ def test_get_datasets_visible(self):
datasets = self.gi.datasets.get_datasets(history_id=self.history_id, visible=False)
assert len(datasets) == 0

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_ordering(self):
self.dataset_id2 = self._test_dataset(self.history_id, contents=self.dataset_contents)
self.gi.datasets.wait_for_dataset(self.dataset_id2)
Expand All @@ -156,7 +150,6 @@ def test_get_datasets_ordering(self):
datasets = self.gi.datasets.get_datasets(history_id=self.history_id, order="hid-asc")
assert datasets[0]["id"] == self.dataset_id

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_deleted(self):
deleted_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, deleted=True)
assert deleted_datasets == []
Expand All @@ -165,11 +158,10 @@ def test_get_datasets_deleted(self):
assert len(deleted_datasets) == 1
purged_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, purged=True)
assert purged_datasets == []
self.gi.histories.delete_dataset(self.history_id, self.dataset_id, purge=True)
self.gi.histories.delete_dataset(self.history_id, self.dataset_id, purge=True, wait=True)
purged_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, purged=True)
assert len(purged_datasets) == 1

@test_util.skip_unless_galaxy("release_19.05")
def test_get_datasets_tool_id_and_tag(self):
cat1_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, tool_id="cat1")
assert cat1_datasets == []
Expand All @@ -189,7 +181,6 @@ def test_wait_for_dataset(self):

self.gi.histories.delete_history(history_id, purge=True)

@test_util.skip_unless_galaxy("release_19.05")
def test_dataset_permissions(self):
admin_user_id = self.gi.users.get_current_user()["id"]
username = test_util.random_string()
Expand Down
2 changes: 1 addition & 1 deletion bioblend/_tests/TestGalaxyHistories.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def test_delete_dataset(self):
def test_purge_dataset(self):
history_id = self.history["id"]
dataset1_id = self._test_dataset(history_id)
self.gi.histories.delete_dataset(history_id, dataset1_id, purge=True)
self.gi.histories.delete_dataset(history_id, dataset1_id, purge=True, wait=True)
dataset = self.gi.histories.show_dataset(history_id, dataset1_id)
assert dataset["deleted"]
assert dataset["purged"]
Expand Down
1 change: 0 additions & 1 deletion bioblend/_tests/TestGalaxyJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def test_rerun_and_remap(self):
assert last_dataset["id"] == history_contents[2]["id"]
self._wait_and_verify_dataset(last_dataset["id"], b"line 1\tline 1\n")

@test_util.skip_unless_galaxy("release_19.05")
@test_util.skip_unless_tool("random_lines1")
def test_get_common_problems(self):
job_id = self._run_tool()["jobs"][0]["id"]
Expand Down
2 changes: 1 addition & 1 deletion bioblend/_tests/TestGalaxyObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ def test_dataset_delete(self):
assert not self.ds.purged

def test_dataset_purge(self):
self.ds.delete(purge=True)
self.ds.delete(purge=True, wait=True)
assert self.ds.deleted
assert self.ds.purged

Expand Down
1 change: 0 additions & 1 deletion bioblend/_tests/TestGalaxyTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def test_run_cat1(self):
# TODO: Wait for results and verify it has 3 lines - 1 2 3, 4 5 6,
# and 7 8 9.

@test_util.skip_unless_galaxy("release_19.05")
@test_util.skip_unless_tool("CONVERTER_fasta_to_bowtie_color_index")
def test_tool_dependency_install(self):
installed_dependencies = self.gi.tools.install_dependencies("CONVERTER_fasta_to_bowtie_color_index")
Expand Down
1 change: 1 addition & 0 deletions bioblend/_tests/template_galaxy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ galaxy:
master_api_key: $BIOBLEND_GALAXY_MASTER_API_KEY
enable_quotas: true
cleanup_job: onsuccess
enable_celery_tasks: true
36 changes: 12 additions & 24 deletions bioblend/galaxy/dataset_collections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import time
from typing import (
Any,
Dict,
Expand All @@ -11,7 +10,9 @@

from bioblend import (
CHUNK_SIZE,
NotReady,
TimeoutException,
wait_on,
)
from bioblend.galaxy.client import Client
from bioblend.galaxy.datasets import TERMINAL_STATES
Expand Down Expand Up @@ -176,9 +177,8 @@ def wait_for_dataset_collection(
:type maxwait: float
:param maxwait: Total time (in seconds) to wait for the dataset
states in the dataset collection to become terminal. If not
all datasets are in a terminal state within this time, a
``DatasetCollectionTimeoutException`` will be raised.
states in the dataset collection to become terminal. After this time,
a ``TimeoutException`` will be raised.
:type interval: float
:param interval: Time (in seconds) to wait between two consecutive checks.
Expand All @@ -200,12 +200,9 @@ def wait_for_dataset_collection(
:rtype: dict
:return: Details of the given dataset collection.
"""
assert maxwait >= 0
assert interval > 0
assert 0 <= proportion_complete <= 1

time_left = maxwait
while True:
def check_and_get_dataset_collection() -> Dict[str, Any]:
dataset_collection = self.show_dataset_collection(dataset_collection_id)
states = [elem["object"]["state"] for elem in dataset_collection["elements"]]
terminal_states = [state for state in states if state in TERMINAL_STATES]
Expand All @@ -217,24 +214,15 @@ def wait_for_dataset_collection(
proportion = len(terminal_states) / len(states)
if proportion >= proportion_complete:
return dataset_collection
if time_left > 0:
log.info(
"The dataset collection %s has %s out of %s datasets in a terminal state. Will wait %s more s",
dataset_collection_id,
len(terminal_states),
len(states),
time_left,
)
time.sleep(min(time_left, interval))
time_left -= interval
else:
raise DatasetCollectionTimeoutException(
f"Less than {proportion_complete * 100}% of datasets in the dataset collection is in a terminal state after {maxwait} s"
)
raise NotReady(
f"The dataset collection {dataset_collection_id} has only {proportion * 100}% of datasets in a terminal state"
)

return wait_on(check_and_get_dataset_collection, maxwait=maxwait, interval=interval)


class DatasetCollectionTimeoutException(TimeoutException):
pass
# Unused, for backward compatibility
DatasetCollectionTimeoutException = TimeoutException


__all__ = (
Expand Down
38 changes: 16 additions & 22 deletions bioblend/galaxy/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import os
import shlex
import time
import warnings
from typing import (
Any,
Expand All @@ -21,8 +20,12 @@

from requests import Response

import bioblend
from bioblend import TimeoutException
from bioblend import (
CHUNK_SIZE,
NotReady,
TimeoutException,
wait_on,
)
from bioblend.galaxy.client import Client

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,8 +147,8 @@ def download_dataset(
:type maxwait: float
:param maxwait: Total time (in seconds) to wait for the dataset state to
become terminal. If the dataset state is not terminal within this
time, a ``DatasetTimeoutException`` will be thrown.
become terminal. After this time, a ``TimeoutException`` will be
raised.
:rtype: bytes or str
:return: If a ``file_path`` argument is not provided, returns the file
Expand Down Expand Up @@ -180,7 +183,7 @@ def download_dataset(
file_local_path = file_path

with open(file_local_path, "wb") as fp:
for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
fp.write(chunk)

Expand Down Expand Up @@ -411,8 +414,7 @@ def wait_for_dataset(
:type maxwait: float
:param maxwait: Total time (in seconds) to wait for the dataset state to
become terminal. If the dataset state is not terminal within this
time, a ``DatasetTimeoutException`` will be raised.
become terminal. After this time, a ``TimeoutException`` will be raised.
:type interval: float
:param interval: Time (in seconds) to wait between 2 consecutive checks.
Expand All @@ -423,25 +425,17 @@ def wait_for_dataset(
:rtype: dict
:return: Details of the given dataset.
"""
assert maxwait >= 0
assert interval > 0

time_left = maxwait
while True:
def check_and_get_dataset() -> Dict[str, Any]:
dataset = self.show_dataset(dataset_id)
state = dataset["state"]
if state in TERMINAL_STATES:
if check and state != "ok":
raise Exception(f"Dataset {dataset_id} is in terminal state {state}")
return dataset
if time_left > 0:
log.info("Dataset %s is in non-terminal state %s. Will wait %s more s", dataset_id, state, time_left)
time.sleep(min(time_left, interval))
time_left -= interval
else:
raise DatasetTimeoutException(
f"Dataset {dataset_id} is still in non-terminal state {state} after {maxwait} s"
)
raise NotReady(f"Dataset {dataset_id} is in non-terminal state {state}")

return wait_on(check_and_get_dataset, maxwait=maxwait, interval=interval)


class DatasetStateException(Exception):
Expand All @@ -452,5 +446,5 @@ class DatasetStateWarning(UserWarning):
pass


class DatasetTimeoutException(TimeoutException):
pass
# Unused, just for backward compatibility
DatasetTimeoutException = TimeoutException
Loading

0 comments on commit 7d46e88

Please sign in to comment.