Skip to content

Commit

Permalink
Pubsub support (#123)
Browse files Browse the repository at this point in the history
Add PubSub APIs with tests:

 * Added `pubsub_ls`, `pubsub_peers`, `pubsub_pub` as simple API wrappers
 * Added `pubsub_sub` as streaming wrapper returning a special API object for waiting for new messages and supporting closure using the context manager protocol
  • Loading branch information
Steven Landau authored and Alexander255 committed May 15, 2018
1 parent 2d3fb8c commit def58cf
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 3 deletions.
174 changes: 174 additions & 0 deletions ipfsapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,31 @@ def connect(host=DEFAULT_HOST, port=DEFAULT_PORT, base=DEFAULT_BASE,
return client


class SubChannel:
"""
Wrapper for a pubsub subscription object that allows for easy
closing of subscriptions.
"""

def __init__(self, sub):
self.__sub = sub

def read_message(self):
return next(self.__sub)

def __iter__(self):
return self.__sub

def close(self):
self.__sub.close()

def __enter__(self):
return self

def __exit__(self, *a):
self.close()


class Client(object):
"""A TCP client for interacting with an IPFS daemon.
Expand Down Expand Up @@ -2221,3 +2246,152 @@ def get_pyobj(self, multihash, **kwargs):
warnings.warn("Using `*_pyobj` on untrusted data is a security risk",
DeprecationWarning)
return self.cat(multihash, decoder='pickle', **kwargs)

def pubsub_ls(self, **kwargs):
"""Lists subscribed topics by name
This method returns data that contains a list of
all topics the user is subscribed to. In order
to subscribe to a topic pubsub_sub must be called.
.. code-block:: python
# subscribe to a channel
>>> with c.pubsub_sub("hello") as sub:
... c.pubsub_ls()
{
'Strings' : ["hello"]
}
Returns
-------
dict : Dictionary with the key "Strings" who's value is an array of
topics we are subscribed to
"""
return self._client.request('/pubsub/ls', decoder='json', **kwargs)

def pubsub_peers(self, topic=None, **kwargs):
"""List the peers we are pubsubbing with.
Lists the id's of other IPFS users who we
are connected to via some topic. Without specifying
a topic, IPFS peers from all subscribed topics
will be returned in the data. If a topic is specified
only the IPFS id's of the peers from the specified
topic will be returned in the data.
.. code-block:: python
>>> c.pubsub_peers()
{'Strings':
[
'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8',
'QmQKiXYzoFpiGZ93DaFBFDMDWDJCRjXDARu4wne2PRtSgA',
...
'QmepgFW7BHEtU4pZJdxaNiv75mKLLRQnPi1KaaXmQN4V1a'
]
}
## with a topic
# subscribe to a channel
>>> with c.pubsub_sub('hello') as sub:
... c.pubsub_peers(topic='hello')
{'String':
[
'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8',
...
# other peers connected to the same channel
]
}
Parameters
----------
topic : str
The topic to list connected peers of
(defaults to None which lists peers for all topics)
Returns
-------
dict : Dictionary with the ke "Strings" who's value is id of IPFS
peers we're pubsubbing with
"""
args = (topic,) if topic is not None else ()
return self._client.request('/pubsub/peers', args,
decoder='json', **kwargs)

def pubsub_pub(self, topic, payload, **kwargs):
"""Publish a message to a given pubsub topic
Publishing will publish the given payload (string) to
everyone currently subscribed to the given topic.
All data (including the id of the publisher) is automatically
base64 encoded when published.
.. code-block:: python
# publishes the message 'message' to the topic 'hello'
>>> c.pubsub_pub('hello', 'message')
[]
Parameters
----------
topic : str
Topic to publish to
payload : Data to be published to the given topic
Returns
-------
list : empty list
"""
args = (topic, payload)
return self._client.request('/pubsub/pub', args,
decoder='json', **kwargs)

def pubsub_sub(self, topic, discover=False, **kwargs):
"""Subscribe to mesages on a given topic
Subscribing to a topic in IPFS means anytime
a message is published to a topic, the subscribers
will be notified of the publication.
The connection with the pubsub topic is opened and read.
The Subscription returned should be used inside a context
manager to ensure that it is closed properly and not left
hanging.
.. code-block:: python
>>> sub = c.pubsub_sub('testing')
>>> with c.pubsub_sub('testing') as sub:
# publish a message 'hello' to the topic 'testing'
... c.pubsub_pub('testing', 'hello')
... for message in sub:
... print(message)
... # Stop reading the subscription after
... # we receive one publication
... break
{'from': '<base64encoded IPFS id>',
'data': 'aGVsbG8=',
'topicIDs': ['testing']}
# NOTE: in order to receive published data
# you must already be subscribed to the topic at publication
# time.
Parameters
----------
topic : str
Name of a topic to subscribe to
discover : bool
Try to discover other peers subscibed to the same topic
(defaults to False)
Returns
-------
Generator wrapped in a context
manager that maintains a connection
stream to the given topic.
"""
args = (topic, discover)
return SubChannel(self._client.request('/pubsub/sub', args,
stream=True, decoder='json'))
93 changes: 92 additions & 1 deletion test/functional/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
import unittest
import logging
import uuid

import pytest

Expand Down Expand Up @@ -600,7 +601,7 @@ def test_dir_make_fill_list_delete(self):
self.api.files_stat(self.test_directory_path)


@skipIfOffline()
skipIfOffline()
class TestBlockFunctions(unittest.TestCase):
def setUp(self):
self.api = ipfsapi.Client()
Expand Down Expand Up @@ -829,6 +830,96 @@ def test_bitswap_unwant(self):
result = self.api.bitswap_unwant(key='QmZTR5bcpQD7cFgTorqxZDYaew1Wqgfbd2ud9QqGPAkK2V')
self.assertTrue(result is not None)

@skipIfOffline()
class IpfsApiPubSubTest(unittest.TestCase):

def setUp(self):
self.api = ipfsapi.Client()

def createTestChannel(self):
"""
Creates a unique topic for testing purposes
"""
return "{}.testing".format(uuid.uuid4())

def test_pubsub_pub_sub(self):
"""
We test both publishing and subscribing at
the same time because we cannot verify that
something has been properly published unless
we subscribe to that channel and receive it.
Likewise, we cannot accurately test a subscription
without publishing something on the topic we are subscribed
to.
"""
# the topic that will be published/subscribed to
topic = self.createTestChannel()
# the message that will be published
message = 'hello'

expected_data = 'aGVsbG8='
expected_topicIDs = [topic]


# get the subscription stream
with self.api.pubsub_sub(topic) as sub:

# make sure something was actually returned from the subscription
assert sub is not None

# publish a message to topic
self.api.pubsub_pub(topic, message)

# get the message
sub_data = sub.read_message()

# assert that the returned dict has the following keys
assert 'data' in sub_data
assert 'topicIDs' in sub_data

assert sub_data['data'] == expected_data
assert sub_data['topicIDs'] == expected_topicIDs

def test_pubsub_ls(self):
"""
Testing the ls, assumes we are able
to at least subscribe to a topic
"""
topic = self.createTestChannel()
expected_return = { 'Strings': [topic] }

# subscribe to the topic testing
sub = self.api.pubsub_sub(topic)

channels = None
try:
# grab the channels we're subscribed to
channels = self.api.pubsub_ls()
finally:
sub.close()

assert channels == expected_return

def test_pubsub_peers(self):
"""
Not sure how to test this since it fully depends
on who we're connected to. We may not even have
any peers
"""
peers = self.api.pubsub_peers()

expected_return = {
'Strings': []
}

# make sure the Strings key is in the map thats returned
assert 'Strings' in peers

# ensure the value of 'Strings' is a list.
# The list may or may not be empty.
assert isinstance(peers['Strings'], list)


# Run test for `.shutdown()` only as the last test in CI environments – it would be to annoying
# during normal testing
@skipIfOffline()
Expand Down
4 changes: 2 additions & 2 deletions test/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _contextlib_suppress(*exceptions):
################

# Spawn IPFS daemon in data directory
DAEMON = subprocess.Popen(["ipfs", "daemon"])
DAEMON = subprocess.Popen(["ipfs", "daemon", "--enable-pubsub-experiment"])
os.environ["PY_IPFSAPI_TEST_DAEMON_PID"] = str(DAEMON.pid)

# Collect the exit code of `DAEMON` when `SIGCHLD` is received
Expand Down Expand Up @@ -121,4 +121,4 @@ def _contextlib_suppress(*exceptions):

print("IPFS daemon was still running after test!", file=sys.stderr)

sys.exit(PYTEST_CODE)
sys.exit(PYTEST_CODE)

0 comments on commit def58cf

Please sign in to comment.