Make returned futures proxies for cleaner blocking code
This commit makes every future returned by this client into
a proxy future. A proxy future can be used as a future, but
it will also implicitly proxy attribute and method lookups
through to the underlying result of a future. In many cases,
this allows blocking code to be written as if futures are
not used at all.
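
As a rough sketch of the semantics (f_proxy and f_return come from
the more-executors library on which this client is built; repo is
a hypothetical, already-available object):

    from more_executors.futures import f_proxy, f_return

    repo_f = f_proxy(f_return(repo))

    # Still usable as a plain future:
    assert repo_f.result() is repo

    # ...but attribute lookups are forwarded to the result,
    # blocking first if the result is not yet available:
    repo_id = repo_f.id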

This is similar in spirit to commit 15eae36, embracing
implicit over explicit for cleaner code.

Why?
====

APIs in this library are written to return Futures, to enable
aggressively concurrent code. However, there remain many cases
where concurrency isn't possible or desired, so the APIs are
used in a blocking manner.

Currently, the authors of such code are penalized by having to
add many calls to .result(), making the code noisy. This commit
seeks to reduce that noise and make the blocking code more
readable, while still fully supporting non-blocking code styles.

As an example of why both non-blocking and blocking coding styles
should be supported, consider the search_repository API.

Two cases where a task might need to search for repositories are:

- when uploading a productid cert into a repo, we need to find
  related repos for the same product and update repo notes there.
- when adding or removing RPMs in a repo, we need to search for
  corresponding UBI repo(s) in need of repopulation

Both of these cases can occur during a task which needs to
process many other unrelated steps too, which ideally should
be handled concurrently; hence, non-blocking makes sense here.

But another common case is:

- the user has passed a set of repo IDs into a task and we want
  to perform a certain operation on each repo (e.g. publish;
  clear-repo)

In this case, the desired behavior is generally to fail the task
early, without proceeding to later steps, if any of the passed
repo IDs are incorrect. So we do explicitly want to block on
completion of the repo search, by writing code such as:

    repos = client.search_repository(...)
    found_ids = [repo.id for repo in repos]
    check_ids(requested_ids, found_ids)

Hence both blocking and non-blocking usage of search_repository
is valid in different contexts. It would be best if we could
cleanly support both.
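
For contrast, a non-blocking version of the same check could chain
callbacks instead; a sketch using f_map from more-executors, with
requested_ids and check_ids as in the example above:

    from more_executors.futures import f_map

    repos_f = client.search_repository(...)
    found_ids_f = f_map(repos_f, lambda repos: [repo.id for repo in repos])
    checked_f = f_map(found_ids_f, lambda ids: check_ids(requested_ids, ids))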

Risks
=====

It's not certain that the benefits of this change outweigh the
disadvantages. Problems which might come up include:

- Generally enables devs to be sloppy: to misunderstand the return
  types of various methods, and to not consider when we should or
  shouldn't block.

- Might be confusing due to the proxy being incomplete.
  For example, if a future's result is an empty list,
  len(f) == 0 is True, but bool(f) is also True since the proxy
  object itself is truthy; a behavior difference from a plain
  empty list (see the sketch after this list).

- Since it's now less common that .result() must be called, it might
  increase the chance that necessary calls are forgotten.
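
As a sketch of that incomplete-proxy pitfall (f_proxy and f_return
are from the more-executors library):

    from more_executors.futures import f_proxy, f_return

    f = f_proxy(f_return([]))

    assert len(f) == 0    # len() is proxied through to the empty list
    assert bool(f)        # bool() is not; the proxy object is truthy
    assert not bool([])   # ...unlike a plain empty list
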
rohanpm committed Sep 9, 2019
1 parent 11d6b97 commit f6a8cb8
Showing 20 changed files with 102 additions and 83 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

-- n/a
+### Added
+- Introduced "proxy futures" for values produced by this library.

 ## [2.0.0] - 2019-09-09

2 changes: 1 addition & 1 deletion examples/garbage-collect
@@ -16,7 +16,7 @@ def garbage_collect(client, gc_threshold=7):
         Criteria.with_field("notes.pub_temp_repo", True),
     )

-    repos = client.search_repository(crit).result()
+    repos = client.search_repository(crit)
     for repo in repos:
         created = repo.created
         if not created:

2 changes: 1 addition & 1 deletion examples/publish
@@ -13,7 +13,7 @@ log = logging.getLogger("publish")
 def publish(client, repo_id):
     crit = Criteria.with_id(repo_id)

-    repos = client.search_repository(crit).result()
+    repos = client.search_repository(crit)
     publishes = [repo.publish() for repo in repos]

     # Blocks at result() calls here.

2 changes: 1 addition & 1 deletion examples/upload-files
@@ -9,7 +9,7 @@ log = logging.getLogger("upload")


 def upload(client, path, repo_id):
-    repo = client.get_repository(repo_id).result()
+    repo = client.get_repository(repo_id)

     uploads = []
     if os.path.isdir(path):

47 changes: 40 additions & 7 deletions pubtools/pulplib/_impl/client/client.py
@@ -9,7 +9,7 @@

 import requests
 from more_executors import Executors
-from more_executors.futures import f_map, f_flat_map, f_return
+from more_executors.futures import f_map, f_flat_map, f_return, f_proxy

 from ..page import Page
 from ..criteria import Criteria
@@ -41,6 +41,36 @@ class Client(object):
     If a future is currently awaiting one or more Pulp tasks, cancelling the future
     will attempt to cancel those tasks.

+    **Proxy futures:**
+
+    .. versionadded:: 2.1.0
+
+    All :class:`~concurrent.futures.Future` objects produced by this client are
+    *proxy futures*, meaning that attribute and method lookups on the objects are
+    proxied to the future's result, blocking as needed.
+
+    This allows the client to be used within blocking code without having to
+    scatter calls to ``.result()`` throughout.
+
+    For example, this block of code:
+
+    .. code-block:: python
+
+        repo = client.get_repository(repo_id).result()
+        publish_tasks = repo.publish().result()
+        task_ids = ','.join([t.id for t in publish_tasks])
+        log.info("Published %s: %s", repo.id, task_ids)
+
+    ...may be alternatively written without the calls to ``.result()``, due to
+    the usage of proxy futures:
+
+    .. code-block:: python
+
+        repo = client.get_repository(repo_id)
+        publish_tasks = repo.publish()
+        task_ids = ','.join([t.id for t in publish_tasks])
+        log.info("Published %s: %s", repo.id, task_ids)
+
     **Retries:**

     In general, for all methods which represent an idempotent operation and
@@ -151,7 +181,7 @@ def unpack_page(page):
             return page.data[0]

         repo_f = f_map(page_f, unpack_page)
-        return repo_f
+        return f_proxy(repo_f)

     def search_repository(self, criteria=None):
         """Search for repositories matching the given criteria.
@@ -180,8 +210,8 @@ def search_repository(self, criteria=None):
         # When this request is resolved, we'll have the first page of data.
         # We'll need to convert that into a page and also keep going with
         # the search if there's more to be done.
-        return f_map(
-            response_f, lambda data: self._handle_page(Repository, search, data)
+        return f_proxy(
+            f_map(response_f, lambda data: self._handle_page(Repository, search, data))
         )

     def get_maintenance_report(self):
@@ -249,7 +279,7 @@ def get_content_type_ids(self):

         # The pulp API returns an object per supported type.
         # We only support returning the ID at this time.
-        return f_map(out, lambda types: sorted([t["id"] for t in types]))
+        return f_proxy(f_map(out, lambda types: sorted([t["id"] for t in types])))

     def _do_upload_file(self, upload_id, file_obj, name):
         def do_next_upload(checksum, size):
@@ -378,8 +408,11 @@ def _handle_page(self, object_class, search, raw_data):
             search["criteria"] = search["criteria"].copy()
             search["criteria"]["skip"] = search["criteria"]["skip"] + limit
             response_f = self._do_search("repositories", search)
-            next_page = f_map(
-                response_f, lambda data: self._handle_page(object_class, search, data)
+            next_page = f_proxy(
+                f_map(
+                    response_f,
+                    lambda data: self._handle_page(object_class, search, data),
+                )
             )

         return Page(data=page_data, next=next_page)

14 changes: 7 additions & 7 deletions pubtools/pulplib/_impl/fake/client.py
@@ -9,7 +9,7 @@
 import six
 from six.moves import StringIO

-from more_executors.futures import f_return, f_return_error, f_flat_map
+from more_executors.futures import f_return, f_return_error, f_flat_map, f_proxy


 from pubtools.pulplib import (
@@ -104,9 +104,9 @@ def search_repository(self, criteria=None):
         next_page = None
         for batch in reversed(page_data):
             page = Page(data=batch, next=next_page)
-            next_page = f_return(page)
+            next_page = f_proxy(f_return(page))

-        return f_return(page)
+        return f_proxy(f_return(page))

     def get_repository(self, repository_id):
         if not isinstance(repository_id, six.string_types):
@@ -118,14 +118,14 @@ def get_repository(self, repository_id):
                 PulpException("Repository id=%s not found" % repository_id)
             )

-        return f_return(data[0])
+        return f_proxy(f_return(data[0]))

     def get_maintenance_report(self):
         if self._maintenance_report:
             report = MaintenanceReport._from_data(json.loads(self._maintenance_report))
         else:
             report = MaintenanceReport()
-        return f_return(report)
+        return f_proxy(f_return(report))

     def set_maintenance(self, report):
         report_json = json.dumps(report._export_dict(), indent=4, sort_keys=True)
@@ -139,10 +139,10 @@ def set_maintenance(self, report):
         publish_ft = f_flat_map(upload_ft, lambda _: repo.publish())
         self._maintenance_report = report_json

-        return publish_ft
+        return f_proxy(publish_ft)

     def get_content_type_ids(self):
-        return f_return(self._type_ids)
+        return f_proxy(f_return(self._type_ids))

     def _do_upload_file(self, upload_id, file_obj, name):
         # pylint: disable=unused-argument

8 changes: 4 additions & 4 deletions pubtools/pulplib/_impl/model/repository/base.py
@@ -2,7 +2,7 @@
 import logging

 from attr import validators
-from more_executors.futures import f_map
+from more_executors.futures import f_map, f_proxy

 from ..common import PulpObject, DetachedException
 from ..attr import pulp_attrib
@@ -204,7 +204,7 @@ def detach(tasks):
             self.__dict__["_client"] = None
             return tasks

-        return f_map(delete_f, detach)
+        return f_proxy(f_map(delete_f, detach))

     def publish(self, options=PublishOptions()):
         """Publish this repository.
@@ -253,7 +253,7 @@ def publish(self, options=PublishOptions()):
             config = self._config_for_distributor(distributor, options)
             to_publish.append((distributor, config))

-        return self._client._publish_repository(self, to_publish)
+        return f_proxy(self._client._publish_repository(self, to_publish))

     def remove_content(self, **kwargs):
         """Remove all content of requested types from this repository.
@@ -289,7 +289,7 @@ def remove_content(self, **kwargs):
         # argument order at least until then.

         type_ids = kwargs.get("type_ids")
-        return self._client._do_unassociate(self.id, type_ids)
+        return f_proxy(self._client._do_unassociate(self.id, type_ids))

     @classmethod
     def from_data(cls, data):

4 changes: 2 additions & 2 deletions pubtools/pulplib/_impl/model/repository/file.py
@@ -2,7 +2,7 @@
 import logging

 from attr import validators
-from more_executors.futures import f_flat_map, f_map
+from more_executors.futures import f_flat_map, f_map, f_proxy

 from .base import Repository, repo_type
 from ..frozenlist import FrozenList
@@ -99,7 +99,7 @@ def upload_file(self, file_obj, relative_url=None):
             import_complete_f, lambda _: self._client._delete_upload_request(upload_id)
         )

-        return import_complete_f
+        return f_proxy(import_complete_f)

     def _get_relative_url(self, file_obj, relative_url):
         is_file_object = "close" in dir(file_obj)

2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
 requests
-more-executors>=2.2.0
+more-executors>=2.3.0
 six
 PyYAML
 jsonschema

6 changes: 2 additions & 4 deletions tests/client/test_client.py
@@ -46,9 +46,7 @@ def test_can_search(client, requests_mocker):
         json=[{"id": "repo1"}, {"id": "repo2"}],
     )

-    repos_f = client.search_repository()
-
-    repos = [r for r in repos_f.result()]
+    repos = client.search_repository()

     # It should have returned the repos as objects
     assert sorted(repos) == [Repository(id="repo1"), Repository(id="repo2")]
@@ -75,7 +73,7 @@ def test_search_retries(client, requests_mocker, caplog):

     repos_f = client.search_repository()

-    repos = [r for r in repos_f.result()]
+    repos = [r for r in repos_f]

     # It should have found the repo
     assert repos == [Repository(id="repo1")]

5 changes: 2 additions & 3 deletions tests/client/test_search_stops.py
@@ -40,9 +40,8 @@ def test_search_stops_paginate(requests_mocker):

     # Imagine we have some function which processes two pages of results and
     # then stops
-    def do_something_first_two_pages(repos_f):
-        page = repos_f.result()
-        return len(page.data) + len(page.next.result().data)
+    def do_something_first_two_pages(page):
+        return len(page.data) + len(page.next.data)

     two_pages_len = do_something_first_two_pages(client.search_repository())

4 changes: 2 additions & 2 deletions tests/fake/test_fake_content_type_ids.py
@@ -6,7 +6,7 @@ def test_default_content_type_ids():
     controller = FakeController()
     client = controller.client

-    type_ids = client.get_content_type_ids().result()
+    type_ids = client.get_content_type_ids()

     # Type IDs should include this common content types.
     assert "rpm" in type_ids
@@ -24,5 +24,5 @@ def test_set_content_type_ids():

     controller.set_content_type_ids(["a", "b", "c"])

-    type_ids = client.get_content_type_ids().result()
+    type_ids = client.get_content_type_ids()
     assert sorted(type_ids) == ["a", "b", "c"]

6 changes: 3 additions & 3 deletions tests/fake/test_fake_delete.py
@@ -9,7 +9,7 @@ def test_can_delete():
     controller.insert_repository(Repository(id="repo2"))

     client = controller.client
-    repo1 = client.get_repository("repo1").result()
+    repo1 = client.get_repository("repo1")

     # Call to delete should succeed
     delete_f = repo1.delete()
@@ -42,8 +42,8 @@ def test_delete_missing_repo_succeeds():
     # We want to try deleting the same repo more than once.
     # We can't do this through a single reference since it would be detached
     # on delete, but if we get two handles to the same repo, we can.
-    repo_copy1 = client.get_repository("repo").result()
-    repo_copy2 = client.get_repository("repo").result()
+    repo_copy1 = client.get_repository("repo")
+    repo_copy2 = client.get_repository("repo")

     # First delete succeeds, with some tasks
     assert repo_copy1.delete().result()

6 changes: 3 additions & 3 deletions tests/fake/test_fake_publish.py
@@ -17,7 +17,7 @@ def test_can_publish():
     controller.insert_repository(YumRepository(id="repo2"))

     client = controller.client
-    repo1 = client.get_repository("repo1").result()
+    repo1 = client.get_repository("repo1")

     # Call to publish should succeed
     publish_f = repo1.publish()
@@ -53,8 +53,8 @@ def test_publish_absent_raises():
     )

     client = controller.client
-    repo_copy1 = client.get_repository("repo1").result()
-    repo_copy2 = client.get_repository("repo1").result()
+    repo_copy1 = client.get_repository("repo1")
+    repo_copy2 = client.get_repository("repo1")

     # If I delete the repo through one handle...
     assert repo_copy1.delete().result()

22 changes: 6 additions & 16 deletions tests/fake/test_fake_remove_content.py
@@ -9,7 +9,7 @@ def test_can_remove_empty():
     repo = YumRepository(id="repo1")
     controller.insert_repository(repo)

-    remove_tasks = client.get_repository("repo1").result().remove_content().result()
+    remove_tasks = client.get_repository("repo1").remove_content()

     assert len(remove_tasks) == 1
     task = remove_tasks[0]
@@ -45,12 +45,7 @@ def test_can_remove_content():
     controller.insert_repository(repo)
     controller.insert_units(repo, units)

-    remove_rpms = (
-        client.get_repository("repo1")
-        .result()
-        .remove_content(type_ids=["rpm"])
-        .result()
-    )
+    remove_rpms = client.get_repository("repo1").remove_content(type_ids=["rpm"])

     assert len(remove_rpms) == 1
     task = remove_rpms[0]
@@ -63,12 +58,7 @@ def test_can_remove_content():
     assert sorted(task.units) == sorted(rpm_units)

     # Now if we ask to remove same content again...
-    remove_rpms = (
-        client.get_repository("repo1")
-        .result()
-        .remove_content(type_ids=["rpm"])
-        .result()
-    )
+    remove_rpms = client.get_repository("repo1").remove_content(type_ids=["rpm"])

     assert len(remove_rpms) == 1
     task = remove_rpms[0]
@@ -79,7 +69,7 @@ def test_can_remove_content():
     assert not task.units

     # It should still be possible to remove other content
-    remove_all = client.get_repository("repo1").result().remove_content().result()
+    remove_all = client.get_repository("repo1").remove_content()

     assert len(remove_all) == 1
     task = remove_all[0]
@@ -99,8 +89,8 @@ def test_remove_deleted_repo_fails():
     controller.insert_repository(repo)

     # Get two references to the same repo
-    repo_handle1 = client.get_repository("repo1").result()
-    repo_handle2 = client.get_repository("repo1").result()
+    repo_handle1 = client.get_repository("repo1")
+    repo_handle2 = client.get_repository("repo1")

     # Use one of them to delete the repo
     repo_handle1.delete().result()