From def58cf6bd5f178d816b79c27a92e47fd97eec78 Mon Sep 17 00:00:00 2001 From: Steven Landau Date: Tue, 15 May 2018 09:14:31 -0400 Subject: [PATCH] Pubsub support (#123) Add PubSub APIs with tests: * Added `pubsub_ls`, `pubsub_peers`, `pubsub_pub` as simple API wrappers * Added `pubsub_sub` as streaming wrapper returning a special API object for waiting for new messages and supporting closure using the context manager protocol --- ipfsapi/client.py | 174 +++++++++++++++++++++++++++++++++++++++ test/functional/tests.py | 93 ++++++++++++++++++++- test/run-tests.py | 4 +- 3 files changed, 268 insertions(+), 3 deletions(-) diff --git a/ipfsapi/client.py b/ipfsapi/client.py index 0eaed4bd..2566199e 100644 --- a/ipfsapi/client.py +++ b/ipfsapi/client.py @@ -77,6 +77,31 @@ def connect(host=DEFAULT_HOST, port=DEFAULT_PORT, base=DEFAULT_BASE, return client +class SubChannel: + """ + Wrapper for a pubsub subscription object that allows for easy + closing of subscriptions. + """ + + def __init__(self, sub): + self.__sub = sub + + def read_message(self): + return next(self.__sub) + + def __iter__(self): + return self.__sub + + def close(self): + self.__sub.close() + + def __enter__(self): + return self + + def __exit__(self, *a): + self.close() + + class Client(object): """A TCP client for interacting with an IPFS daemon. @@ -2221,3 +2246,152 @@ def get_pyobj(self, multihash, **kwargs): warnings.warn("Using `*_pyobj` on untrusted data is a security risk", DeprecationWarning) return self.cat(multihash, decoder='pickle', **kwargs) + + def pubsub_ls(self, **kwargs): + """Lists subscribed topics by name + + This method returns data that contains a list of + all topics the user is subscribed to. In order + to subscribe to a topic pubsub_sub must be called. + + .. code-block:: python + + # subscribe to a channel + >>> with c.pubsub_sub("hello") as sub: + ... c.pubsub_ls() + { + 'Strings' : ["hello"] + } + + Returns + ------- + dict : Dictionary with the key "Strings" who's value is an array of + topics we are subscribed to + """ + return self._client.request('/pubsub/ls', decoder='json', **kwargs) + + def pubsub_peers(self, topic=None, **kwargs): + """List the peers we are pubsubbing with. + + Lists the id's of other IPFS users who we + are connected to via some topic. Without specifying + a topic, IPFS peers from all subscribed topics + will be returned in the data. If a topic is specified + only the IPFS id's of the peers from the specified + topic will be returned in the data. + + .. code-block:: python + >>> c.pubsub_peers() + {'Strings': + [ + 'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8', + 'QmQKiXYzoFpiGZ93DaFBFDMDWDJCRjXDARu4wne2PRtSgA', + ... + 'QmepgFW7BHEtU4pZJdxaNiv75mKLLRQnPi1KaaXmQN4V1a' + ] + } + + ## with a topic + + # subscribe to a channel + >>> with c.pubsub_sub('hello') as sub: + ... c.pubsub_peers(topic='hello') + {'String': + [ + 'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8', + ... + # other peers connected to the same channel + ] + } + + Parameters + ---------- + topic : str + The topic to list connected peers of + (defaults to None which lists peers for all topics) + + Returns + ------- + dict : Dictionary with the ke "Strings" who's value is id of IPFS + peers we're pubsubbing with + """ + args = (topic,) if topic is not None else () + return self._client.request('/pubsub/peers', args, + decoder='json', **kwargs) + + def pubsub_pub(self, topic, payload, **kwargs): + """Publish a message to a given pubsub topic + + Publishing will publish the given payload (string) to + everyone currently subscribed to the given topic. + + All data (including the id of the publisher) is automatically + base64 encoded when published. + + .. code-block:: python + # publishes the message 'message' to the topic 'hello' + >>> c.pubsub_pub('hello', 'message') + [] + + Parameters + ---------- + topic : str + Topic to publish to + payload : Data to be published to the given topic + + Returns + ------- + list : empty list + """ + args = (topic, payload) + return self._client.request('/pubsub/pub', args, + decoder='json', **kwargs) + + def pubsub_sub(self, topic, discover=False, **kwargs): + """Subscribe to mesages on a given topic + + Subscribing to a topic in IPFS means anytime + a message is published to a topic, the subscribers + will be notified of the publication. + + The connection with the pubsub topic is opened and read. + The Subscription returned should be used inside a context + manager to ensure that it is closed properly and not left + hanging. + + .. code-block:: python + >>> sub = c.pubsub_sub('testing') + >>> with c.pubsub_sub('testing') as sub: + # publish a message 'hello' to the topic 'testing' + ... c.pubsub_pub('testing', 'hello') + ... for message in sub: + ... print(message) + ... # Stop reading the subscription after + ... # we receive one publication + ... break + {'from': '', + 'data': 'aGVsbG8=', + 'topicIDs': ['testing']} + + # NOTE: in order to receive published data + # you must already be subscribed to the topic at publication + # time. + + Parameters + ---------- + topic : str + Name of a topic to subscribe to + + discover : bool + Try to discover other peers subscibed to the same topic + (defaults to False) + + Returns + ------- + Generator wrapped in a context + manager that maintains a connection + stream to the given topic. + """ + args = (topic, discover) + return SubChannel(self._client.request('/pubsub/sub', args, + stream=True, decoder='json')) diff --git a/test/functional/tests.py b/test/functional/tests.py index 1f7f0172..ab4c1b61 100644 --- a/test/functional/tests.py +++ b/test/functional/tests.py @@ -7,6 +7,7 @@ import time import unittest import logging +import uuid import pytest @@ -600,7 +601,7 @@ def test_dir_make_fill_list_delete(self): self.api.files_stat(self.test_directory_path) -@skipIfOffline() +skipIfOffline() class TestBlockFunctions(unittest.TestCase): def setUp(self): self.api = ipfsapi.Client() @@ -829,6 +830,96 @@ def test_bitswap_unwant(self): result = self.api.bitswap_unwant(key='QmZTR5bcpQD7cFgTorqxZDYaew1Wqgfbd2ud9QqGPAkK2V') self.assertTrue(result is not None) +@skipIfOffline() +class IpfsApiPubSubTest(unittest.TestCase): + + def setUp(self): + self.api = ipfsapi.Client() + + def createTestChannel(self): + """ + Creates a unique topic for testing purposes + """ + return "{}.testing".format(uuid.uuid4()) + + def test_pubsub_pub_sub(self): + """ + We test both publishing and subscribing at + the same time because we cannot verify that + something has been properly published unless + we subscribe to that channel and receive it. + Likewise, we cannot accurately test a subscription + without publishing something on the topic we are subscribed + to. + """ + # the topic that will be published/subscribed to + topic = self.createTestChannel() + # the message that will be published + message = 'hello' + + expected_data = 'aGVsbG8=' + expected_topicIDs = [topic] + + + # get the subscription stream + with self.api.pubsub_sub(topic) as sub: + + # make sure something was actually returned from the subscription + assert sub is not None + + # publish a message to topic + self.api.pubsub_pub(topic, message) + + # get the message + sub_data = sub.read_message() + + # assert that the returned dict has the following keys + assert 'data' in sub_data + assert 'topicIDs' in sub_data + + assert sub_data['data'] == expected_data + assert sub_data['topicIDs'] == expected_topicIDs + + def test_pubsub_ls(self): + """ + Testing the ls, assumes we are able + to at least subscribe to a topic + """ + topic = self.createTestChannel() + expected_return = { 'Strings': [topic] } + + # subscribe to the topic testing + sub = self.api.pubsub_sub(topic) + + channels = None + try: + # grab the channels we're subscribed to + channels = self.api.pubsub_ls() + finally: + sub.close() + + assert channels == expected_return + + def test_pubsub_peers(self): + """ + Not sure how to test this since it fully depends + on who we're connected to. We may not even have + any peers + """ + peers = self.api.pubsub_peers() + + expected_return = { + 'Strings': [] + } + + # make sure the Strings key is in the map thats returned + assert 'Strings' in peers + + # ensure the value of 'Strings' is a list. + # The list may or may not be empty. + assert isinstance(peers['Strings'], list) + + # Run test for `.shutdown()` only as the last test in CI environments – it would be to annoying # during normal testing @skipIfOffline() diff --git a/test/run-tests.py b/test/run-tests.py index b91228a5..b2e379d3 100755 --- a/test/run-tests.py +++ b/test/run-tests.py @@ -73,7 +73,7 @@ def _contextlib_suppress(*exceptions): ################ # Spawn IPFS daemon in data directory -DAEMON = subprocess.Popen(["ipfs", "daemon"]) +DAEMON = subprocess.Popen(["ipfs", "daemon", "--enable-pubsub-experiment"]) os.environ["PY_IPFSAPI_TEST_DAEMON_PID"] = str(DAEMON.pid) # Collect the exit code of `DAEMON` when `SIGCHLD` is received @@ -121,4 +121,4 @@ def _contextlib_suppress(*exceptions): print("IPFS daemon was still running after test!", file=sys.stderr) -sys.exit(PYTEST_CODE) \ No newline at end of file +sys.exit(PYTEST_CODE)