Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the capability to multiple topics #211

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions doc/user/stream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,32 @@ Kafka topics, and takes the form:

.. code:: bash

kafka://[username@]broker/topic[,topic2[,...]]
kafka://[username@]broker/[topic[,topic2[,...]]]

The broker takes the form :code:`hostname[:port]` and gives the URL to connect to a
Kafka broker. Optionally, a :code:`username` can be provided, which is used to select
among available credentials to use when communicating with the broker.
Finally, one can publish to a topic or subscribe to one or more topics to consume messages
from.
Finally, one can specify a number of topics to which to publish or subscribe.

Publishing to Multiple Topics
-------------------------------

A single stream object can be used to publish to multiple topics, and doing so uses resources
more efficiently by spawning fewer worker threads, opening fewer sockets, etc., than opening a
separate stream for each of several topics, but requires attention to one extra detail: When a
stream is opened for multiple topics, the topic must be specified when calling :code:`write()`,
in order to make it unambiguous to which topic that particular message should be published:

.. code:: python

from hop import stream

with stream.open("kafka://hostname:port/topic1,topic2", "w") as s:
s.write({"my": "message"}, topic="topic2")

In fact, when opening a stream for writing, it is not necessary for the target URL to contain
a topic at all; if it does not, the topic to which to publish must always be specified when
calling :code:`write()`.

Committing Messages Manually
------------------------------
Expand Down
29 changes: 17 additions & 12 deletions hop/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs):
in write mode or a :class:`Consumer` instance in read mode.

Raises:
ValueError: If the mode is not set to read/write, if more than
one topic is specified in write mode, or if more than one broker is specified
ValueError: If the mode is not set to read/write, if no topic
is specified in read mode, or if more than one broker is specified

"""
username, broker_addresses, topics = kafka.parse_kafka_url(url)
Expand All @@ -98,21 +98,20 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs):
logger.debug("connecting to addresses=%s username=%s topics=%s",
broker_addresses, group_id, topics)

if topics is None:
raise ValueError("no topic(s) specified in kafka URL")

if self.auth is not None:
credential = select_matching_auth(self.auth, broker_addresses[0], username)
else:
credential = None

if mode == "w":
if len(topics) != 1:
raise ValueError("must specify exactly one topic in write mode")
if topics is None or len(topics) != 1:
topics = [None]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea here that if they set multiple topics, you set the topic to None in the Producer so they are forced to manually enter topic in subsequent write calls so there is no ambiguity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly; the choice we made at the adc level was to store only one topic (https://github.com/astronomy-commons/adc-streaming/blob/b1a5a81c52d2752a9fd63b9717c121285646d09b/adc/producer.py#L89), as either there is a specific topic to which the user wants to always publish or they will have to specify it with ever write, and there didn't seem to be much point in artificially restricting the caller in that latter case to one of the predeclared topics, so we just forget what topics they specified except that there was not exactly one.

if group_id is not None:
warnings.warn("group ID has no effect when opening a stream in write mode")
return Producer(broker_addresses, topics[0], auth=credential, **kwargs)
elif mode == "r":
if topics is None or len(topics) == 0:
raise ValueError("no topic(s) specified in kafka URL")
if group_id is None:
username = credential.username if credential is not None else None
group_id = _generate_group_id(username, 10)
Expand Down Expand Up @@ -220,7 +219,7 @@ def from_format(data, format, deserialize=True):
return models.JSONBlob(content=old)
# if we can't tell what the data is, pass it on unchanged
except (UnicodeDecodeError, json.JSONDecodeError):
logger.warning("Unknown message format; returning a Blob")
logger.info("Unknown message format; returning a Blob")
return models.Blob(content=message.value())

def load(self, input_):
Expand Down Expand Up @@ -470,7 +469,7 @@ def __init__(self, broker_addresses, topic, auth, **kwargs):
logger.info(f"publishing to topic: {topic}")

def write(self, message, headers=None,
          delivery_callback=errors.raise_delivery_errors, test=False, topic=None):
    """Write messages to a stream.

    Args:
        message: The message to send.
        headers: Headers to attach to the message, either as a mapping of
            strings to strings, or as a list of 2-tuples of strings.
        delivery_callback: A callback which will be called when each message
            is either delivered or permanently fails to be delivered.
        test: Message should be marked as a test message by adding a header
            with key '_test'.
        topic: The topic to which the message should be sent. This need not be
            specified if the stream was opened with a URL containing exactly
            one topic name.
    """
    packed, packed_headers = self._pack(message, headers, test=test)
    self._producer.write(
        packed,
        headers=packed_headers,
        delivery_callback=delivery_callback,
        topic=topic,
    )

def write_raw(self, packed_message, headers=None,
              delivery_callback=errors.raise_delivery_errors, topic=None):
    """Write a pre-encoded message to the stream.

    This is an advanced interface; for most purposes it is preferable to use
    :meth:`write` instead.

    Args:
        packed_message: The pre-encoded message to send.
        headers: Headers to attach to the message, either as a
            mapping strings to strings, or as a list of 2-tuples of strings.
        delivery_callback: A callback which will be called when each message
            is either delivered or permanently fails to be delivered.
        topic: The topic to which the message should be sent. This need not be
            specified if the stream was opened with a URL containing exactly
            one topic name.
    """
    self._producer.write(
        packed_message,
        headers=headers,
        delivery_callback=delivery_callback,
        topic=topic,
    )

@staticmethod
def pack(message, headers=None, test=False, auth=None):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# requirements
install_requires = [
"adc-streaming >= 2.2.0",
"adc-streaming >= 2.4.0",
"dataclasses ; python_version < '3.7'",
"fastavro >= 1.4.0",
"pluggy >= 0.11",
Expand Down
9 changes: 7 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,13 @@ def __init__(self, broker, topic):
self.broker = broker
self.topic = topic

def write(self, msg, headers=[], delivery_callback=None):
self.broker.write(self.topic, msg, headers)
def write(self, msg, headers=None, delivery_callback=None, topic=None):
    """Record a message on the mock broker.

    Args:
        msg: The (packed) message payload to store.
        headers: Message headers as a list of 2-tuples; defaults to no headers.
        delivery_callback: Accepted for interface compatibility; unused here.
        topic: Destination topic. Falls back to the topic this producer was
            constructed with.

    Raises:
        Exception: If no topic is given and none was configured at
            construction time.
    """
    # Use a None sentinel instead of the original mutable default `headers=[]`,
    # which would be shared across all calls; normalize to [] to keep the
    # value seen by the broker identical to the old behavior.
    if headers is None:
        headers = []
    if topic is None:
        topic = self.topic
    if topic is None:
        raise Exception("No topic specified for write")
    self.broker.write(topic, msg, headers)

def close(self):
    # No-op: the mock producer holds no sockets or worker threads to shut down.
    pass
Expand Down
53 changes: 42 additions & 11 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ def test_stream_stop(circular_msg):

def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
topic = "gcn"
mock_adc_producer = mock_producer(mock_broker, topic)
expected_msg = make_message_standard(circular_msg)

fixed_uuid = uuid4()

auth = Auth("user", "password")
Expand All @@ -298,7 +296,8 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
none_test_headers = [("_id", fixed_uuid.bytes), ("_sender", auth.username.encode("utf-8")),
('_test', b"true"), ("_format", b"circular")]

with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer), \
mb = mock_broker
with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \
patch("hop.io.uuid4", MagicMock(return_value=fixed_uuid)):

broker_url = f"kafka://localhost:port/{topic}"
Expand All @@ -307,10 +306,6 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):

stream = io.Stream(start_at=start_at, until_eos=until_eos, auth=auth)

# verify only 1 topic is allowed in write mode
with pytest.raises(ValueError):
stream.open("kafka://localhost:9092/topic1,topic2", "w")

# verify warning is raised when groupid is set in write mode
with pytest.warns(UserWarning):
stream.open("kafka://localhost:9092/topic1", "w", group_id="group")
Expand All @@ -337,14 +332,40 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
s.close()
assert mock_broker.has_message(topic, expected_msg.value(), canonical_headers)

mock_broker.reset()
# more than one topics should now be allowed in write mode
with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s:
with pytest.raises(Exception):
# however, a topic must be specified when calling write with multiple topics
# specified on construction
s.write(circular_msg, headers)

# selecting a topic explicitly when calling write should work
s.write(circular_msg, headers, topic="topic1")
assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers)
s.write(circular_msg, headers, topic="topic2")
assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers)

mock_broker.reset()
# no topic can also be specified in write mode
with stream.open("kafka://localhost:9092/", "w") as s:
with pytest.raises(Exception):
# however, a topic must be specified when calling write
s.write(circular_msg, headers)

s.write(circular_msg, headers, topic="topic1")
assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers)
s.write(circular_msg, headers, topic="topic2")
assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers)


def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_producer):
topic = "gcn"
mock_adc_producer = mock_producer(mock_broker, topic)
encoded_msg = io.Producer.pack(circular_msg)
headers = {"some header": "some value"}
canonical_headers = list(headers.items())
with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer):
mb = mock_broker
with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)):
stream = io.Stream(auth=False)

broker_url = f"kafka://localhost:9092/{topic}"
Expand All @@ -361,6 +382,14 @@ def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_produce
s.close()
assert mock_broker.has_message(topic, encoded_msg, canonical_headers)

with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s:
with pytest.raises(Exception):
s.write_raw(encoded_msg, canonical_headers)
s.write_raw(encoded_msg, canonical_headers, topic="topic1")
assert mock_broker.has_message("topic1", encoded_msg, canonical_headers)
s.write_raw(encoded_msg, canonical_headers, topic="topic2")
assert mock_broker.has_message("topic2", encoded_msg, canonical_headers)


def test_stream_auth(auth_config, tmpdir):
# turning off authentication should give None for the auth property
Expand All @@ -385,7 +414,7 @@ def test_stream_auth(auth_config, tmpdir):
assert s4.auth == "blarg"


def test_stream_open(auth_config, tmpdir):
def test_stream_open(auth_config, mock_broker, mock_producer, tmpdir):
stream = io.Stream(auth=False)

# verify only read/writes are allowed
Expand All @@ -398,7 +427,7 @@ def test_stream_open(auth_config, tmpdir):
stream.open("bad://example.com/topic", "r")
assert "invalid kafka URL: must start with 'kafka://'" in err.value.args

# verify that URLs with no topic are rejected
# verify that URLs with no topic are rejected when reading
with pytest.raises(ValueError) as err:
stream.open("kafka://example.com/", "r")
assert "no topic(s) specified in kafka URL" in err.value.args
Expand All @@ -409,7 +438,9 @@ def test_stream_open(auth_config, tmpdir):
assert "Multiple broker addresses are not supported" in err.value.args

# verify that complete URLs are accepted
mb = mock_broker
with temp_config(tmpdir, auth_config) as config_dir, temp_environ(XDG_CONFIG_HOME=config_dir), \
patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \
patch("adc.consumer.Consumer.subscribe", MagicMock()) as subscribe:
stream = io.Stream()
# opening a valid URL for reading should succeed
Expand Down
Loading