@@ -106,6 +106,7 @@ def get_processed_since(
106
106
def get_new_updates (
107
107
redis_client : Redis ,
108
108
stream_name : str = "product_updates_off" ,
109
+ min_id : Union [str , datetime .datetime ] = "$" ,
109
110
batch_size : int = 100 ,
110
111
) -> Iterator [RedisUpdate ]:
111
112
"""Reads new updates from a Redis Stream, starting from the moment this
@@ -115,34 +116,41 @@ def get_new_updates(
115
116
116
117
:param redis_client: the Redis client
117
118
:param stream_name: the name of the Redis stream to read from
119
+ :param min_id: the minimum ID to start from, defaults to "$".
118
120
:param batch_size: the size of the batch to fetch, defaults to 100
119
121
:yield: a RedisUpdate instance for each update
120
122
"""
121
123
yield from get_new_updates_multistream (
122
124
redis_client ,
123
125
[stream_name ],
126
+ min_id = min_id ,
124
127
batch_size = batch_size ,
125
128
)
126
129
127
130
128
131
def get_new_updates_multistream (
129
132
redis_client : Redis ,
130
133
stream_names : list [str ],
134
+ min_id : Union [str , datetime .datetime ] = "$" ,
131
135
batch_size : int = 100 ,
132
136
) -> Iterator [RedisUpdate ]:
133
137
"""Reads new updates from Redis Stream, starting from the moment this
134
138
function is called.
135
139
136
140
The function will block until new updates are available.
137
141
138
- :param redis_client: the Redis client
139
- :param stream_names: the names of the Redis streams to read from
142
+ :param redis_client: the Redis client.
143
+ :param stream_names: the names of the Redis streams to read from.
144
+ :param min_id: the minimum ID to start from, defaults to "$".
140
145
:param batch_size: the size of the batch to fetch, defaults to 100.
141
- :yield: a RedisUpdate instance for each update
146
+ :yield: a RedisUpdate instance for each update.
142
147
"""
148
+ if isinstance (min_id , datetime .datetime ):
149
+ min_id = f"{ int (min_id .timestamp () * 1000 )} -0"
150
+
143
151
# We start from the last ID
144
152
min_ids : dict [Union [bytes , str , memoryview ], Union [int , bytes , str , memoryview ]] = {
145
- stream_name : "$" for stream_name in stream_names
153
+ stream_name : min_id for stream_name in stream_names
146
154
}
147
155
while True :
148
156
logger .debug (
@@ -155,8 +163,8 @@ def get_new_updates_multistream(
155
163
156
164
for stream_name , batch in response :
157
165
# We update the min_id to the last ID of the batch
158
- min_id = batch [- 1 ][0 ]
159
- min_ids [stream_name ] = min_id
166
+ new_min_id = batch [- 1 ][0 ]
167
+ min_ids [stream_name ] = new_min_id
160
168
for timestamp_id , item in batch :
161
169
# Get the timestamp from the ID
162
170
timestamp = int (timestamp_id .split ("-" )[0 ])
0 commit comments