-
-
Notifications
You must be signed in to change notification settings - Fork 421
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extract contact creation logic to an external function
- Loading branch information
Showing
3 changed files
with
213 additions
and
101 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from dataclasses import dataclass | ||
from typing import Optional | ||
|
||
from sqlalchemy.exc import IntegrityError | ||
|
||
from app.db import Session | ||
from app.email_utils import generate_reply_email | ||
from app.email_validation import is_valid_email | ||
from app.log import LOG | ||
from app.models import Contact, Alias | ||
from app.utils import sanitize_email | ||
|
||
|
||
@dataclass | ||
class ContactCreateResult: | ||
contact: Optional[Contact] | ||
error: Optional[str] | ||
|
||
|
||
def __update_contact_if_needed( | ||
contact: Contact, name: Optional[str], mail_from: Optional[str] | ||
) -> ContactCreateResult: | ||
if name and contact.name != name: | ||
LOG.d(f"Setting {contact} name to {name}") | ||
contact.name = name | ||
Session.commit() | ||
if mail_from and contact.mail_from is None: | ||
LOG.d(f"Setting {contact} mail_from to {mail_from}") | ||
contact.mail_from = mail_from | ||
Session.commit() | ||
return ContactCreateResult(contact, None) | ||
|
||
|
||
def create_contact( | ||
email: str, | ||
name: Optional[str], | ||
alias: Alias, | ||
mail_from: Optional[str] = None, | ||
allow_empty_email: bool = False, | ||
automatic_created: bool = False, | ||
from_partner: bool = False, | ||
) -> ContactCreateResult: | ||
if name is not None: | ||
name = name[: Contact.MAX_NAME_LENGTH] | ||
if name is not None and "\x00" in name: | ||
LOG.w("Cannot use contact name because has \\x00") | ||
name = "" | ||
if not is_valid_email(email): | ||
LOG.w(f"invalid contact email {email}") | ||
if not allow_empty_email: | ||
return ContactCreateResult(None, "Invalid email") | ||
LOG.d("Create a contact with invalid email for %s", alias) | ||
# either reuse a contact with empty email or create a new contact with empty email | ||
email = "" | ||
email = sanitize_email(email, not_lower=True) | ||
contact = Contact.get_by(alias_id=alias.id, website_email=email) | ||
if contact is not None: | ||
return __update_contact_if_needed(contact, name, mail_from) | ||
reply_email = generate_reply_email(email, alias) | ||
try: | ||
flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0 | ||
contact = Contact.create( | ||
user_id=alias.user_id, | ||
alias_id=alias.id, | ||
website_email=email, | ||
name=name, | ||
reply_email=reply_email, | ||
mail_from=mail_from, | ||
automatic_created=automatic_created, | ||
flags=flags, | ||
invalid_email=email == "", | ||
commit=True, | ||
) | ||
LOG.d( | ||
f"Created contact {contact} for alias {alias} with email {email} invalid_email={contact.invalid_email}" | ||
) | ||
except IntegrityError: | ||
Session.rollback() | ||
LOG.info( | ||
f"Contact with email {email} for alias_id {alias.id} already existed, fetching from DB" | ||
) | ||
contact = Contact.get_by(alias_id=alias.id, website_email=email) | ||
return __update_contact_if_needed(contact, name, mail_from) | ||
return ContactCreateResult(contact, None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
from typing import Optional | ||
|
||
import pytest | ||
from app.contact_utils import create_contact | ||
from app.db import Session | ||
from app.models import ( | ||
Alias, | ||
Contact, | ||
) | ||
from tests.utils import create_new_user, random_email, random_token | ||
|
||
|
||
def create_provider(): | ||
# name auto_created from_partner | ||
yield ["name", "[email protected]", True, True] | ||
yield [None, None, True, True] | ||
yield [None, None, False, True] | ||
yield [None, None, True, False] | ||
yield [None, None, False, False] | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"name, mail_from, automatic_created, from_partner", create_provider() | ||
) | ||
def test_create_contact( | ||
name: Optional[str], | ||
mail_from: Optional[str], | ||
automatic_created: bool, | ||
from_partner: bool, | ||
): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
email = random_email() | ||
contact_result = create_contact( | ||
email, | ||
name, | ||
alias, | ||
mail_from=mail_from, | ||
automatic_created=automatic_created, | ||
from_partner=from_partner, | ||
) | ||
assert contact_result.error is None | ||
contact = contact_result.contact | ||
assert contact.user_id == user.id | ||
assert contact.alias_id == alias.id | ||
assert contact.website_email == email | ||
assert contact.name == name | ||
assert contact.mail_from == mail_from | ||
assert contact.automatic_created == automatic_created | ||
assert not contact.invalid_email | ||
expected_flags = Contact.FLAG_PARTNER_CREATED if from_partner else 0 | ||
assert contact.flags == expected_flags | ||
|
||
|
||
def test_create_contact_email_email_not_allowed(): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
contact_result = create_contact("", "", alias) | ||
assert contact_result.contact is None | ||
assert contact_result.error == "Invalid email" | ||
|
||
|
||
def test_create_contact_email_email_allowed(): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
contact_result = create_contact("", "", alias, allow_empty_email=True) | ||
assert contact_result.error is None | ||
assert contact_result.contact is not None | ||
assert contact_result.contact.website_email == "" | ||
assert contact_result.contact.invalid_email | ||
|
||
|
||
def test_do_not_allow_invalid_email(): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
contact_result = create_contact("potato", "", alias) | ||
assert contact_result.contact is None | ||
assert contact_result.error == "Invalid email" | ||
contact_result = create_contact("asdf\x00@gmail.com", "", alias) | ||
assert contact_result.contact is None | ||
assert contact_result.error == "Invalid email" | ||
|
||
|
||
def test_update_name_for_existing(): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
email = random_email() | ||
contact_result = create_contact(email, "", alias) | ||
assert contact_result.error is None | ||
assert contact_result.contact is not None | ||
assert contact_result.contact.name == "" | ||
name = random_token() | ||
contact_result = create_contact(email, name, alias) | ||
assert contact_result.error is None | ||
assert contact_result.contact is not None | ||
assert contact_result.contact.name == name | ||
|
||
|
||
def test_update_mail_from_for_existing(): | ||
user = create_new_user() | ||
alias = Alias.create_new_random(user) | ||
Session.commit() | ||
email = random_email() | ||
contact_result = create_contact(email, "", alias) | ||
assert contact_result.error is None | ||
assert contact_result.contact is not None | ||
assert contact_result.contact.mail_from is None | ||
mail_from = random_email() | ||
contact_result = create_contact(email, "", alias, mail_from=mail_from) | ||
assert contact_result.error is None | ||
assert contact_result.contact is not None | ||
assert contact_result.contact.mail_from == mail_from |