Skip to content

Commit

Permalink
fix queue
Browse files Browse the repository at this point in the history
  • Loading branch information
trim21 committed Sep 26, 2024
1 parent 8df8900 commit 72d4baa
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 43 deletions.
3 changes: 2 additions & 1 deletion kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Iterator, NamedTuple
from uuid import uuid4

from confluent_kafka import Consumer
from sslog import logger
Expand All @@ -17,7 +18,7 @@ class KafkaConsumer:
def __init__(self, *topics: str):
self.c = Consumer(
{
"group.id": "tg-notify-bot",
"group.id": "tg-notify-bot" + str(uuid4()),
"bootstrap.servers": f"{config.KAFKA_BROKER.host}:{config.KAFKA_BROKER.port}",
"auto.offset.reset": "earliest",
}
Expand Down
6 changes: 1 addition & 5 deletions lib/debezium.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@ class ChiiNotifyField(msgspec.Struct):
ntf_hash: int


class NotifyValuePayload(msgspec.Struct):
class NotifyValue(msgspec.Struct):
after: ChiiNotify | None
op: str # 'r', 'c', 'd' ...


class NotifyValue(msgspec.Struct):
payload: NotifyValuePayload


class MemberValuePayload(msgspec.Struct):
before: ChiiMember | None
after: ChiiMember | None
Expand Down
42 changes: 21 additions & 21 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import sys
import time
import traceback
from concurrent.futures.thread import ThreadPoolExecutor
from threading import Thread
from typing import NamedTuple

import aiorwlock
import httpx
import janus
import msgspec.json
import redis.asyncio as redis
import sslog
import starlette
import starlette.applications
import telegram as tg
Expand Down Expand Up @@ -70,7 +70,7 @@ class TelegramApplication:
__user_ids: dict[int, set[int]]
__lock: aiorwlock.RWLock()
__queue: asyncio.Queue[Item]
__mq: janus.Queue[Msg]
__mq: asyncio.Queue[Msg]
__background_tasks: set

def __init__(
Expand All @@ -79,7 +79,7 @@ def __init__(
application = tg.ext.Application.builder()

if sys.platform == "win32":
proxy_url = "http://127.0.0.1:7890"
proxy_url = "http://192.168.1.3:7890"
application = application.proxy_url(proxy_url).get_updates_proxy_url(
proxy_url
)
Expand All @@ -100,7 +100,7 @@ def __init__(
self.mysql = mysql_client
self.__lock = aiorwlock.RWLock()
self.__queue = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__mq = janus.Queue(maxsize=config.QUEUE_SIZE)
self.__mq = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__background_tasks = set()

async def init(self):
Expand Down Expand Up @@ -188,44 +188,46 @@ async def read_from_db(self):
async with self.__lock.writer:
self.__user_ids = rr

def __watch_kafka_messages(self) -> None:
@sslog.logger.catch
def __watch_kafka_messages(self, loop: asyncio.AbstractEventLoop) -> None:
q = self.__mq
logger.info("start watching kafka message")
consumer = KafkaConsumer(
"debezium.chii",
"debezium.chii.bangumi.chii_notify",
)
sq = self.__mq.sync_q
for msg in consumer:
match msg.topic:
# case "debezium.chii.bangumi.chii_members":
# await self.handle_member(msg)
case "debezium.chii.bangumi.chii_notify":
sq.put(msg)
asyncio.run_coroutine_threadsafe(q.put(msg), loop).result()

async def __handle_kafka_messages(self) -> None:
q = self.__mq.async_q
while True:
msg = await q.get()
msg = await self.__mq.get()
print("new message from queue")
await self.handle_notify_change(msg)

async def watch_kafka_message(self):
def watch_kafka_message(self):
loop = asyncio.get_running_loop()
self.__background_tasks.add(loop.create_task(self.__handle_kafka_messages()))
self.__background_tasks.add(
loop.run_in_executor(ThreadPoolExecutor(), self.__watch_kafka_messages)
Thread(target=self.__watch_kafka_messages, args=(loop,)).start()
)

notify_decoder = msgspec.json.Decoder(debezium.NotifyValue)

async def handle_notify_change(self, msg: Msg):
with logger.catch(message="unexpected exception when parsing kafka messages"):
if not msg.value:
return

value: debezium.NotifyValue = msgspec.json.decode(
msg.value, type=debezium.NotifyValue
)
if value.payload.op != "c":
value: debezium.NotifyValue = self.notify_decoder.decode(msg.value)
if value.op != "c":
return

notify = value.payload.after
notify = value.after

if notify.timestamp < time.time() - 60 * 2:
# skip notification older than 2 min
Expand Down Expand Up @@ -297,15 +299,13 @@ async def start_queue_consumer(self):
def start_tasks(self):
loop = asyncio.get_event_loop()
task = loop.create_task(self.start_queue_consumer())
self.watch_kafka_message()

def _exit(*_args, **_kwargs):
sys.exit(1)

task.add_done_callback(_exit)
self.__background_tasks.add(task)
task = loop.create_task(self.watch_kafka_message())
task.add_done_callback(_exit)
self.__background_tasks.add(task)


class OAuthHTTPServer:
Expand Down Expand Up @@ -426,7 +426,7 @@ async def start() -> None:


def main() -> None:
asyncio.run(start())
asyncio.get_event_loop().run_until_complete(start())


if __name__ == "__main__":
Expand Down
16 changes: 1 addition & 15 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ aiorwlock = "1.4.0"
asyncmy = "0.2.9"
asyncpg = "0.29.0"
confluent-kafka = "2.5.3"
janus = "^1.0.0"
sslog = "^0.0.0a48"

[tool.poetry.group.dev.dependencies]
Expand Down

0 comments on commit 72d4baa

Please sign in to comment.