Skip to content

Commit da1271d

Browse files
authored
Updated redis version. (#125)
Signed-off-by: Pavel Kirilin <s3riussan@gmail.com>
1 parent 374c789 commit da1271d

10 files changed

Lines changed: 1148 additions & 987 deletions

.github/workflows/test.yml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ on:
44
pull_request:
55
paths-ignore:
66
- '*.md'
7-
push:
8-
paths-ignore:
9-
- '*.md'
107

118
permissions:
129
actions: read
@@ -26,10 +23,8 @@ jobs:
2623
- id: setup-uv
2724
uses: astral-sh/setup-uv@v7
2825
with:
29-
enable-cache: true
30-
cache-suffix: "3.12"
3126
version: "latest"
32-
python-version: "3.12"
27+
python-version: "3.13"
3328
- name: Install deps
3429
run: uv sync --all-extras
3530
- name: Run lint check

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ repos:
3636

3737
- id: mypy
3838
name: Validate types with MyPy
39-
entry: uv run mypy
39+
entry: uv run mypy --show-traceback .
4040
language: system
41+
pass_filenames: false
4142
types: [ python ]

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
"Programming Language :: Python :: 3.13",
2727
]
2828
dependencies = [
29-
"redis>=7.0.0,<8", # TODO: fix issues in tests with 7.1.0
29+
"redis>=8.0.0,<9",
3030
"taskiq>=0.12.0",
3131
]
3232

@@ -42,7 +42,7 @@ dev = [
4242
]
4343
lint = [
4444
"black>=25.11.0",
45-
"mypy>=1.19.0",
45+
"mypy>=2.0.0",
4646
"ruff>=0.14.7",
4747
]
4848
test = [

taskiq_redis/list_schedule_source.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
123123
).replace(second=0, microsecond=0) - datetime.timedelta(
124124
minutes=1,
125125
)
126-
schedules = []
126+
schedules: list[bytes] = []
127127
async with Redis(connection_pool=self._connection_pool) as redis:
128128
time_keys: list[str] = []
129129
# We need to get all the time keys and check if the time is less than
@@ -133,28 +133,28 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
133133
if key_time and key_time <= minute_before:
134134
time_keys.append(key.decode())
135135
for key in time_keys:
136-
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[misc]
136+
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[arg-type]
137137

138138
return schedules
139139

140140
async def delete_schedule(self, schedule_id: str) -> None:
141141
"""Delete a schedule from the source."""
142142
async with Redis(connection_pool=self._connection_pool) as redis:
143-
schedule = await redis.getdel(self._get_data_key(schedule_id))
144-
if schedule is not None:
143+
raw_schedule = await redis.getdel(self._get_data_key(schedule_id))
144+
if raw_schedule is not None:
145145
logger.debug("Deleting schedule %s", schedule_id)
146146
schedule = model_validate(
147147
ScheduledTask,
148-
self._serializer.loadb(schedule),
148+
self._serializer.loadb(raw_schedule), # type: ignore[arg-type]
149149
)
150150
# We need to remove the schedule from the cron or time list.
151151
if schedule.cron is not None:
152-
await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore[misc]
152+
await redis.lrem(self._get_cron_key(), 0, schedule_id)
153153
elif schedule.time is not None:
154154
time_key = self._get_time_key(schedule.time)
155-
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]
155+
await redis.lrem(time_key, 0, schedule_id)
156156
elif schedule.interval:
157-
await redis.lrem(self._get_interval_key(), 0, schedule_id) # type: ignore[misc]
157+
await redis.lrem(self._get_interval_key(), 0, schedule_id)
158158

159159
async def add_schedule(self, schedule: "ScheduledTask") -> None:
160160
"""Add a schedule to the source."""
@@ -168,14 +168,14 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None:
168168
# This is an optimization, so we can get all the schedules
169169
# for the current time much faster.
170170
if schedule.cron is not None:
171-
await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore[misc]
171+
await redis.rpush(self._get_cron_key(), schedule.schedule_id)
172172
elif schedule.time is not None:
173-
await redis.rpush( # type: ignore[misc]
173+
await redis.rpush(
174174
self._get_time_key(schedule.time),
175175
schedule.schedule_id,
176176
)
177177
elif schedule.interval:
178-
await redis.rpush( # type: ignore[misc]
178+
await redis.rpush(
179179
self._get_interval_key(),
180180
schedule.schedule_id,
181181
)
@@ -204,16 +204,16 @@ async def get_schedules(self) -> list["ScheduledTask"]:
204204
timed = await self._get_previous_time_schedules()
205205
self._is_first_run = False
206206
async with Redis(connection_pool=self._connection_pool) as redis:
207-
buffer = []
208-
crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc]
207+
buffer: list[bytes] = []
208+
crons = await redis.lrange(self._get_cron_key(), 0, -1)
209209
logger.debug("Got %d cron schedules", len(crons))
210210
if crons:
211-
buffer.extend(crons)
212-
intervals = await redis.lrange(self._get_interval_key(), 0, -1) # type: ignore[misc]
211+
buffer.extend(crons) # type: ignore[arg-type]
212+
intervals = await redis.lrange(self._get_interval_key(), 0, -1)
213213
logger.debug("Got %d interval schedules", len(intervals))
214214
if intervals:
215-
buffer.extend(intervals)
216-
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc]
215+
buffer.extend(intervals) # type: ignore[arg-type]
216+
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[arg-type]
217217
logger.debug("Got %d timed schedules", len(timed))
218218
if timed:
219219
buffer.extend(timed)
@@ -229,7 +229,7 @@ async def get_schedules(self) -> list["ScheduledTask"]:
229229
buffer = buffer[self._buffer_size :]
230230

231231
return [
232-
model_validate(ScheduledTask, self._serializer.loadb(schedule))
232+
model_validate(ScheduledTask, self._serializer.loadb(schedule)) # type: ignore[arg-type]
233233
for schedule in schedules
234234
if schedule
235235
]

taskiq_redis/redis_backend.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def get_result(
156156

157157
taskiq_result = model_validate(
158158
TaskiqResult[_ReturnType],
159-
self.serializer.loadb(result_value),
159+
self.serializer.loadb(result_value), # type: ignore[arg-type]
160160
)
161161

162162
if not with_logs:
@@ -208,7 +208,7 @@ async def get_progress(
208208

209209
return model_validate(
210210
TaskProgress[_ReturnType],
211-
self.serializer.loadb(result_value),
211+
self.serializer.loadb(result_value), # type: ignore[arg-type]
212212
)
213213

214214

@@ -333,7 +333,7 @@ async def get_result(
333333

334334
taskiq_result: TaskiqResult[_ReturnType] = model_validate(
335335
TaskiqResult[_ReturnType],
336-
self.serializer.loadb(result_value),
336+
self.serializer.loadb(result_value), # type: ignore[arg-type]
337337
)
338338

339339
if not with_logs:
@@ -384,7 +384,7 @@ async def get_progress(
384384

385385
return model_validate(
386386
TaskProgress[_ReturnType],
387-
self.serializer.loadb(result_value),
387+
self.serializer.loadb(result_value), # type: ignore[arg-type]
388388
)
389389

390390

@@ -519,7 +519,7 @@ async def get_result(
519519

520520
taskiq_result = model_validate(
521521
TaskiqResult[_ReturnType],
522-
self.serializer.loadb(result_value),
522+
self.serializer.loadb(result_value), # type: ignore[arg-type]
523523
)
524524

525525
if not with_logs:
@@ -571,7 +571,7 @@ async def get_progress(
571571

572572
return model_validate(
573573
TaskProgress[_ReturnType],
574-
self.serializer.loadb(result_value),
574+
self.serializer.loadb(result_value), # type: ignore[arg-type]
575575
)
576576

577577
async def shutdown(self) -> None:

taskiq_redis/redis_broker.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ async def kick(self, message: BrokerMessage) -> None:
114114
"""
115115
queue_name = message.labels.get("queue_name") or self.queue_name
116116
async with Redis(connection_pool=self.connection_pool) as redis_conn:
117-
await redis_conn.lpush(queue_name, message.message) # type: ignore
117+
await redis_conn.lpush(queue_name, message.message)
118118

119119
async def listen(self) -> AsyncGenerator[bytes, None]:
120120
"""
@@ -129,9 +129,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
129129
while True:
130130
try:
131131
async with Redis(connection_pool=self.connection_pool) as redis_conn:
132-
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
133-
redis_brpop_data_position
134-
]
132+
brpop_result = await redis_conn.brpop(self.queue_name)
133+
if brpop_result is None:
134+
continue
135+
yield brpop_result[redis_brpop_data_position] # type: ignore[misc]
135136
except ConnectionError as exc:
136137
logger.warning("Redis connection error: %s", exc)
137138
continue
@@ -283,12 +284,14 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
283284
noack=False,
284285
count=self.count,
285286
)
286-
for stream, msg_list in fetched:
287-
for msg_id, msg in msg_list:
287+
if not fetched:
288+
continue
289+
for stream, msg_list in fetched: # type: ignore[str-unpack]
290+
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
288291
logger.debug("Received message: %s", msg)
289292
yield AckableMessage(
290-
data=msg[b"data"],
291-
ack=self._ack_generator(id=msg_id, queue_name=stream),
293+
data=msg[b"data"], # type: ignore[arg-type,index]
294+
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
292295
)
293296
logger.debug("Starting fetching unacknowledged messages")
294297
for stream in [self.queue_name, *self.additional_streams.keys()]:

taskiq_redis/redis_cluster_broker.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def kick(self, message: BrokerMessage) -> None:
5757
:param message: message to append.
5858
"""
5959
queue_name = message.labels.get("queue_name") or self.queue_name
60-
await self.redis.lpush(queue_name, message.message) # type: ignore
60+
await self.redis.lpush(queue_name, message.message)
6161

6262
async def listen(self) -> AsyncGenerator[bytes, None]:
6363
"""
@@ -70,8 +70,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
7070
"""
7171
redis_brpop_data_position = 1
7272
while True:
73-
value = await self.redis.brpop([self.queue_name]) # type: ignore
74-
yield value[redis_brpop_data_position]
73+
value = await self.redis.brpop([self.queue_name])
74+
if value is None:
75+
continue
76+
yield value[redis_brpop_data_position] # type: ignore[misc]
7577

7678

7779
class RedisStreamClusterBroker(BaseRedisClusterBroker):
@@ -195,10 +197,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
195197
block=self.block,
196198
noack=False,
197199
)
198-
for stream, msg_list in fetched:
199-
for msg_id, msg in msg_list:
200+
if not fetched:
201+
continue
202+
for stream, msg_list in fetched: # type: ignore[str-unpack]
203+
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
200204
logger.debug("Received message: %s", msg)
201205
yield AckableMessage(
202-
data=msg[b"data"],
203-
ack=self._ack_generator(id=msg_id, queue_name=stream),
206+
data=msg[b"data"], # type: ignore[arg-type,index]
207+
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
204208
)

taskiq_redis/redis_sentinel_broker.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ async def kick(self, message: BrokerMessage) -> None:
105105
"""
106106
queue_name = message.labels.get("queue_name") or self.queue_name
107107
async with self._acquire_master_conn() as redis_conn:
108-
await redis_conn.lpush(queue_name, message.message) # type: ignore
108+
await redis_conn.lpush(queue_name, message.message)
109109

110110
async def listen(self) -> AsyncGenerator[bytes, None]:
111111
"""
@@ -119,9 +119,10 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
119119
redis_brpop_data_position = 1
120120
async with self._acquire_master_conn() as redis_conn:
121121
while True:
122-
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
123-
redis_brpop_data_position
124-
]
122+
brpop_result = await redis_conn.brpop(self.queue_name)
123+
if brpop_result is None:
124+
continue
125+
yield brpop_result[redis_brpop_data_position] # type: ignore[misc]
125126

126127

127128
class RedisStreamSentinelBroker(BaseSentinelBroker):
@@ -252,10 +253,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
252253
block=self.block,
253254
noack=False,
254255
)
255-
for stream, msg_list in fetched:
256-
for msg_id, msg in msg_list:
256+
if not fetched:
257+
continue
258+
for stream, msg_list in fetched: # type: ignore[str-unpack]
259+
for msg_id, msg in msg_list: # type: ignore[str-unpack,union-attr]
257260
logger.debug("Received message: %s", msg)
258261
yield AckableMessage(
259-
data=msg[b"data"],
260-
ack=self._ack_generator(id=msg_id, queue_name=stream),
262+
data=msg[b"data"], # type: ignore[arg-type,index]
263+
ack=self._ack_generator(id=msg_id, queue_name=stream), # type: ignore[arg-type]
261264
)

taskiq_redis/schedule_source.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
103103
if buffer:
104104
schedules.extend(await redis.mget(buffer))
105105
return [
106-
model_validate(ScheduledTask, self.serializer.loadb(schedule))
106+
model_validate(ScheduledTask, self.serializer.loadb(schedule)) # type: ignore[arg-type]
107107
for schedule in schedules
108108
if schedule
109109
]
@@ -179,7 +179,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
179179
raw_schedule = await self.redis.get(key)
180180
parsed_schedule = model_validate(
181181
ScheduledTask,
182-
self.serializer.loadb(raw_schedule),
182+
self.serializer.loadb(raw_schedule), # type: ignore[arg-type]
183183
)
184184
schedules.append(parsed_schedule)
185185
return schedules
@@ -277,7 +277,7 @@ async def get_schedules(self) -> list[ScheduledTask]:
277277
if buffer:
278278
schedules.extend(await redis.mget(buffer))
279279
return [
280-
model_validate(ScheduledTask, self.serializer.loadb(schedule))
280+
model_validate(ScheduledTask, self.serializer.loadb(schedule)) # type: ignore[arg-type]
281281
for schedule in schedules
282282
if schedule
283283
]

0 commit comments

Comments
 (0)