Skip to content

Commit e307fe0

Browse files
committed
feat(core): capture metric
1 parent 62bba87 commit e307fe0

File tree

3 files changed

+292
-137
lines changed

3 files changed

+292
-137
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from datetime import datetime, timezone
2+
import hashlib
3+
4+
from sqlalchemy import select, update, func, text
5+
6+
from ..schema.utils import asUUID
7+
from ..infra.db import DB_CLIENT
8+
from ..schema.orm import Metric
9+
10+
11+
async def capture_increment(project_id: asUUID, tag: str, increment: int = 1) -> None:
12+
"""
13+
Fetch the latest metric row for the given project and tag.
14+
- If no row exists for today, create a new one.
15+
- Then increment the `increment` field on that row atomically on the DB server side.
16+
17+
Uses PostgreSQL advisory locks to prevent race conditions when multiple
18+
concurrent calls try to create the same row.
19+
"""
20+
async with DB_CLIENT.get_session_context() as session:
21+
# Always compare dates in a consistent timezone (UTC)
22+
today_utc = datetime.now(timezone.utc).date()
23+
24+
# Generate a unique lock key from project_id, tag, and date
25+
# PostgreSQL advisory locks use bigint, so we hash the combination
26+
lock_key_str = f"{project_id}:{tag}:{today_utc}"
27+
lock_key = int(hashlib.md5(lock_key_str.encode()).hexdigest()[:15], 16)
28+
29+
# Acquire an advisory lock for this specific (project_id, tag, date) combination
30+
# This ensures only one transaction can check/create at a time
31+
await session.execute(
32+
text("SELECT pg_advisory_xact_lock(:lock_key)"), {"lock_key": lock_key}
33+
)
34+
35+
# Now check if any row exists for today (we hold the lock, so safe)
36+
check_stmt = (
37+
select(Metric)
38+
.where(
39+
Metric.project_id == project_id,
40+
Metric.tag == tag,
41+
func.date(Metric.created_at) == today_utc,
42+
)
43+
.order_by(Metric.created_at.desc())
44+
.limit(1)
45+
)
46+
result = await session.scalars(check_stmt)
47+
metric = result.first()
48+
49+
# If there is no metric yet for today, create a new row
50+
if metric is None:
51+
metric = Metric(project_id=project_id, tag=tag, increment=0)
52+
session.add(metric)
53+
# Flush so that metric.id is available for the UPDATE statement
54+
await session.flush()
55+
56+
# Atomically increment the counter on the database server side to avoid data races
57+
await session.execute(
58+
update(Metric)
59+
.where(Metric.id == metric.id)
60+
.values(increment=Metric.increment + increment)
61+
)

0 commit comments

Comments
 (0)