1
1
import dataclasses
2
2
import enum
3
3
import logging
4
- from datetime import datetime , timedelta
5
4
import threading
5
+ from collections import defaultdict
6
+ from datetime import datetime , timedelta
6
7
# Imports from typing are deprecated as of Python 3.9 but required for
7
8
# compatibility with earlier versions
8
- from typing import Dict , Iterable , Iterator , List , Optional , Set , Union , Collection
9
- from collections import defaultdict
9
+ from typing import ( Collection , Dict , Iterable , Iterator , List , Optional , Set ,
10
+ Union )
10
11
11
12
import confluent_kafka # type: ignore
12
13
import confluent_kafka .admin # type: ignore
15
16
from .errors import ErrorCallback , log_client_errors
16
17
from .oidc import set_oauth_cb
17
18
19
+
18
20
class LogicalOffset (enum .IntEnum ):
19
21
BEGINNING = confluent_kafka .OFFSET_BEGINNING
20
22
EARLIEST = confluent_kafka .OFFSET_BEGINNING
@@ -26,6 +28,7 @@ class LogicalOffset(enum.IntEnum):
26
28
27
29
INVALID = confluent_kafka .OFFSET_INVALID
28
30
31
+
29
32
class Consumer :
30
33
conf : 'ConsumerConfig'
31
34
_consumer : confluent_kafka .Consumer
@@ -99,14 +102,15 @@ def mark_done(self, msg: confluent_kafka.Message, asynchronous: bool = True):
99
102
self ._consumer .commit (msg , asynchronous = False )
100
103
101
104
def _offsets_for_position (self , partitions : Collection [confluent_kafka .TopicPartition ],
102
- position : Union [datetime , LogicalOffset ]) -> List [confluent_kafka .TopicPartition ]:
105
+ position : Union [datetime , LogicalOffset ]) \
106
+ -> List [confluent_kafka .TopicPartition ]:
103
107
if isinstance (position , datetime ):
104
108
offset = int (position .timestamp () * 1000 )
105
109
elif isinstance (position , LogicalOffset ):
106
110
offset = position
107
111
else :
108
112
raise TypeError ("Only datetime objects and logical offsets supported" )
109
-
113
+
110
114
_partitions = [
111
115
confluent_kafka .TopicPartition (topic = tp .topic , partition = tp .partition , offset = offset )
112
116
for tp in partitions
@@ -248,6 +252,7 @@ def close(self):
248
252
""" Close the consumer, ending its subscriptions. """
249
253
self ._consumer .close ()
250
254
255
+
251
256
# Used to be called ConsumerStartPosition, though this was confusing because
252
257
# it only affects "auto.offset.reset" not the start position for a call to
253
258
# consume.
@@ -258,10 +263,12 @@ class ConsumerDefaultPosition(enum.Enum):
258
263
def __str__ (self ):
259
264
return self .name .lower ()
260
265
266
+
261
267
# Alias to the old name
262
268
# TODO: Remove alias on the next breaking release
263
269
ConsumerStartPosition = ConsumerDefaultPosition
264
270
271
+
265
272
@dataclasses .dataclass
266
273
class ConsumerConfig :
267
274
broker_urls : List [str ]
0 commit comments