Skip to content

Commit

Permalink
Add sync events (#2113)
Browse files Browse the repository at this point in the history
* feat: add protocol buffers for events

* chore: add EventDispatcher

* chore: add WebhookEvent class

* chore: emit events

* feat: initial version of event listener

* chore: emit user plan change with new timestamp

* feat: emit metrics + add alias status to create event

* chore: add newrelic decorator to functions

* fix: event emitter fixes

* fix: take null end_time into account

* fix: avoid double-commits

* chore: move UserDeleted event to User.delete method

* db: add index to sync_event created_at and taken_time columns

* chore: add index to model
  • Loading branch information
cquintana92 authored May 23, 2024
1 parent 60ab8c1 commit 3e0b7bb
Show file tree
Hide file tree
Showing 25 changed files with 690 additions and 7 deletions.
18 changes: 18 additions & 0 deletions app/alias_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
render,
)
from app.errors import AliasInTrashError
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import AliasDeleted, AliasStatusChange, EventContent
from app.log import LOG
from app.models import (
Alias,
Expand Down Expand Up @@ -334,6 +336,10 @@ def delete_alias(alias: Alias, user: User):
Alias.filter(Alias.id == alias.id).delete()
Session.commit()

EventDispatcher.send_event(
user, EventContent(alias_deleted=AliasDeleted(alias_id=alias.id))
)


def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
"""
Expand Down Expand Up @@ -459,3 +465,15 @@ def transfer_alias(alias, new_user, new_mailboxes: [Mailbox]):
alias.pinned = False

Session.commit()


def change_alias_status(alias: Alias, enabled: bool, commit: bool = False):
alias.enabled = enabled

event = AliasStatusChange(
alias_id=alias.id, alias_email=alias.email, enabled=enabled
)
EventDispatcher.send_event(alias.user, EventContent(alias_status_change=event))

if commit:
Session.commit()
2 changes: 1 addition & 1 deletion app/api/views/alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def toggle_alias(alias_id):
if not alias or alias.user_id != user.id:
return jsonify(error="Forbidden"), 403

alias.enabled = not alias.enabled
alias_utils.change_alias_status(alias, enabled=not alias.enabled)
Session.commit()

return jsonify(enabled=alias.enabled), 200
Expand Down
2 changes: 2 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,5 @@ def getRateLimitFromConfig(
UPCLOUD_DB_ID = os.environ.get("UPCLOUD_DB_ID", None)

STORE_TRANSACTIONAL_EMAILS = "STORE_TRANSACTIONAL_EMAILS" in os.environ

EVENT_WEBHOOK = os.environ.get("EVENT_WEBHOOK", None)
2 changes: 1 addition & 1 deletion app/dashboard/views/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def index():
alias_utils.delete_alias(alias, current_user)
flash(f"Alias {email} has been deleted", "success")
elif request.form.get("form-name") == "disable-alias":
alias.enabled = False
alias_utils.change_alias_status(alias, enabled=False)
Session.commit()
flash(f"Alias {alias.email} has been disabled", "success")

Expand Down
3 changes: 2 additions & 1 deletion app/dashboard/views/unsubscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flask import redirect, url_for, flash, request, render_template
from flask_login import login_required, current_user

from app import alias_utils
from app.dashboard.base import dashboard_bp
from app.handler.unsubscribe_encoder import UnsubscribeAction
from app.handler.unsubscribe_handler import UnsubscribeHandler
Expand All @@ -31,7 +32,7 @@ def unsubscribe(alias_id):

# automatic unsubscribe, according to https://tools.ietf.org/html/rfc8058
if request.method == "POST":
alias.enabled = False
alias_utils.change_alias_status(alias, False)
flash(f"Alias {alias.email} has been blocked", "success")
Session.commit()

Expand Down
Empty file added app/events/__init__.py
Empty file.
63 changes: 63 additions & 0 deletions app/events/event_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from abc import ABC, abstractmethod
from app import config
from app.db import Session
from app.errors import ProtonPartnerNotSetUp
from app.events.generated import event_pb2
from app.models import User, PartnerUser, SyncEvent
from app.proton.utils import get_proton_partner
from typing import Optional

NOTIFICATION_CHANNEL = "simplelogin_sync_events"


class Dispatcher(ABC):
@abstractmethod
def send(self, event: bytes):
pass


class PostgresDispatcher(Dispatcher):
def send(self, event: bytes):
instance = SyncEvent.create(content=event, flush=True)
Session.execute(f"NOTIFY {NOTIFICATION_CHANNEL}, '{instance.id}';")

@staticmethod
def get():
return PostgresDispatcher()


class EventDispatcher:
@staticmethod
def send_event(
user: User,
content: event_pb2.EventContent,
dispatcher: Dispatcher = PostgresDispatcher.get(),
skip_if_webhook_missing: bool = True,
):
if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
return

partner_user = EventDispatcher.__partner_user(user.id)
if not partner_user:
return

event = event_pb2.Event(
user_id=user.id,
external_user_id=partner_user.external_user_id,
partner_id=partner_user.partner_id,
content=content,
)

serialized = event.SerializeToString()
dispatcher.send(serialized)

@staticmethod
def __partner_user(user_id: int) -> Optional[PartnerUser]:
# Check if the current user has a partner_id
try:
proton_partner_id = get_proton_partner().id
except ProtonPartnerNotSetUp:
return None

# It has. Retrieve the information for the PartnerUser
return PartnerUser.get_by(user_id=user_id, partner_id=proton_partner_id)
38 changes: 38 additions & 0 deletions app/events/generated/event_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 71 additions & 0 deletions app/events/generated/event_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class UserPlanChange(_message.Message):
__slots__ = ("plan_end_time",)
PLAN_END_TIME_FIELD_NUMBER: _ClassVar[int]
plan_end_time: int
def __init__(self, plan_end_time: _Optional[int] = ...) -> None: ...

class UserDeleted(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

class AliasCreated(_message.Message):
__slots__ = ("alias_id", "alias_email", "alias_note", "enabled")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
ALIAS_NOTE_FIELD_NUMBER: _ClassVar[int]
ENABLED_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
alias_note: str
enabled: bool
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., alias_note: _Optional[str] = ..., enabled: bool = ...) -> None: ...

class AliasStatusChange(_message.Message):
__slots__ = ("alias_id", "alias_email", "enabled")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
ENABLED_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
enabled: bool
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ..., enabled: bool = ...) -> None: ...

class AliasDeleted(_message.Message):
__slots__ = ("alias_id", "alias_email")
ALIAS_ID_FIELD_NUMBER: _ClassVar[int]
ALIAS_EMAIL_FIELD_NUMBER: _ClassVar[int]
alias_id: int
alias_email: str
def __init__(self, alias_id: _Optional[int] = ..., alias_email: _Optional[str] = ...) -> None: ...

class EventContent(_message.Message):
__slots__ = ("user_plan_change", "user_deleted", "alias_created", "alias_status_change", "alias_deleted")
USER_PLAN_CHANGE_FIELD_NUMBER: _ClassVar[int]
USER_DELETED_FIELD_NUMBER: _ClassVar[int]
ALIAS_CREATED_FIELD_NUMBER: _ClassVar[int]
ALIAS_STATUS_CHANGE_FIELD_NUMBER: _ClassVar[int]
ALIAS_DELETED_FIELD_NUMBER: _ClassVar[int]
user_plan_change: UserPlanChange
user_deleted: UserDeleted
alias_created: AliasCreated
alias_status_change: AliasStatusChange
alias_deleted: AliasDeleted
def __init__(self, user_plan_change: _Optional[_Union[UserPlanChange, _Mapping]] = ..., user_deleted: _Optional[_Union[UserDeleted, _Mapping]] = ..., alias_created: _Optional[_Union[AliasCreated, _Mapping]] = ..., alias_status_change: _Optional[_Union[AliasStatusChange, _Mapping]] = ..., alias_deleted: _Optional[_Union[AliasDeleted, _Mapping]] = ...) -> None: ...

class Event(_message.Message):
__slots__ = ("user_id", "external_user_id", "partner_id", "content")
USER_ID_FIELD_NUMBER: _ClassVar[int]
EXTERNAL_USER_ID_FIELD_NUMBER: _ClassVar[int]
PARTNER_ID_FIELD_NUMBER: _ClassVar[int]
CONTENT_FIELD_NUMBER: _ClassVar[int]
user_id: int
external_user_id: str
partner_id: int
content: EventContent
def __init__(self, user_id: _Optional[int] = ..., external_user_id: _Optional[str] = ..., partner_id: _Optional[int] = ..., content: _Optional[_Union[EventContent, _Mapping]] = ...) -> None: ...
3 changes: 2 additions & 1 deletion app/handler/unsubscribe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aiosmtpd.smtp import Envelope

from app import config
from app import alias_utils
from app.db import Session
from app.email import headers, status
from app.email_utils import (
Expand Down Expand Up @@ -101,7 +102,7 @@ def _disable_alias(
mailbox.email, alias
):
return status.E509
alias.enabled = False
alias_utils.change_alias_status(alias, enabled=False)
Session.commit()
enable_alias_url = config.URL + f"/dashboard/?highlight_alias_id={alias.id}"
for mailbox in alias.mailboxes:
Expand Down
73 changes: 73 additions & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,21 @@ def create(cls, email, name="", password=None, from_partner=False, **kwargs):

return user

@classmethod
def delete(cls, obj_id, commit=False):
# Internal import to avoid global import cycles
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import UserDeleted, EventContent

user: User = cls.get(obj_id)
EventDispatcher.send_event(user, EventContent(user_deleted=UserDeleted()))

res = super(User, cls).delete(obj_id)
if commit:
Session.commit()

return res

def get_active_subscription(
self, include_partner_subscription: bool = True
) -> Optional[
Expand Down Expand Up @@ -1619,6 +1634,18 @@ def create(cls, **kw):
Session.add(new_alias)
DailyMetric.get_or_create_today_metric().nb_alias += 1

# Internal import to avoid global import cycles
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import AliasCreated, EventContent

event = AliasCreated(
alias_id=new_alias.id,
alias_email=new_alias.email,
alias_note=new_alias.note,
enabled=True,
)
EventDispatcher.send_event(user, EventContent(alias_created=event))

if commit:
Session.commit()

Expand Down Expand Up @@ -3648,3 +3675,49 @@ def create(cls, **kwargs):
code = secrets.token_urlsafe(32)

return super().create(code=code, **kwargs)


class SyncEvent(Base, ModelMixin):
"""This model holds the events that need to be sent to the webhook"""

__tablename__ = "sync_event"
content = sa.Column(sa.LargeBinary, unique=False, nullable=False)
taken_time = sa.Column(
ArrowType, default=None, nullable=True, server_default=None, index=True
)

__table_args__ = (
sa.Index("ix_sync_event_created_at", "created_at"),
sa.Index("ix_sync_event_taken_time", "taken_time"),
)

def mark_as_taken(self) -> bool:
sql = """
UPDATE sync_event
SET taken_time = :taken_time
WHERE id = :sync_event_id
AND taken_time IS NULL
"""
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}
res = Session.execute(sql, args)
return res.rowcount > 0

@classmethod
def get_dead_letter(cls, older_than: Arrow) -> [SyncEvent]:
return (
SyncEvent.filter(
(
(
SyncEvent.taken_time.isnot(None)
& (SyncEvent.taken_time < older_than)
)
| (
SyncEvent.taken_time.is_(None)
& (SyncEvent.created_at < older_than)
)
)
)
.order_by(SyncEvent.id)
.limit(100)
.all()
)
5 changes: 5 additions & 0 deletions app/subscription_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from requests import RequestException

from app import config
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, UserPlanChange
from app.log import LOG
from app.models import User

Expand Down Expand Up @@ -31,3 +33,6 @@ def execute_subscription_webhook(user: User):
)
except RequestException as e:
LOG.error(f"Subscription request exception: {e}")

event = UserPlanChange(plan_end_time=sl_subscription_end)
EventDispatcher.send_event(user, EventContent(user_plan_change=event))
4 changes: 2 additions & 2 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from sqlalchemy.exc import IntegrityError

from app import pgp_utils, s3, config
from app.alias_utils import try_auto_create
from app.alias_utils import try_auto_create, change_alias_status
from app.config import (
EMAIL_DOMAIN,
URL,
Expand Down Expand Up @@ -1585,7 +1585,7 @@ def handle_bounce_forward_phase(msg: Message, email_log: EmailLog):
LOG.w(
f"Disable alias {alias} because {reason}. {alias.mailboxes} {alias.user}. Last contact {contact}"
)
alias.enabled = False
change_alias_status(alias, enabled=False)

Notification.create(
user_id=user.id,
Expand Down
Loading

0 comments on commit 3e0b7bb

Please sign in to comment.