Skip to content

Commit

Permalink
Merge pull request #305 from goodboy/name_query
Browse files Browse the repository at this point in the history
Add `tractor.query_actor()` an addr looker-upper
  • Loading branch information
goodboy authored Apr 13, 2022
2 parents 6298368 + 8901272 commit 71f19f2
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 29 deletions.
7 changes: 7 additions & 0 deletions 305.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Add ``tractor.query_actor()`` an addr looker-upper which doesn't deliver
a ``Portal`` instance and instead just a socket address ``tuple``.

Sometimes it's handy to just have a simple way to figure out if
a "service" actor is up, so add this discovery helper for that. We'll
prolly just leave it undocumented for now until we figure out
a longer-term/better discovery system.
18 changes: 12 additions & 6 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
stream,
context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._discovery import (
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
)
from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._exceptions import (
Expand All @@ -46,11 +51,15 @@
__all__ = [
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'MultiError',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'ContextCancelled',
'breakpoint',
'context',
'current_actor',
'find_actor',
'get_arbiter',
Expand All @@ -59,14 +68,11 @@
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'Portal',
'post_mortem',
'query_actor',
'run',
'run_daemon',
'stream',
'context',
'ReceiveMsgStream',
'MsgStream',
'to_asyncio',
'wait_for_actor',
]
78 changes: 55 additions & 23 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
Actor discovery API.
"""
import typing
from typing import Tuple, Optional, Union
from async_generator import asynccontextmanager
from typing import Tuple, Optional, Union, AsyncGenerator
from contextlib import asynccontextmanager as acm

from ._ipc import _connect_chan, Channel
from ._portal import (
Expand All @@ -31,13 +30,13 @@
from ._state import current_actor, _runtime_vars


@asynccontextmanager
@acm
async def get_arbiter(

host: str,
port: int,

) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
'''Return a portal instance connected to a local or remote
arbiter.
'''
Expand All @@ -58,10 +57,10 @@ async def get_arbiter(
yield arb_portal


@asynccontextmanager
@acm
async def get_root(
**kwargs,
) -> typing.AsyncGenerator[Portal, None]:
) -> AsyncGenerator[Portal, None]:

host, port = _runtime_vars['_root_mailbox']
assert host is not None
Expand All @@ -71,49 +70,82 @@ async def get_root(
yield portal


@asynccontextmanager
async def find_actor(
@acm
async def query_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
arbiter_sockaddr: Optional[tuple[str, int]] = None,

Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
) -> AsyncGenerator[tuple[str, int], None]:
'''
Simple address lookup for a given actor name.
Returns the (socket) address or ``None``.
'''
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:

sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name)
sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name,
)

# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")

elif sockaddr:
yield sockaddr if sockaddr else None


@acm
async def find_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None

) -> AsyncGenerator[Optional[Portal], None]:
'''
Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:

if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None


@asynccontextmanager
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()

async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:

sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name)
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1]

async with _connect_chan(*sockaddr) as chan:
Expand Down

0 comments on commit 71f19f2

Please sign in to comment.