Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move more contact creation logic to a single function #2234

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api/views/alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def create_contact_route(alias_id):
contact_address = data.get("contact")

try:
contact = create_contact(g.user, alias, contact_address)
contact = create_contact(alias, contact_address)
except ErrContactErrorUpgradeNeeded as err:
return jsonify(error=err.error_for_user()), 403
except (ErrAddressInvalid, CannotCreateContactForReverseAlias) as err:
Expand Down
38 changes: 31 additions & 7 deletions app/contact_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy.exc import IntegrityError

from app.db import Session
from app.email_utils import generate_reply_email
from app.email_utils import generate_reply_email, parse_full_address
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Contact, Alias
Expand All @@ -14,11 +14,13 @@

class ContactCreateError(Enum):
InvalidEmail = "Invalid email"
NotAllowed = "Your plan does not allow to create contacts"


@dataclass
class ContactCreateResult:
contact: Optional[Contact]
created: bool
error: Optional[ContactCreateError]


Expand All @@ -33,34 +35,56 @@ def __update_contact_if_needed(
LOG.d(f"Setting {contact} mail_from to {mail_from}")
contact.mail_from = mail_from
Session.commit()
return ContactCreateResult(contact, None)
return ContactCreateResult(contact, created=False, error=None)


def create_contact(
email: str,
name: Optional[str],
alias: Alias,
name: Optional[str] = None,
mail_from: Optional[str] = None,
allow_empty_email: bool = False,
automatic_created: bool = False,
from_partner: bool = False,
) -> ContactCreateResult:
if name is not None:
# If user cannot create contacts, they still need to be created when receiving an email for an alias
if not automatic_created and not alias.user.can_create_contacts():
return ContactCreateResult(
None, created=False, error=ContactCreateError.NotAllowed
)
# Parse emails with form 'name <email>'
try:
email_name, email = parse_full_address(email)
except ValueError:
email = ""
email_name = ""
# If no name is explicitly given try to get it from the parsed email
if name is None:
name = email_name[: Contact.MAX_NAME_LENGTH]
else:
name = name[: Contact.MAX_NAME_LENGTH]
# If still no name is there, make sure the name is None instead of empty string
if not name:
name = None
if name is not None and "\x00" in name:
LOG.w("Cannot use contact name because has \\x00")
name = ""
# Sanitize email and if it's not valid only allow to create a contact if it's explicitly allowed. Otherwise fail
email = sanitize_email(email, not_lower=True)
if not is_valid_email(email):
LOG.w(f"invalid contact email {email}")
if not allow_empty_email:
return ContactCreateResult(None, ContactCreateError.InvalidEmail)
return ContactCreateResult(
None, created=False, error=ContactCreateError.InvalidEmail
)
LOG.d("Create a contact with invalid email for %s", alias)
# either reuse a contact with empty email or create a new contact with empty email
email = ""
email = sanitize_email(email, not_lower=True)
# If contact exists, update name and mail_from if needed
contact = Contact.get_by(alias_id=alias.id, website_email=email)
if contact is not None:
return __update_contact_if_needed(contact, name, mail_from)
# Create the contact
reply_email = generate_reply_email(email, alias)
try:
flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0
Expand All @@ -86,4 +110,4 @@ def create_contact(
)
contact = Contact.get_by(alias_id=alias.id, website_email=email)
return __update_contact_if_needed(contact, name, mail_from)
return ContactCreateResult(contact, None)
return ContactCreateResult(contact, created=True, error=None)
45 changes: 14 additions & 31 deletions app/dashboard/views/alias_contact_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
from wtforms import StringField, validators, ValidationError

# Need to import directly from config to allow modification from the tests
from app import config, parallel_limiter
from app import config, parallel_limiter, contact_utils
from app.contact_utils import ContactCreateError
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import (
generate_reply_email,
parse_full_address,
)
from app.email_validation import is_valid_email
from app.errors import (
CannotCreateContactForReverseAlias,
Expand All @@ -24,8 +21,8 @@
ErrContactAlreadyExists,
)
from app.log import LOG
from app.models import Alias, Contact, EmailLog, User
from app.utils import sanitize_email, CSRFValidationForm
from app.models import Alias, Contact, EmailLog
from app.utils import CSRFValidationForm


def email_validator():
Expand All @@ -51,7 +48,7 @@ def _check(form, field):
return _check


def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
def create_contact(alias: Alias, contact_address: str) -> Contact:
"""
Create a contact for a user. Can be restricted for new free users by enabling DISABLE_CREATE_CONTACTS_FOR_FREE_USERS.
Can throw exceptions:
Expand All @@ -61,37 +58,23 @@ def create_contact(user: User, alias: Alias, contact_address: str) -> Contact:
"""
if not contact_address:
raise ErrAddressInvalid("Empty address")
try:
contact_name, contact_email = parse_full_address(contact_address)
except ValueError:
output = contact_utils.create_contact(email=contact_address, alias=alias)
if output.error == ContactCreateError.InvalidEmail:
raise ErrAddressInvalid(contact_address)

contact_email = sanitize_email(contact_email)
if not is_valid_email(contact_email):
raise ErrAddressInvalid(contact_email)

contact = Contact.get_by(alias_id=alias.id, website_email=contact_email)
if contact:
raise ErrContactAlreadyExists(contact)

if not user.can_create_contacts():
elif output.error == ContactCreateError.NotAllowed:
raise ErrContactErrorUpgradeNeeded()
elif output.error is not None:
raise ErrAddressInvalid("Invalid address")
elif not output.created:
raise ErrContactAlreadyExists(output.contact)

contact = Contact.create(
user_id=alias.user_id,
alias_id=alias.id,
website_email=contact_email,
name=contact_name,
reply_email=generate_reply_email(contact_email, alias),
)

contact = output.contact
LOG.d(
"create reverse-alias for %s %s, reverse alias:%s",
contact_address,
alias,
contact.reply_email,
)
Session.commit()

return contact

Expand Down Expand Up @@ -261,7 +244,7 @@ def alias_contact_manager(alias_id):
if new_contact_form.validate():
contact_address = new_contact_form.email.data.strip()
try:
contact = create_contact(current_user, alias, contact_address)
contact = create_contact(alias, contact_address)
except (
ErrContactErrorUpgradeNeeded,
ErrAddressInvalid,
Expand Down
6 changes: 3 additions & 3 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class Fido(Base, ModelMixin):
class User(Base, ModelMixin, UserMixin, PasswordOracle):
__tablename__ = "users"

FLAG_FREE_DISABLE_CREATE_ALIAS = 1 << 0
FLAG_DISABLE_CREATE_CONTACTS = 1 << 0
FLAG_CREATED_FROM_PARTNER = 1 << 1
FLAG_FREE_OLD_ALIAS_LIMIT = 1 << 2
FLAG_CREATED_ALIAS_FROM_PARTNER = 1 << 3
Expand Down Expand Up @@ -543,7 +543,7 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
# bitwise flags. Allow for future expansion
flags = sa.Column(
sa.BigInteger,
default=FLAG_FREE_DISABLE_CREATE_ALIAS,
default=FLAG_DISABLE_CREATE_CONTACTS,
server_default="0",
nullable=False,
)
Expand Down Expand Up @@ -1168,7 +1168,7 @@ def get_random_alias_suffix(self, custom_domain: Optional["CustomDomain"] = None
def can_create_contacts(self) -> bool:
if self.is_premium():
return True
if self.flags & User.FLAG_FREE_DISABLE_CREATE_ALIAS == 0:
if self.flags & User.FLAG_DISABLE_CREATE_CONTACTS == 0:
return True
return not config.DISABLE_CREATE_CONTACTS_FOR_FREE_USERS

Expand Down
4 changes: 2 additions & 2 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ def get_or_create_contact(from_header: str, mail_from: str, alias: Alias) -> Con
contact_email = mail_from
contact_result = contact_utils.create_contact(
email=contact_email,
name=contact_name,
alias=alias,
name=contact_name,
mail_from=mail_from,
allow_empty_email=True,
automatic_created=True,
Expand Down Expand Up @@ -229,7 +229,7 @@ def get_or_create_reply_to_contact(
)
return None

return contact_utils.create_contact(contact_address, contact_name, alias).contact
return contact_utils.create_contact(contact_address, alias, contact_name).contact


def replace_header_when_forward(msg: Message, alias: Alias, header: str):
Expand Down
2 changes: 1 addition & 1 deletion tests/api/test_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def test_create_contact_route_free_users(flask_client):
assert r.status_code == 201

# End trial and disallow for new free users. Config should allow it
user.flags = User.FLAG_FREE_DISABLE_CREATE_ALIAS
user.flags = User.FLAG_DISABLE_CREATE_CONTACTS
Session.commit()
r = flask_client.post(
url_for("api.create_contact_route", alias_id=alias.id),
Expand Down
12 changes: 7 additions & 5 deletions tests/dashboard/test_alias_contact_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Alias,
Contact,
)
from tests.utils import login
from tests.utils import login, random_email


def test_add_contact_success(flask_client):
Expand All @@ -13,26 +13,28 @@ def test_add_contact_success(flask_client):

assert Contact.filter_by(user_id=user.id).count() == 0

email = random_email()
# <<< Create a new contact >>>
flask_client.post(
url_for("dashboard.alias_contact_manager", alias_id=alias.id),
data={
"form-name": "create",
"email": "[email protected]",
"email": email,
},
follow_redirects=True,
)
# a new contact is added
assert Contact.filter_by(user_id=user.id).count() == 1
contact = Contact.filter_by(user_id=user.id).first()
assert contact.website_email == "[email protected]"
assert contact.website_email == email

# <<< Create a new contact using a full email format >>>
email = random_email()
flask_client.post(
url_for("dashboard.alias_contact_manager", alias_id=alias.id),
data={
"form-name": "create",
"email": "First Last <[email protected]>",
"email": f"First Last <{email}>",
},
follow_redirects=True,
)
Expand All @@ -41,7 +43,7 @@ def test_add_contact_success(flask_client):
contact = (
Contact.filter_by(user_id=user.id).filter(Contact.id != contact.id).first()
)
assert contact.website_email == "[email protected]"
assert contact.website_email == email
assert contact.name == "First Last"

# <<< Create a new contact with invalid email address >>>
Expand Down
Loading
Loading