Skip to content

Commit

Permalink
Merge pull request #36 from ddelange/dill-extra
Browse files Browse the repository at this point in the history
Allow using dill for pickling
  • Loading branch information
dano authored May 21, 2021
2 parents e7fdf6f + 9f65c11 commit 4a82111
Show file tree
Hide file tree
Showing 18 changed files with 74 additions and 52 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
tests:
runs-on: ubuntu-latest
strategy:
matrix:
matrix:
python: ['3.5', '3.6', '3.7', '3.8', '3.9']
name: aioprocessing ${{ matrix.python }} tests
steps:
Expand All @@ -19,4 +19,9 @@ jobs:
- name: Install Flake8
run: pip install flake8
- run: flake8 .
- run: python runtests.py
- run: python runtests.py -v --failfast
timeout-minutes: 1
# tests should also pass when using multiprocess (dill)
- run: pip install multiprocess
- run: python runtests.py -v --failfast
timeout-minutes: 1
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ aioprocessing

`aioprocessing` provides asynchronous, [`asyncio`](https://docs.python.org/3/library/asyncio.html) compatible, coroutine
versions of many blocking instance methods on objects in the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
library. Here's an example demonstrating the `aioprocessing` versions of
library. To use [`dill`](https://pypi.org/project/dill) for universal pickling, install using `pip install aioprocessing[dill]`. Here's an example demonstrating the `aioprocessing` versions of
`Event`, `Queue`, and `Lock`:

```python
Expand Down Expand Up @@ -66,6 +66,14 @@ seamlessly used inside of `asyncio` coroutines, without ever blocking
the event loop.


What's new
----------

`v2.0.0`

- Add support for universal pickling using [`dill`](https://github.com/uqfoundation/dill), installable with `pip install aioprocessing[dill]`. The library will now attempt to import [`multiprocess`](https://github.com/uqfoundation/multiprocess), falling back to stdlib `multiprocessing`. Force stdlib behaviour by setting a non-empty environment variable `AIOPROCESSING_DILL_DISABLED=1`. This can be used to avoid [errors](https://github.com/dano/aioprocessing/pull/36#discussion_r631178933) when attempting to combine `aioprocessing[dill]` with stdlib `multiprocessing` based objects like `concurrent.futures.ProcessPoolExecutor`.


How does it work?
-----------------

Expand Down Expand Up @@ -132,4 +140,4 @@ Keep in mind that, while the API exposes coroutines for interacting with
to a `ThreadPoolExecutor`, this means the caveats that apply with using
`ThreadPoolExecutor` with `asyncio` apply: namely, you won't be able to
cancel any of the coroutines, because the work being done in the worker
thread can't be interrupted.
thread can't be interrupted.
7 changes: 3 additions & 4 deletions aioprocessing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import multiprocessing

from . import mp as multiprocessing # noqa
from .connection import * # noqa
from .managers import * # noqa

Expand Down Expand Up @@ -27,8 +26,8 @@
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
version = "1.1.1"
version_info = (1, 1, 1, 0)
version = "2.0.0"
version_info = (2, 0, 0, 0)

if hasattr(multiprocessing, "get_context"):

Expand Down
24 changes: 12 additions & 12 deletions aioprocessing/connection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.connection import (
Listener,
Client,
deliver_challenge,
answer_challenge,
wait,
)

from .mp import connection as _connection
from .executor import CoroBuilder
from .util import run_in_executor

Expand Down Expand Up @@ -42,12 +36,12 @@ def __exit__(self, *args, **kwargs):

def AioClient(*args, **kwargs):
""" Returns an AioConnection instance. """
conn = Client(*args, **kwargs)
conn = _connection.Client(*args, **kwargs)
return AioConnection(conn)


class AioListener(metaclass=CoroBuilder):
delegate = Listener
delegate = _connection.Listener
coroutines = ["accept"]

def accept(self):
Expand All @@ -64,14 +58,20 @@ def __exit__(self, *args, **kwargs):

def coro_deliver_challenge(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
return run_in_executor(executor, deliver_challenge, *args, **kwargs)
return run_in_executor(
executor, _connection.deliver_challenge, *args, **kwargs
)


def coro_answer_challenge(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
return run_in_executor(executor, answer_challenge, *args, **kwargs)
return run_in_executor(
executor, _connection.answer_challenge, *args, **kwargs
)


def coro_wait(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
return run_in_executor(executor, wait, *args, **kwargs)
return run_in_executor(
executor, _connection.wait, *args, **kwargs
)
2 changes: 1 addition & 1 deletion aioprocessing/executor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from multiprocessing import cpu_count
from functools import wraps
from concurrent.futures import ThreadPoolExecutor

from . import util
from .mp import cpu_count


def init_executor(func):
Expand Down
6 changes: 3 additions & 3 deletions aioprocessing/locks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from .executor import CoroBuilder
from multiprocessing import (
from .mp import (
Event,
Lock,
RLock,
BoundedSemaphore,
Condition,
Semaphore,
Barrier,
util as _util,
)
from multiprocessing.util import register_after_fork

__all__ = [
"AioLock",
Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self, *args, **kwargs):
def _after_fork(obj):
obj._threaded_acquire = False

register_after_fork(self, _after_fork)
_util.register_after_fork(self, _after_fork)

def coro_acquire(self, *args, **kwargs):
""" Non-blocking acquire.
Expand Down
21 changes: 7 additions & 14 deletions aioprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@
RLock,
Semaphore,
)
from multiprocessing.managers import (
SyncManager,
MakeProxyType,
BarrierProxy,
EventProxy,
ConditionProxy,
AcquirerProxy,
)

from aioprocessing.locks import _ContextManager
from .executor import _ExecutorMixin
from .mp import managers as _managers


AioBaseQueueProxy = MakeProxyType(
AioBaseQueueProxy = _managers.MakeProxyType(
"AioQueueProxy",
(
"task_done",
Expand Down Expand Up @@ -99,7 +92,7 @@ class AioQueueProxy(AioBaseQueueProxy, metaclass=ProxyCoroBuilder):
coroutines = ["get", "put"]


class AioAcquirerProxy(AcquirerProxy, metaclass=ProxyCoroBuilder):
class AioAcquirerProxy(_managers.AcquirerProxy, metaclass=ProxyCoroBuilder):
pool_workers = 1
coroutines = ["acquire", "release"]

Expand Down Expand Up @@ -166,19 +159,19 @@ def __iter__(self):
return _ContextManager(self)


class AioBarrierProxy(BarrierProxy, metaclass=ProxyCoroBuilder):
class AioBarrierProxy(_managers.BarrierProxy, metaclass=ProxyCoroBuilder):
coroutines = ["wait"]


class AioEventProxy(EventProxy, metaclass=ProxyCoroBuilder):
class AioEventProxy(_managers.EventProxy, metaclass=ProxyCoroBuilder):
coroutines = ["wait"]


class AioConditionProxy(ConditionProxy, metaclass=ProxyCoroBuilder):
class AioConditionProxy(_managers.ConditionProxy, metaclass=ProxyCoroBuilder):
coroutines = ["wait", "wait_for"]


class AioSyncManager(SyncManager):
class AioSyncManager(_managers.SyncManager):
""" A mp.Manager that provides asyncio-friendly objects. """

pass
Expand Down
10 changes: 10 additions & 0 deletions aioprocessing/mp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# flake8: noqa
import os
try:
if os.environ.get("AIOPROCESSING_DILL_DISABLED"):
raise ImportError
from multiprocess import *
from multiprocess import connection, managers, util
except ImportError:
from multiprocessing import *
from multiprocessing import connection, managers, util
2 changes: 1 addition & 1 deletion aioprocessing/pool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from multiprocessing import Pool
from asyncio import Future
import asyncio

from .executor import CoroBuilder
from .mp import Pool

__all__ = ["AioPool"]

Expand Down
3 changes: 1 addition & 2 deletions aioprocessing/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from multiprocessing import Process

from .executor import CoroBuilder
from .mp import Process

__all__ = ["AioProcess"]

Expand Down
3 changes: 1 addition & 2 deletions aioprocessing/queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from multiprocessing import Queue, SimpleQueue, JoinableQueue

from .executor import CoroBuilder
from .mp import Queue, SimpleQueue, JoinableQueue


class AioBaseQueue(metaclass=CoroBuilder):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
),
zip_safe=False,
license="BSD",
extras_require={"dill": ["multiprocess"]},
keywords="asyncio multiprocessing coroutine",
url="https://github.com/dano/aioprocessing",
long_description=readme,
Expand Down
3 changes: 2 additions & 1 deletion tests/_base_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import unittest
import multiprocessing

import aioprocessing.mp as multiprocessing


class BaseTest(unittest.TestCase):
Expand Down
4 changes: 2 additions & 2 deletions tests/connection_tests.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import unittest
import multiprocessing
from multiprocessing import Process
from array import array

import aioprocessing
import aioprocessing.mp as multiprocessing
from aioprocessing.connection import AioConnection, AioListener, AioClient
from aioprocessing.mp import Process

from ._base_test import BaseTest

Expand Down
7 changes: 4 additions & 3 deletions tests/lock_tests.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import multiprocessing
import sys
import time
import asyncio
import unittest
import traceback

import aioprocessing
from multiprocessing import Process, Event, Queue, get_all_start_methods
import aioprocessing.mp as multiprocessing
from aioprocessing.mp import Process, Event, Queue, get_all_start_methods

try:
from multiprocessing import get_context
from aioprocessing.mp import get_context
except ImportError:

def get_context(param):
Expand Down
1 change: 1 addition & 0 deletions tests/pickle_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pickle
import unittest

from aioprocessing.executor import _ExecutorMixin


Expand Down
2 changes: 1 addition & 1 deletion tests/process_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
import multiprocessing
import aioprocessing.mp as multiprocessing

import aioprocessing
from ._base_test import BaseTest, _GenMixin
Expand Down
9 changes: 7 additions & 2 deletions tests/queue_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import unittest
import aioprocessing
from multiprocessing import Process, Event
from concurrent.futures import ProcessPoolExecutor

import aioprocessing
from aioprocessing.mp import Process, Event, util
from ._base_test import BaseTest, _GenMixin


Expand Down Expand Up @@ -97,6 +97,11 @@ async def queue_put():


class ManagerQueueTest(BaseTest):
@unittest.skipIf(
"multiprocess.util" in str(util),
"concurrent.futures is not yet supported by uqfoundation "
"(https://github.com/uqfoundation/pathos/issues/90)"
)
def test_executor(self):
m = aioprocessing.AioManager()
q = m.AioQueue()
Expand Down

0 comments on commit 4a82111

Please sign in to comment.