Event Hubs Kafka endpoint doesn't match native Kafka behavior - OFFSET_BEGINNING #212

Open
3 tasks
joshuaphelpsms opened this issue Oct 10, 2022 · 1 comment

Comments

@joshuaphelpsms

Description

With the confluent_kafka Python client, calling consumer.store_offsets() with offset=OFFSET_BEGINNING succeeds, but the stored offset is not actually reset to the beginning of the partition. Our workaround was to use offset=0.

How to reproduce

The issue can be reproduced with the three files below: consumer.py, producer.py, and reset_offset_beginning.py

consumer.py

from confluent_kafka import Consumer, KafkaException


if __name__ == '__main__':
    CONNECTION_STRING = '<CONNECTION_STRING>'
    NAMESPACE = '<NAMESPACE_FQDN>'
    GROUP = '<CONSUMER_GROUP>'
    TOPIC = '<TOPIC>'

    conf = {
        'bootstrap.servers': '%s:9093' % NAMESPACE,
        'group.id': GROUP,
        "enable.auto.offset.store": False,
        "enable.auto.commit": True,
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': CONNECTION_STRING,
        'security.protocol': 'SASL_SSL',
    }

    c = Consumer(conf)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    c.subscribe([TOPIC], on_assign=print_assignment)

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                print(f"Consumed message: msg: topic: {msg.topic()}, partition: {msg.partition()}, offset: {msg.offset()}")
                c.store_offsets(msg)

    except KeyboardInterrupt:
        print('%% Aborted by user\n')

    finally:
        c.close()

producer.py

from confluent_kafka import Producer


if __name__ == '__main__':
    CONNECTION_STRING = '<CONNECTION_STRING>'
    NAMESPACE = '<NAMESPACE_FQDN>'
    TOPIC = '<TOPIC>'

    conf = {
        'bootstrap.servers': '%s:9093' % NAMESPACE,
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': CONNECTION_STRING,
        'security.protocol': 'SASL_SSL',
    }

    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            print('%% Message failed delivery: %s\n' % err)
        else:
            print('%% Message delivered to %s, partition=[%d],  offset=%d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Write the values 0-2 to partition 0 of the topic
    for i in range(0, 3):
        try:
            p.produce(TOPIC, str(i), partition=0, callback=delivery_callback)
        except BufferError:
            print('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)

    print('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

reset_offset_beginning.py

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING


if __name__ == "__main__":
    CONNECTION_STRING = '<CONNECTION_STRING>'
    NAMESPACE = '<NAMESPACE_FQDN>'
    GROUP = '<CONSUMER_GROUP>'
    TOPIC = '<TOPIC>'

    conf = {
        'bootstrap.servers': '%s:9093' % NAMESPACE,
        'group.id': GROUP,
        "enable.auto.offset.store": False,
        "enable.auto.commit": True,
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': CONNECTION_STRING,
        'security.protocol': 'SASL_SSL',
    }

    c = Consumer(conf)
    # update to offset = 0 to see desired behavior
    offset = OFFSET_BEGINNING
    print(f"Resetting offset to {offset}")
    tp = TopicPartition(topic=TOPIC, partition=0, offset=offset)
    c.assign([tp])
    c.store_offsets(offsets=[tp])
    c.close()

Steps

  1. Create an Event Hub with a single partition
  2. Install confluent_kafka
$ pip install confluent-kafka
  3. Start the consumer
$ python consumer.py
Assignment: [TopicPartition{topic=topic1,partition=0,offset=-1001,error=None}]
  4. In another terminal, produce some messages to the topic
$ python producer.py
Waiting for 3 deliveries
Message delivered to topic1, partition=0, offset=0
Message delivered to topic1, partition=0, offset=1
Message delivered to topic1, partition=0, offset=2
  5. Observe the consumed messages in the consumer terminal
Consumed message:  topic: topic1, partition: 0, offset: 0
Consumed message:  topic: topic1, partition: 0, offset: 1
Consumed message:  topic: topic1, partition: 0, offset: 2
  6. Stop the consumer, then reset the stored offset to OFFSET_BEGINNING
$ python reset_offset_beginning.py
Resetting offset to -2
  7. Restart the consumer and observe that the messages are not replayed
$ python consumer.py
Assignment: [TopicPartition{topic=topic1,partition=0,offset=-1001,error=None}]
  8. Stop the consumer. Set offset = 0 on line 23 of reset_offset_beginning.py, then run it again.
$ python reset_offset_beginning.py
Resetting offset to 0
  9. Restart the consumer. The messages are now replayed from offset 0 as expected.
$ python consumer.py
Assignment: [TopicPartition{topic=topic1,partition=0,offset=-1001,error=None}]
Consumed message: msg: topic: topic1, partition: 0, offset: 0
Consumed message: msg: topic: topic1, partition: 0, offset: 1
Consumed message: msg: topic: topic1, partition: 0, offset: 2
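
The negative numbers in the output above (-1001 in the Assignment lines, -2 in the reset output) are logical offsets rather than positions in the partition. A minimal sketch for decoding them when reading such logs follows. The numeric values mirror the constants exposed by confluent_kafka (OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID); the describe_offset helper itself is hypothetical and not part of the library.

```python
# Logical offset values as exposed by confluent_kafka / librdkafka.
LOGICAL_OFFSETS = {
    -2: "OFFSET_BEGINNING",
    -1: "OFFSET_END",
    -1000: "OFFSET_STORED",
    -1001: "OFFSET_INVALID",
}


def describe_offset(offset):
    """Return a readable name for a logical offset, or label it as absolute.

    Hypothetical helper for reading logs; not a confluent_kafka API.
    """
    return LOGICAL_OFFSETS.get(offset, f"absolute offset {offset}")


if __name__ == "__main__":
    for value in (-1001, -2, 0):
        print(value, "->", describe_offset(value))
```

An assignment showing offset=-1001 therefore only means that no explicit start position was passed with the assignment, so the consumer resumes from the committed offset or falls back to auto.offset.reset. It does not by itself show whether the reset took effect.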

Has it worked previously?

No

Checklist

IMPORTANT: We will close issues where the checklist has not been completed or where adequate information has not been provided.

Please provide the relevant information for the following items:

  • SDK (include version info): Name: confluent-kafka, Version: 1.9.2
  • Kafka client configuration: included above
  • Consumer or producer failure: consumer.store_offsets() succeeds but does not match native Kafka behavior
@assures
Collaborator

assures commented Mar 23, 2023

Hi @joshuaphelpsms, do you notice any difference in behavior with the native Kafka broker? I just did some testing with Kafka v3.3.1 and observed the same behavior. The client doesn't seem to send an offset commit request to the broker/service to really commit the OFFSET_BEGINNING offset.
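
If the client does not commit a logical offset, one possible alternative to hard-coding offset=0 is to resolve OFFSET_BEGINNING to the partition's low watermark first and commit that absolute value. The sketch below assumes `consumer` behaves like confluent_kafka.Consumer (get_watermark_offsets and commit) and `tp` like confluent_kafka.TopicPartition (a writable offset attribute). The helpers resolve_offset and commit_resolved are hypothetical names, and this has not been verified against Event Hubs.

```python
# Logical offset values as exposed by confluent_kafka / librdkafka.
OFFSET_BEGINNING = -2
OFFSET_END = -1


def resolve_offset(offset, low, high):
    """Map a logical offset to an absolute one using the partition watermarks."""
    if offset == OFFSET_BEGINNING:
        return low
    if offset == OFFSET_END:
        return high
    return offset  # already an absolute offset


def commit_resolved(consumer, tp, timeout=10.0):
    """Resolve tp.offset against the watermarks, then commit it synchronously.

    Hypothetical helper: `consumer` is expected to behave like
    confluent_kafka.Consumer and `tp` like confluent_kafka.TopicPartition.
    """
    watermarks = consumer.get_watermark_offsets(tp, timeout=timeout)
    if watermarks is None:
        raise RuntimeError("timed out fetching watermark offsets")
    low, high = watermarks
    tp.offset = resolve_offset(tp.offset, low, high)
    # Commit the absolute offset explicitly instead of relying on
    # store_offsets() plus auto-commit.
    consumer.commit(offsets=[tp], asynchronous=False)
    return tp.offset
```

In reset_offset_beginning.py this would replace the store_offsets() call, for example commit_resolved(c, tp) after c.assign([tp]). Using the low watermark rather than 0 is a design choice: on a partition where older messages have already expired, the earliest available offset may be greater than 0.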
