Skip to content

Commit

Permalink
Bidirectional communication for ObjectType (#12)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthew Runyon <[email protected]>
  • Loading branch information
niloc132 and mattrunyon authored Aug 9, 2023
1 parent a40d0d8 commit 512c9cf
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
35 changes: 31 additions & 4 deletions src/deephaven/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import abc
from typing import Union, Type

"""
The deephaven.plugin module provides an API and registration mechanism to add new behavior to the Deephaven
server. Plugins should be registered by adding a Registration instance as an entrypoint to the Python package.
"""
__version__ = "0.4.0.dev0"

DEEPHAVEN_PLUGIN_ENTRY_KEY = "deephaven.plugin"
Expand All @@ -11,15 +15,38 @@ class Plugin(abc.ABC):
pass


class Callback(abc.ABC):
"""
An instance of Callback will be passed to Registration.register_into, to allow any number of plugins to be
registered.
"""

@abc.abstractmethod
def register(self, plugin: Union[Plugin, Type[Plugin]]) -> None:
"""
Registers a given plugin type for use in the Deephaven server. Should be called from from a Registration's
register_into method, so that it is available when the server expects it.
:param plugin: the plugin or plugin type to register on the server
:return:
"""
pass


class Registration(abc.ABC):
class Callback(abc.ABC):
@abc.abstractmethod
def register(self, plugin: Union[Plugin, Type[Plugin]]) -> None:
pass
"""
Registration types should be set as the registration_cls for deephaven.plugin entrypoints for their package to
ensure that they are all run on server startup.
"""

@classmethod
@abc.abstractmethod
def register_into(cls, callback: Callback) -> None:
"""
Implement this method and reference this Registration type from the package's entrypoint to ensure that any
provided plugins are available at server startup. Invoke callback.register() once for each provided plugin.
:param callback: invoke this once per plugin to register them for use in the server.
:return:
"""
pass

@classmethod
Expand Down
81 changes: 0 additions & 81 deletions src/deephaven/plugin/object.py

This file was deleted.

98 changes: 98 additions & 0 deletions src/deephaven/plugin/object_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import abc
from typing import Optional, Union, Type, List, Any

from . import Plugin, Registration, register_all_into

"""
Tools to create ObjectType plugins in Python to run on a Deephaven server.
"""


class Reference:
"""A reference."""

def __init__(self, index: int, type: Optional[str]):
self._index = index
self._type = type

@property
def index(self) -> int:
"""The index, which is defined by the order in which references are created.
May be used in the output stream to refer to the reference from the client."""
return self._index


class Exporter(abc.ABC):
"""The interface for creating new references during FetchOnlyObjectBase.to_bytes."""

@abc.abstractmethod
def reference(self, obj: object) -> Reference:
"""Creates a reference for an object, ensuring that it is exported for use on the client. Each time this is
called, a new reference will be returned, with the index of the export in the data to be sent to the client.
"""
pass


class MessageStream(abc.ABC):
"""A stream of messages, either sent from server to client or client to server. ObjectType implementations
provide an instance of this interface for each incoming stream to invoke as messages arrive, and will
likewise be given an instance of this interface to be able to send messages to the client.
"""

def __init__(self):
pass

@abc.abstractmethod
def on_close(self) -> None:
"""Closes the stream on both ends. No further messages can be sent or received."""
pass

@abc.abstractmethod
def on_data(self, payload: bytes, references: List[Any]) -> None:
"""Transmits data to the remote end of the stream. This can consist of a binary payload and references to
objects on the server.
"""
pass


class ObjectType(Plugin):
"""An object type plugin. Useful for serializing custom objects between the server / client."""

@property
@abc.abstractmethod
def name(self) -> str:
"""The name of the object type."""
pass

@abc.abstractmethod
def is_type(self, obj: Any) -> bool:
"""Returns True if, and only if, the object is compatible with this object type."""
pass


class BidirectionalObjectType(ObjectType):
"""Base class for an object type that can continue to send responses to the client, or receive requests
from the server even after it is fetched.
"""

@abc.abstractmethod
def create_client_connection(self, obj: object, connection: MessageStream) -> MessageStream:
"""Signals creation of a client stream to the specified object. The returned MessageStream implementation will
be called with each received message from the client, and can call the provided connection parameter to send
messages as needed to the client.
Before returning, this method must call connection.on_message with some initial payload, so that the client has
an initial view of the object.
"""
pass


class FetchOnlyObjectType(ObjectType):
"""Base class for an object type which will only be fetched once, rather than support streaming requests or
responses.
"""

@abc.abstractmethod
def to_bytes(self, exporter: Exporter, obj: Any) -> bytes:
"""Serializes obj into bytes. Must only be called with a compatible object."""
pass

0 comments on commit 512c9cf

Please sign in to comment.