Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not save bounces to retry later and notify user on reply phase of bounces #1420

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions app/mail_sender.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations
import base64
import email
import enum
import json
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from email.message import Message
from functools import wraps
from smtplib import SMTP, SMTPException
from smtplib import SMTP, SMTPException, SMTPDataError
from typing import Optional, Dict, List, Callable

import newrelic.agent
Expand Down Expand Up @@ -68,6 +69,12 @@ def load_from_bytes(data: bytes) -> SendRequest:
)


class SendResult(enum.Enum):
OK = 1
Failed = 2
Bounced = 3


class MailSender:
def __init__(self):
self._pool: Optional[ThreadPoolExecutor] = None
Expand Down Expand Up @@ -98,7 +105,7 @@ def wrapper(*args, **kwargs):
def enable_background_pool(self, max_workers=10):
self._pool = ThreadPoolExecutor(max_workers=max_workers)

def send(self, send_request: SendRequest, retries: int = 2) -> bool:
def send(self, send_request: SendRequest, retries: int = 2) -> SendResult:
"""replace smtp.sendmail"""
if self._store_emails:
self._emails_sent.append(send_request)
Expand All @@ -109,14 +116,30 @@ def send(self, send_request: SendRequest, retries: int = 2) -> bool:
send_request.msg[headers.FROM],
send_request.msg[headers.TO],
)
return True
return SendResult.OK
if not self._pool:
return self._send_to_smtp(send_request, retries)
return self._send_with_retries_if_needed(send_request, retries)
else:
self._pool.submit(self._send_to_smtp, (send_request, retries))
return True
self._pool.submit(
self._send_with_retries_if_needed, (send_request, retries)
)
return SendResult.OK

def _send_with_retries_if_needed(
self, send_request: SendRequest, retries: int
) -> SendResult:
result = self._send_to_smtp_server(send_request)
if result != SendResult.Failed:
return result
if retries > 0:
time.sleep(0.3 * retries)
return self._send_with_retries_if_needed(send_request, retries - 1)
else:
if not send_request.ignore_smtp_errors:
self._save_request_to_unsent_dir(send_request)
return SendResult.Failed

def _send_to_smtp(self, send_request: SendRequest, retries: int) -> bool:
def _send_to_smtp_server(self, send_request: SendRequest) -> SendResult:
if config.POSTFIX_SUBMISSION_TLS and config.POSTFIX_PORT == 25:
smtp_port = 587
else:
Expand Down Expand Up @@ -156,24 +179,25 @@ def _send_to_smtp(self, send_request: SendRequest, retries: int) -> bool:
newrelic.agent.record_custom_metric(
"Custom/smtp_sending_time", time.time() - start
)
return True
return SendResult.OK
except SMTPDataError as e:
if not send_request.ignore_smtp_errors:
LOG.e(
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{smtp_port}"
)
if e.smtp_code >= 500:
return SendResult.Bounced
return SendResult.Failed
except (
SMTPException,
ConnectionRefusedError,
TimeoutError,
) as e:
if retries > 0:
time.sleep(0.3 * retries)
return self._send_to_smtp(send_request, retries - 1)
else:
if send_request.ignore_smtp_errors:
LOG.e(f"Ignore smtp error {e}")
return False
):
if not send_request.ignore_smtp_errors:
LOG.e(
f"Could not send message to smtp server {config.POSTFIX_SERVER}:{smtp_port}"
)
self._save_request_to_unsent_dir(send_request)
return False
return SendResult.Failed

def _save_request_to_unsent_dir(self, send_request: SendRequest):
file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
Expand Down Expand Up @@ -221,15 +245,12 @@ def load_unsent_mails_from_fs_and_resend():
continue
try:
send_request.ignore_smtp_errors = True
if mail_sender.send(send_request, 2):
response = mail_sender.send(send_request, 2)
newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": f"{response.name}"}
)
if response != SendResult.Failed:
os.unlink(full_file_path)
newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": "true"}
)
else:
newrelic.agent.record_custom_event(
"DeliverUnsentEmail", {"delivered": "false"}
)
except Exception as e:
# Unlink original file to avoid re-doing the same
os.unlink(full_file_path)
Expand All @@ -254,7 +275,7 @@ def sl_sendmail(
is_forward: bool = False,
retries=2,
ignore_smtp_error=False,
):
) -> SendResult:
send_request = SendRequest(
envelope_from,
envelope_to,
Expand Down
6 changes: 4 additions & 2 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
from app.handler.unsubscribe_generator import UnsubscribeGenerator
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.mail_sender import sl_sendmail, SendResult
from app.message_utils import message_to_bytes
from app.models import (
Alias,
Expand Down Expand Up @@ -1249,14 +1249,16 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
add_dkim_signature(msg, alias_domain)

try:
sl_sendmail(
send_result = sl_sendmail(
generate_verp_email(VerpType.bounce_reply, email_log.id, alias_domain),
contact.website_email,
msg,
envelope.mail_options,
envelope.rcpt_options,
is_forward=False,
)
if send_result == SendResult.Bounced:
return False, status.E506

# if alias belongs to several mailboxes, notify other mailboxes about this email
other_mailboxes = [mb for mb in alias.mailboxes if mb.email != mailbox.email]
Expand Down
38 changes: 23 additions & 15 deletions tests/test_mail_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
mail_sender,
SendRequest,
load_unsent_mails_from_fs_and_resend,
SendResult,
)
from app import config

Expand Down Expand Up @@ -96,16 +97,19 @@ def compare_send_requests(expected: SendRequest, request: SendRequest):


@pytest.mark.parametrize(
"server_fn",
"server_fn, should_save",
[
close_on_connect_dummy_server,
closed_dummy_server,
smtp_response_server("421 Retry"),
smtp_response_server("500 error"),
(close_on_connect_dummy_server, True),
(closed_dummy_server, True),
(smtp_response_server("421 Retry"), True),
(smtp_response_server("500 error"), False),
],
)
def test_mail_sender_save_unsent_to_disk(server_fn):
def test_mail_sender_save_unsent_to_disk_if_failed(
server_fn: Callable, should_save: bool
):
original_postfix_server = config.POSTFIX_SERVER
original_postfix_port = config.POSTFIX_PORT
config.POSTFIX_SERVER = "localhost"
config.NOT_SEND_EMAIL = False
config.POSTFIX_SUBMISSION_TLS = False
Expand All @@ -114,16 +118,20 @@ def test_mail_sender_save_unsent_to_disk(server_fn):
with tempfile.TemporaryDirectory() as temp_dir:
config.SAVE_UNSENT_DIR = temp_dir
send_request = create_dummy_send_request()
assert not mail_sender.send(send_request, 0)
assert mail_sender.send(send_request, 0) != SendResult.OK
found_files = os.listdir(temp_dir)
assert len(found_files) == 1
loaded_send_request = SendRequest.load_from_file(
os.path.join(temp_dir, found_files[0])
)
compare_send_requests(loaded_send_request, send_request)
if should_save:
assert len(found_files) == 1
loaded_send_request = SendRequest.load_from_file(
os.path.join(temp_dir, found_files[0])
)
compare_send_requests(loaded_send_request, send_request)
else:
assert len(found_files) == 0
finally:
config.POSTFIX_SERVER = original_postfix_server
config.NOT_SEND_EMAIL = True
config.POSTFIX_PORT = original_postfix_port


@mail_sender.store_emails_test_decorator
Expand All @@ -135,7 +143,7 @@ def test_send_unsent_email_from_fs():
try:
config.SAVE_UNSENT_DIR = temp_dir
send_request = create_dummy_send_request()
assert not mail_sender.send(send_request, 1)
assert mail_sender.send(send_request, 1) == SendResult.Failed
finally:
config.POSTFIX_SERVER = original_postfix_server
config.NOT_SEND_EMAIL = True
Expand All @@ -162,7 +170,7 @@ def test_failed_resend_does_not_delete_file():
config.SAVE_UNSENT_DIR = temp_dir
send_request = create_dummy_send_request()
# Send and store email in disk
assert not mail_sender.send(send_request, 1)
assert mail_sender.send(send_request, 1) == SendResult.Failed
saved_files = os.listdir(config.SAVE_UNSENT_DIR)
assert len(saved_files) == 1
mail_sender.purge_stored_emails()
Expand All @@ -186,6 +194,6 @@ def test_ok_mail_does_not_generate_unsent_file():
config.SAVE_UNSENT_DIR = temp_dir
send_request = create_dummy_send_request()
# Send and store email in disk
assert mail_sender.send(send_request, 1)
assert mail_sender.send(send_request, 1) == SendResult.OK
saved_files = os.listdir(config.SAVE_UNSENT_DIR)
assert len(saved_files) == 0