Skip to content

Commit

Permalink
Merge pull request #23 from twosigma/pyupgrade-py39
Browse files Browse the repository at this point in the history
pyupgrade to python3.9
  • Loading branch information
daniel-shields committed Feb 9, 2024
2 parents 37a6c3e + 315c1b2 commit c1a8991
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ jobs:
- ubuntu-latest
- windows-latest
python:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
Expand Down
8 changes: 4 additions & 4 deletions src/uberjob/_execution/run_function_on_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
import threading
from contextlib import contextmanager
from typing import Dict, List, NamedTuple, Set
from typing import NamedTuple

from uberjob._errors import NodeError
from uberjob._execution.scheduler import create_queue
Expand Down Expand Up @@ -80,9 +80,9 @@ def worker_pool(queue, process_item, worker_count):


class PreparedNodes(NamedTuple):
source_nodes: List[Node]
single_parent_nodes: Set[Node]
remaining_pred_count_mapping: Dict[Node, int]
source_nodes: list[Node]
single_parent_nodes: set[Node]
remaining_pred_count_mapping: dict[Node, int]


def prepare_nodes(graph) -> PreparedNodes:
Expand Down
6 changes: 3 additions & 3 deletions src/uberjob/_execution/run_physical.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
"""Functionality for executing a physical plan"""
from typing import Any, Callable, Dict, NamedTuple, Optional
from typing import Any, Callable, NamedTuple, Optional

from uberjob._errors import NodeError, create_chained_call_error
from uberjob._execution.run_function_on_graph import run_function_on_graph
Expand Down Expand Up @@ -45,7 +45,7 @@ def run(self, fn, retry):


def _create_bound_call(
graph: Graph, call: Call, result_lookup: Dict[Node, Any]
graph: Graph, call: Call, result_lookup: dict[Node, Any]
) -> BoundCall:
args, kwargs = get_argument_nodes(graph, call)
args = [result_lookup[predecessor] for predecessor in args]
Expand All @@ -71,7 +71,7 @@ def _create_bound_call_lookup_and_output_slot(


class PrepRunPhysical(NamedTuple):
bound_call_lookup: Dict[Node, BoundCall]
bound_call_lookup: dict[Node, BoundCall]
output_slot: Slot
process: Callable[[Node], None]
plan: Plan
Expand Down
4 changes: 1 addition & 3 deletions src/uberjob/_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Tuple

from uberjob._util import fully_qualified_name
from uberjob.graph import Call


def get_full_call_scope(call: Call) -> Tuple:
def get_full_call_scope(call: Call) -> tuple:
return (*call.scope, fully_qualified_name(call.fn))


Expand Down
5 changes: 3 additions & 2 deletions src/uberjob/_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
# limitations under the License.
#
import operator
from collections.abc import Generator
from contextlib import contextmanager
from threading import RLock
from typing import Callable, Generator, Tuple
from typing import Callable

from uberjob import _builtins
from uberjob._util import validation
Expand Down Expand Up @@ -140,7 +141,7 @@ def gather(self, value) -> Node:
"""
return self._gather(get_stack_frame(), value)

def unpack(self, iterable, length: int) -> Tuple[Node, ...]:
def unpack(self, iterable, length: int) -> tuple[Node, ...]:
"""
Unpack a symbolic iterable into a tuple of symbolic values.
Expand Down
4 changes: 2 additions & 2 deletions src/uberjob/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ def keys(self) -> typing.KeysView[Node]:
"""
return self.mapping.keys()

def values(self) -> typing.List[ValueStore]:
def values(self) -> list[ValueStore]:
"""
Get all registered :class:`~uberjob.ValueStore` instances.
:return: A list of :class:`~uberjob.ValueStore`.
"""
return [v.value_store for v in self.mapping.values()]

def items(self) -> typing.List[typing.Tuple[Node, ValueStore]]:
def items(self) -> list[tuple[Node, ValueStore]]:
"""
Get all registered (node, value_store) pairs.
Expand Down
2 changes: 1 addition & 1 deletion src/uberjob/_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class Scope:


def render(
plan: typing.Union[Plan, Graph, typing.Tuple[Plan, typing.Optional[Node]]],
plan: typing.Union[Plan, Graph, tuple[Plan, typing.Optional[Node]]],
*,
registry: Registry = None,
predicate: typing.Callable[[Node, dict], bool] = None,
Expand Down
5 changes: 3 additions & 2 deletions src/uberjob/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
#
import collections
import datetime as dt
from typing import Callable, Iterable, Optional, Tuple, Union
from collections.abc import Iterable
from typing import Callable, Optional, Union

from uberjob._errors import CallError
from uberjob._execution.run_function_on_graph import NodeError
Expand Down Expand Up @@ -84,7 +85,7 @@ def run(
fresh_time: Optional[dt.datetime] = None,
progress: Union[None, bool, Progress, Iterable[Progress]] = True,
scheduler: Optional[str] = None,
transform_physical: Optional[Callable[[Plan, Node], Tuple[Plan, Node]]] = None,
transform_physical: Optional[Callable[[Plan, Node], tuple[Plan, Node]]] = None,
stale_check_max_workers: Optional[int] = None,
):
"""
Expand Down
10 changes: 5 additions & 5 deletions src/uberjob/_transformations/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
import collections
import datetime as dt
from typing import Optional, Set, Tuple
from typing import Optional

from uberjob._errors import NodeError, create_chained_call_error
from uberjob._execution.run_function_on_graph import run_function_on_graph
Expand Down Expand Up @@ -47,7 +47,7 @@ def _to_naive_utc_time(value: Optional[dt.datetime]) -> Optional[dt.datetime]:
)


def _get_stale_scope(call: Call, registry: Registry) -> Tuple:
def _get_stale_scope(call: Call, registry: Registry) -> tuple:
scope = get_full_call_scope(call)
value_store = registry.get(call)
if value_store is None:
Expand All @@ -63,7 +63,7 @@ def _get_stale_nodes(
max_workers: Optional[int] = None,
fresh_time: Optional[dt.datetime] = None,
progress_observer: ProgressObserver,
) -> Set[Node]:
) -> set[Node]:
plan = prune_source_literals(
plan, inplace=False, predicate=lambda node: node not in registry
)
Expand Down Expand Up @@ -132,7 +132,7 @@ def process_with_callbacks(node):

def _add_value_store(
plan: Plan, node: Node, registry_value: RegistryValue, *, is_stale: bool
) -> Tuple[Optional[Node], Node]:
) -> tuple[Optional[Node], Node]:
def nested_call(*args):
call = plan._call(registry_value.stack_frame, *args)
if type(node) is Call:
Expand Down Expand Up @@ -191,7 +191,7 @@ def plan_with_value_stores(
fresh_time: Optional[dt.datetime] = None,
inplace: bool,
progress_observer,
) -> Tuple[Plan, Optional[Node]]:
) -> tuple[Plan, Optional[Node]]:
_update_stale_totals(plan, registry, progress_observer)
plan = get_mutable_plan(plan, inplace=inplace)
stale_nodes = _get_stale_nodes(
Expand Down
3 changes: 2 additions & 1 deletion src/uberjob/_transformations/pruning.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
# limitations under the License.
#
import itertools
from typing import Callable, Iterable, Optional
from collections.abc import Iterable
from typing import Callable, Optional

from uberjob._plan import Plan
from uberjob._transformations import get_mutable_plan
Expand Down
6 changes: 3 additions & 3 deletions src/uberjob/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
"""Provides the underlying graph, node, and edge classes used by the :class:`~uberjob.Plan`."""
import warnings
from typing import Callable, Dict, List, Tuple
from typing import Callable

import networkx as nx

Expand Down Expand Up @@ -147,7 +147,7 @@ def __eq__(self, other):
)


def get_argument_nodes(graph: Graph, call: Call) -> Tuple[List[Node], Dict[str, Node]]:
def get_argument_nodes(graph: Graph, call: Call) -> tuple[list[Node], dict[str, Node]]:
"""
Return the symbolic args and kwargs of the given :class:`~uberjob.graph.Call`.
Expand All @@ -173,7 +173,7 @@ def get_argument_nodes(graph: Graph, call: Call) -> Tuple[List[Node], Dict[str,
return args, dict(keyword_arg_pairs)


def get_scope(graph: Graph, node: Node) -> Tuple:
def get_scope(graph: Graph, node: Node) -> tuple:
"""
Return the scope of the given :class:`~uberjob.graph.Node`.
Expand Down
10 changes: 5 additions & 5 deletions src/uberjob/progress/_composite_progress_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections.abc import Iterable
from contextlib import ExitStack
from typing import Iterable, Tuple

from uberjob.progress._progress_observer import ProgressObserver

Expand All @@ -39,21 +39,21 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self._stack.__exit__(exc_type, exc_val, exc_tb)

def increment_total(self, *, section: str, scope: Tuple, amount: int):
def increment_total(self, *, section: str, scope: tuple, amount: int):
for progress_observer in self._progress_observers:
progress_observer.increment_total(
section=section, scope=scope, amount=amount
)

def increment_running(self, *, section: str, scope: Tuple):
def increment_running(self, *, section: str, scope: tuple):
for progress_observer in self._progress_observers:
progress_observer.increment_running(section=section, scope=scope)

def increment_completed(self, *, section: str, scope: Tuple):
def increment_completed(self, *, section: str, scope: tuple):
for progress_observer in self._progress_observers:
progress_observer.increment_completed(section=section, scope=scope)

def increment_failed(self, *, section: str, scope: Tuple, exception: Exception):
def increment_failed(self, *, section: str, scope: tuple, exception: Exception):
for progress_observer in self._progress_observers:
progress_observer.increment_failed(
section=section, scope=scope, exception=exception
Expand Down
6 changes: 2 additions & 4 deletions src/uberjob/progress/_console_progress_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _ralign(strings):

def _print_section(print_, section, scope_mapping):
scope_items = sorted_scope_items(scope_mapping)
print_("{}:".format(section))
print_(f"{section}:")
progress_strs = _ralign(
scope_state.to_progress_string() for scope, scope_state in scope_items
)
Expand All @@ -54,9 +54,7 @@ def _print_section(print_, section, scope_mapping):
for progress_str, elapsed_str, (scope, _scope_state) in zip(
progress_strs, elapsed_strs, scope_items
):
print_(
" {} | {} | {}".format(progress_str, elapsed_str, get_scope_string(scope))
)
print_(f" {progress_str} | {elapsed_str} | {get_scope_string(scope)}")


def _print_new_exceptions(print_, new_exception_index, exception_tuples):
Expand Down
6 changes: 2 additions & 4 deletions src/uberjob/progress/_ipython_progress_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _render(self, state, new_exception_index, exception_tuples, elapsed):
title_widget = self._get(
"section", section, "title", default=widgets.HTML
)
title_widget.value = "<b>{}</b>".format(html.escape(title))
title_widget.value = f"<b>{html.escape(title)}</b>"
children.append(title_widget)
for scope, scope_state in sorted_scope_items(scope_mapping):
progress_widget = self._get(
Expand Down Expand Up @@ -137,9 +137,7 @@ def _get_exception_accordion(self, exception_tuples):
)
)
exception_text_widgets.append(exception_text_widget)
exception_titles.append(
"Exception {}; {}".format(i + 1, get_scope_string(scope))
)
exception_titles.append(f"Exception {i + 1}; {get_scope_string(scope)}")
exception_accordion.children = exception_text_widgets
for i, exception_title in enumerate(exception_titles):
exception_accordion.set_title(i, exception_title)
Expand Down
9 changes: 4 additions & 5 deletions src/uberjob/progress/_progress_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
from typing import Tuple


class ProgressObserver(ABC):
Expand All @@ -29,7 +28,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop observing progress."""

@abstractmethod
def increment_total(self, *, section: str, scope: Tuple, amount: int):
def increment_total(self, *, section: str, scope: tuple, amount: int):
"""
Increment the number of entries in this section and scope by the specified amount.
Expand All @@ -39,7 +38,7 @@ def increment_total(self, *, section: str, scope: Tuple, amount: int):
"""

@abstractmethod
def increment_running(self, *, section: str, scope: Tuple):
def increment_running(self, *, section: str, scope: tuple):
"""
Increment the number of running entries in this section and scope. This method must be thread-safe.
Expand All @@ -48,7 +47,7 @@ def increment_running(self, *, section: str, scope: Tuple):
"""

@abstractmethod
def increment_completed(self, *, section: str, scope: Tuple):
def increment_completed(self, *, section: str, scope: tuple):
"""
Increment the number of completed entries in this section and scope. This method must be thread-safe.
Expand All @@ -57,7 +56,7 @@ def increment_completed(self, *, section: str, scope: Tuple):
"""

@abstractmethod
def increment_failed(self, *, section: str, scope: Tuple, exception: Exception):
def increment_failed(self, *, section: str, scope: tuple, exception: Exception):
"""
Increment the number of failed entries in this section and scope. This method must be thread-safe.
Expand Down
9 changes: 4 additions & 5 deletions src/uberjob/progress/_simple_progress_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import threading
import time
from abc import ABC, abstractmethod
from typing import Tuple

from uberjob.progress._progress_observer import ProgressObserver

Expand Down Expand Up @@ -202,22 +201,22 @@ def _run_update_thread(self):
if output_value is not None:
self._output(output_value)

def increment_total(self, *, section: str, scope: Tuple, amount: int):
def increment_total(self, *, section: str, scope: tuple, amount: int):
with self._lock:
self._stale = True
self._state.increment_total(section, scope, amount)

def increment_running(self, *, section: str, scope: Tuple):
def increment_running(self, *, section: str, scope: tuple):
with self._lock:
self._stale = True
self._state.increment_running(section, scope)

def increment_completed(self, *, section: str, scope: Tuple):
def increment_completed(self, *, section: str, scope: tuple):
with self._lock:
self._stale = True
self._state.increment_completed(section, scope)

def increment_failed(self, *, section: str, scope: Tuple, exception: Exception):
def increment_failed(self, *, section: str, scope: tuple, exception: Exception):
with self._lock:
self._stale = True
self._state.increment_failed(section, scope)
Expand Down
3 changes: 2 additions & 1 deletion src/uberjob/stores/_file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import os
import pathlib
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
from typing import IO, AnyStr, Generator, Optional, Union
from typing import IO, AnyStr, Optional, Union

from uberjob._util import repr_helper
from uberjob._value_store import ValueStore
Expand Down
2 changes: 1 addition & 1 deletion src/uberjob/stores/_path_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def get_modified_time(self) -> typing.Optional[dt.datetime]:
def _get_modified_time(self, required):
modified_time = get_modified_time(self.path)
if modified_time is None and required:
raise IOError(
raise OSError(
f"Failed to get modified time of required source path {self.path!r}."
)
return modified_time
Expand Down
Loading

0 comments on commit c1a8991

Please sign in to comment.