Skip to content

Commit

Permalink
pm notify
Browse files Browse the repository at this point in the history
  • Loading branch information
trim21 committed Sep 26, 2024
1 parent e14f796 commit ac38bff
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 36 deletions.
20 changes: 8 additions & 12 deletions lib/debezium.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import msgspec


class ChiiMember(msgspec.Struct):
"""table of chii_members as json"""

uid: int
new_notify: int


class ChiiNotify(msgspec.Struct):
"""table of chii_notify as json"""

Expand Down Expand Up @@ -35,11 +28,14 @@ class NotifyValue(msgspec.Struct):
op: str # 'r', 'c', 'd' ...


class MemberValuePayload(msgspec.Struct):
before: ChiiMember | None
after: ChiiMember | None
op: str # 'r', 'c', 'd' ...
class ChiiMember(msgspec.Struct):
"""table of chii_members as json"""

uid: int
newpm: int


class MemberValue(msgspec.Struct):
payload: MemberValuePayload
before: ChiiMember | None
after: ChiiMember | None
op: str # 'r', 'c', 'd' ...
60 changes: 36 additions & 24 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ class TelegramApplication:
__user_ids: dict[int, set[int]]
__lock: aiorwlock.RWLock()
__queue: asyncio.Queue[Item]
__mq: asyncio.Queue[Msg]
__background_tasks: set

# queue for kafka message
__notify_queue: asyncio.Queue[Msg]
__pm_queue: asyncio.Queue[Msg]

def __init__(
self, redis_client: redis.Redis, pg_client: pg.PG, mysql_client: MySql
):
Expand Down Expand Up @@ -100,7 +103,8 @@ def __init__(
self.mysql = mysql_client
self.__lock = aiorwlock.RWLock()
self.__queue = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__mq = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__notify_queue = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__pm_queue = asyncio.Queue(maxsize=config.QUEUE_SIZE)
self.__background_tasks = set()

async def init(self):
Expand Down Expand Up @@ -190,28 +194,36 @@ async def read_from_db(self):

@sslog.logger.catch
def __watch_kafka_messages(self, loop: asyncio.AbstractEventLoop) -> None:
q = self.__mq
logger.info("start watching kafka message")
consumer = KafkaConsumer(
"debezium.chii",
"debezium.chii.bangumi.chii_members",
"debezium.chii.bangumi.chii_notify",
)
for msg in consumer:
match msg.topic:
# case "debezium.chii.bangumi.chii_members":
# await self.handle_member(msg)
case "debezium.chii.bangumi.chii_members":
asyncio.run_coroutine_threadsafe(
self.__pm_queue.put(msg), loop
).result()
case "debezium.chii.bangumi.chii_notify":
asyncio.run_coroutine_threadsafe(q.put(msg), loop).result()
asyncio.run_coroutine_threadsafe(
self.__notify_queue.put(msg), loop
).result()

async def __handle_kafka_messages(self) -> None:
async def __handle_new_notify(self) -> None:
while True:
msg = await self.__mq.get()
print("new message from queue")
msg = await self.__notify_queue.get()
await self.handle_notify_change(msg)

async def __handle_new_pm(self) -> None:
while True:
msg = await self.__pm_queue.get()
await self.handle_member(msg)

def watch_kafka_message(self):
loop = asyncio.get_running_loop()
self.__background_tasks.add(loop.create_task(self.__handle_kafka_messages()))
self.__background_tasks.add(loop.create_task(self.__handle_new_notify()))
self.__background_tasks.add(loop.create_task(self.__handle_new_pm()))
self.__background_tasks.add(
Thread(target=self.__watch_kafka_messages, args=(loop,)).start()
)
Expand Down Expand Up @@ -263,32 +275,32 @@ async def handle_notify_change(self, msg: Msg):
for c in char:
await self.__queue.put(Item(c, msg, parse_mode=ParseMode.HTML))

member_decoder = msgspec.json.Decoder(debezium.MemberValue)

async def handle_member(self, msg: Msg):
with logger.catch(message="unexpected exception when parsing kafka messages"):
if not msg.value:
return

value: debezium.MemberValue = msgspec.json.decode(
msg.value, type=debezium.MemberValue
)
if value.payload.op != "u":
value: debezium.MemberValue = self.member_decoder.decode(msg.value)
if value.op != "u":
return

if value.payload.after.new_notify <= value.payload.before.new_notify:
after = value.after
before = value.before

if after is None:
return
if before is None:
return

if not value.payload.after.new_notify:
if after.newpm > before.newpm:
return

user_id = value.payload.after.uid
user_id = after.uid
if char := await self.is_watched_user_id(user_id):
for c in char:
await self.__queue.put(
Item(
c,
f"你有 {value.payload.after.new_notify} 条新通知",
)
)
await self.__queue.put(Item(c, "你有新私信"))

async def start_queue_consumer(self):
while True:
Expand Down

0 comments on commit ac38bff

Please sign in to comment.