Multi-topic publishing #69

Merged: 5 commits, Jun 10, 2024
Changes from 4 commits
4 changes: 2 additions & 2 deletions adc/__init__.py
@@ -1,8 +1,8 @@
try:
from importlib.metadata import version, PackageNotFoundError
from importlib.metadata import PackageNotFoundError, version
except ImportError:
# NOTE: remove after dropping support for Python < 3.8
from importlib_metadata import version, PackageNotFoundError
from importlib_metadata import PackageNotFoundError, version

try:
__version__ = version("adc-streaming")
17 changes: 12 additions & 5 deletions adc/consumer.py
@@ -1,12 +1,13 @@
import dataclasses
import enum
import logging
from datetime import datetime, timedelta
import threading
from collections import defaultdict
from datetime import datetime, timedelta
# Imports from typing are deprecated as of Python 3.9 but required for
# compatibility with earlier versions
from typing import Dict, Iterable, Iterator, List, Optional, Set, Union, Collection
from collections import defaultdict
from typing import (Collection, Dict, Iterable, Iterator, List, Optional, Set,
Union)

import confluent_kafka # type: ignore
import confluent_kafka.admin # type: ignore
@@ -15,6 +16,7 @@
from .errors import ErrorCallback, log_client_errors
from .oidc import set_oauth_cb


class LogicalOffset(enum.IntEnum):
BEGINNING = confluent_kafka.OFFSET_BEGINNING
EARLIEST = confluent_kafka.OFFSET_BEGINNING
@@ -26,6 +28,7 @@ class LogicalOffset(enum.IntEnum):

INVALID = confluent_kafka.OFFSET_INVALID


class Consumer:
conf: 'ConsumerConfig'
_consumer: confluent_kafka.Consumer
@@ -99,14 +102,15 @@ def mark_done(self, msg: confluent_kafka.Message, asynchronous: bool = True):
self._consumer.commit(msg, asynchronous=False)

def _offsets_for_position(self, partitions: Collection[confluent_kafka.TopicPartition],
position: Union[datetime, LogicalOffset]) -> List[confluent_kafka.TopicPartition]:
position: Union[datetime, LogicalOffset]) \
-> List[confluent_kafka.TopicPartition]:
if isinstance(position, datetime):
offset = int(position.timestamp() * 1000)
elif isinstance(position, LogicalOffset):
offset = position
else:
raise TypeError("Only datetime objects and logical offsets supported")

_partitions = [
confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition, offset=offset)
for tp in partitions
@@ -248,6 +252,7 @@ def close(self):
""" Close the consumer, ending its subscriptions. """
self._consumer.close()


# Used to be called ConsumerStartPosition, though this was confusing because
# it only affects "auto.offset.reset" not the start position for a call to
# consume.
@@ -258,10 +263,12 @@ class ConsumerDefaultPosition(enum.Enum):
def __str__(self):
return self.name.lower()


# Alias to the old name
# TODO: Remove alias on the next breaking release
ConsumerStartPosition = ConsumerDefaultPosition


@dataclasses.dataclass
class ConsumerConfig:
broker_urls: List[str]
62 changes: 55 additions & 7 deletions adc/errors.py
@@ -34,21 +34,69 @@ def log_delivery_errors(
def raise_delivery_errors(kafka_error: confluent_kafka.KafkaError,
msg: confluent_kafka.Message) -> None:
if kafka_error is not None:
raise KafkaException.from_kafka_error(kafka_error)
raise KafkaException.from_kafka_error(kafka_error, msg)
elif msg.error() is not None:
raise KafkaException.from_kafka_error(msg.error())
raise KafkaException.from_kafka_error(msg.error(), msg)


def _get_topic_related_errors():
"""Build a set of all Kafka error codes which seem to relate to a specific topic.

This uses a list extracted from all documented error codes up to confluent_kafka v2.4,
but some of these errors did not exist or were not exposed in earlier versions.
To maintain backward compatibility, this function checks whether each error code exists
before referring to it.
"""
err_names = [
"_UNKNOWN_TOPIC",
"_NO_OFFSET",
"_LOG_TRUNCATION",
"OFFSET_OUT_OF_RANGE",
"UNKNOWN_TOPIC_OR_PART",
"NOT_LEADER_FOR_PARTITION",
"TOPIC_EXCEPTION",
"NOT_ENOUGH_REPLICAS",
"NOT_ENOUGH_REPLICAS_AFTER_APPEND",
"INVALID_COMMIT_OFFSET_SIZE",
"TOPIC_AUTHORIZATION_FAILED",
"TOPIC_ALREADY_EXISTS",
"INVALID_PARTITIONS",
"INVALID_REPLICATION_FACTOR",
"INVALID_REPLICA_ASSIGNMENT",
"REASSIGNMENT_IN_PROGRESS",
"TOPIC_DELETION_DISABLED",
"OFFSET_NOT_AVAILABLE",
"PREFERRED_LEADER_NOT_AVAILABLE",
"NO_REASSIGNMENT_IN_PROGRESS",
"GROUP_SUBSCRIBED_TO_TOPIC",
"UNSTABLE_OFFSET_COMMIT",
"UNKNOWN_TOPIC_ID",
]
errors = set()
for name in err_names:
if hasattr(confluent_kafka.KafkaError, name):
errors.add(getattr(confluent_kafka.KafkaError, name))
else:
logger.debug(f"{name} does not exist in confluent_kafka version "
f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})")
return errors


class KafkaException(Exception):
@classmethod
def from_kafka_error(cls, error):
return cls(error)
def from_kafka_error(cls, error, msg=None):
return cls(error, msg)

topic_related_errors = _get_topic_related_errors()

def __init__(self, error):
def __init__(self, error, msg=None):
self.error = error
self.name = error.name()
self.reason = error.str()
self.retriable = error.retriable()
self.fatal = error.fatal()
msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
super(KafkaException, self).__init__(msg)
self.message = msg
ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
if msg and error.code() in KafkaException.topic_related_errors:
ex_msg += f" on topic {msg.topic()}"
super(KafkaException, self).__init__(ex_msg)
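
As context for the errors.py change above, here is a minimal sketch of how the enriched exception might surface in practice. The broker address and topic name are placeholders, not taken from this diff, and auth is omitted; only Producer, ProducerConfig, raise_delivery_errors, and KafkaException come from this codebase.

import adc.errors
import adc.producer

# Illustrative setup only: placeholder broker, no default topic configured.
producer = adc.producer.Producer(adc.producer.ProducerConfig(
    broker_urls=["kafka.example.org:9092"],
    topic=None,  # rely on the per-call topic argument added in this PR
))

try:
    # raise_delivery_errors re-raises delivery failures; with this change the
    # resulting KafkaException text also names the topic for topic-related codes.
    producer.write(b"payload",
                   delivery_callback=adc.errors.raise_delivery_errors,
                   topic="no-such-topic")
    producer.flush()
except adc.errors.KafkaException as err:
    # The message may now end with "on topic no-such-topic".
    print(err.name, err.retriable, err)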
2 changes: 1 addition & 1 deletion adc/oidc.py
@@ -18,7 +18,7 @@ def set_oauth_cb(config):

from authlib.integrations.requests_client import OAuth2Session
session = OAuth2Session(client_id, client_secret, scope=scope)

def oauth_cb(*_, **__):
token = session.fetch_token(
token_endpoint, grant_type='client_credentials')
23 changes: 16 additions & 7 deletions adc/producer.py
@@ -1,9 +1,9 @@
import abc
from ast import comprehension
import dataclasses
import logging
from datetime import timedelta
from typing import Dict, List, Optional, Union

try: # this will work only in python >= 3.8
from typing import Literal
except ImportError:
@@ -34,15 +34,23 @@ def __init__(self, conf: 'ProducerConfig') -> None:
def write(self,
msg: Union[bytes, 'Serializable'],
headers: Optional[Union[dict, list]] = None,
delivery_callback: Optional[DeliveryCallback] = log_delivery_errors) -> None:
delivery_callback: Optional[DeliveryCallback] = log_delivery_errors,
topic: Optional[str] = None) -> None:
if isinstance(msg, Serializable):
msg = msg.serialize()
self.logger.debug("writing message to %s", self.conf.topic)
if topic is None:
if self.conf.topic is not None:
topic = self.conf.topic
else:
raise Exception("No topic specified for write: "
"Either configure a topic when consturcting the Producer, "
"or specify the topic argument to write()")
self.logger.debug("writing message to %s", topic)
if delivery_callback is not None:
self._producer.produce(self.conf.topic, msg, headers=headers,
self._producer.produce(topic, msg, headers=headers,
on_delivery=delivery_callback)
else:
self._producer.produce(self.conf.topic, msg, headers=headers)
self._producer.produce(topic, msg, headers=headers)

def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
"""Attempt to flush enqueued messages. Return the number of messages still
@@ -78,7 +86,7 @@ def __exit__(self, type, value, traceback) -> bool:
@dataclasses.dataclass
class ProducerConfig:
broker_urls: List[str]
topic: str
topic: Optional[str]
auth: Optional[SASLAuth] = None
error_callback: Optional[ErrorCallback] = log_client_errors

@@ -104,7 +112,8 @@ class ProducerConfig:
# between attempts to reconnect to Kafka.
reconnect_max_time: timedelta = timedelta(seconds=10)

compression_type: Optional[Union[Literal['gzip'], Literal['snappy'], Literal['lz4'], Literal['zstd']]] = None
compression_type: Optional[Union[Literal['gzip'], Literal['snappy'],
Literal['lz4'], Literal['zstd']]] = None

# maximum message size, before compression
message_max_bytes: Optional[int] = None
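
For reference, a minimal sketch of the two ways a topic can now be chosen after the producer.py change above. The broker address and topic names below are placeholders, not taken from this diff.

import adc.producer

# Per-message topics: leave ProducerConfig.topic unset and name the topic in write().
config = adc.producer.ProducerConfig(
    broker_urls=["kafka.example.org:9092"],  # placeholder broker
    topic=None,
)
with adc.producer.Producer(config) as producer:
    producer.write(b"alert payload", topic="alerts")
    producer.write(b"heartbeat payload", topic="heartbeats")

# The existing single-topic usage is unchanged: if ProducerConfig.topic is set,
# write() without a topic argument still publishes to that configured topic.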
1 change: 1 addition & 0 deletions setup.cfg
@@ -13,3 +13,4 @@ exclude = setup.py,
[tool:pytest]
log_cli = True
log_cli_level = INFO
testpaths = tests
51 changes: 49 additions & 2 deletions tests/test_kafka_integration.py
@@ -377,6 +377,43 @@ def test_contextmanager_support(self):
self.assertEqual(messages[1].value(), b"message 2")
self.assertEqual(messages[2].value(), b"message 3")

def test_multi_topic_handling(self):
"""Use a single producer object to write messages to multiple topics,
and check that a consumer can receive them all.

"""
topics = ["test_multi_1", "test_multi_2"]

# Push some messages in
producer = adc.producer.Producer(adc.producer.ProducerConfig(
broker_urls=[self.kafka.address],
topic=None,
auth=self.kafka.auth,
))
for i in range(0,8):
producer.write(str(i), topic=topics[i%2])
producer.flush()
logger.info("messages sent")

# check that we receive the messages from the right topics
consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
broker_urls=[self.kafka.address],
group_id="test_consumer",
auth=self.kafka.auth,
))
consumer.subscribe(topics)
stream = consumer.stream()
total_messages = 0
for msg in stream:
if msg.error() is not None:
raise Exception(msg.error())
idx = int(msg.value())
self.assertEqual(msg.topic(), topics[idx%2])
total_messages += 1
if total_messages == 8:
break
self.assertEqual(total_messages, 8)


class KafkaDockerConnection:
"""Holds connection information for communicating with a Kafka broker running
@@ -437,6 +474,8 @@ def query_kafka_broker_address(self):
if not addrs:
return None
ip = addrs[0]['HostIp']
if len(ip) == 0:
ip = "localhost"
port = addrs[0]['HostPort']
return f"{ip}:{port}"

@@ -502,8 +541,16 @@ def get_or_create_container(self):
detach=True,
auto_remove=True,
network=self.net.name,
# Setting None below lets the OS pick an ephemeral port.
ports={"9092/tcp": None},
# Kafka insists on redirecting consumers to one of its advertised listeners,
# which it will get wrong if it is running in a private container network.
# To fix this, we need to tell it what to advertise, which means we must
# know what port will be visible from the host system, and we cannot use an
# ephemeral port, which would be known to us only after the container is
# started. Since we have to pick something, pick 9092, which means that
# these tests cannot run if there is already an instance of Kafka running on
# the same host.
ports={"9092/tcp": 9092},
command=["/root/runServer","--advertisedListener","SASL_SSL://localhost:9092"],
)

def get_or_create_docker_network(self):