diff --git a/camel/toolkits/__init__.py b/camel/toolkits/__init__.py index f29ab67d40..98617a21cf 100644 --- a/camel/toolkits/__init__.py +++ b/camel/toolkits/__init__.py @@ -94,6 +94,7 @@ from .notion_mcp_toolkit import NotionMCPToolkit from .vertex_ai_veo_toolkit import VertexAIVeoToolkit from .minimax_mcp_toolkit import MinimaxMCPToolkit +from .imap_mail_toolkit import IMAPMailToolkit __all__ = [ 'BaseToolkit', @@ -178,4 +179,5 @@ 'NotionMCPToolkit', 'VertexAIVeoToolkit', 'MinimaxMCPToolkit', + 'IMAPMailToolkit', ] diff --git a/camel/toolkits/imap_mail_toolkit.py b/camel/toolkits/imap_mail_toolkit.py new file mode 100644 index 0000000000..165858814c --- /dev/null +++ b/camel/toolkits/imap_mail_toolkit.py @@ -0,0 +1,718 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + +import email +import imaplib +import os +import smtplib +import time +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from typing import Any, Dict, List, Optional + +from camel.logger import get_logger +from camel.toolkits import FunctionTool +from camel.toolkits.base import BaseToolkit +from camel.utils import MCPServer, dependencies_required + +logger = get_logger(__name__) + + +@MCPServer() +class IMAPMailToolkit(BaseToolkit): + r"""A toolkit for IMAP email operations. + + This toolkit provides comprehensive email functionality including: + - Fetching emails with filtering options + - Retrieving specific emails by ID + - Sending emails via SMTP + - Replying to emails + - Moving emails to folders + - Deleting emails + + The toolkit implements connection pooling with automatic idle timeout + to prevent resource leaks when used by LLM agents. + + Args: + imap_server (str, optional): IMAP server hostname. If not provided, + will be obtained from environment variables. + imap_port (int, optional): IMAP server port. Defaults to 993. + smtp_server (str, optional): SMTP server hostname. If not provided, + will be obtained from environment variables. + smtp_port (int, optional): SMTP server port. Defaults to 587. + username (str, optional): Email username. If not provided, will be + obtained from environment variables. + password (str, optional): Email password. If not provided, will be + obtained from environment variables. + timeout (Optional[float]): The timeout for the toolkit operations. + connection_idle_timeout (float): Maximum idle time (in seconds) + before auto-closing connections. Defaults to 300 (5 minutes). + """ + + @dependencies_required('imaplib', 'smtplib', 'email') + def __init__( + self, + imap_server: Optional[str] = None, + imap_port: int = 993, + smtp_server: Optional[str] = None, + smtp_port: int = 587, + username: Optional[str] = None, + password: Optional[str] = None, + timeout: Optional[float] = None, + connection_idle_timeout: float = 300.0, + ) -> None: + r"""Initialize the IMAP Mail Toolkit. + + Args: + imap_server: IMAP server hostname + imap_port: IMAP server port (default: 993) + smtp_server: SMTP server hostname + smtp_port: SMTP server port (default: 587) + username: Email username + password: Email password + timeout: Timeout for operations + connection_idle_timeout: Max idle time before auto-close (default: + 300s) + """ + super().__init__(timeout=timeout) + + # Get credentials from environment if not provided + self.imap_server = imap_server or os.environ.get("IMAP_SERVER") + self.imap_port = imap_port + self.smtp_server = smtp_server or os.environ.get("SMTP_SERVER") + self.smtp_port = smtp_port + self.username = username or os.environ.get("EMAIL_USERNAME") + self.password = password or os.environ.get("EMAIL_PASSWORD") + + # Persistent connections + self._imap_connection: Optional[imaplib.IMAP4_SSL] = None + self._smtp_connection: Optional[smtplib.SMTP] = None + + # Connection idle timeout management + self._connection_idle_timeout = connection_idle_timeout + self._imap_last_used: float = 0.0 + self._smtp_last_used: float = 0.0 + + def _get_imap_connection(self) -> imaplib.IMAP4_SSL: + r"""Establish or reuse IMAP connection with idle timeout. + + Returns: + imaplib.IMAP4_SSL: Connected IMAP client + """ + current_time = time.time() + + # Check if existing connection has exceeded idle timeout + if self._imap_connection is not None: + idle_time = current_time - self._imap_last_used + if idle_time > self._connection_idle_timeout: + logger.info( + "IMAP connection idle for %.1f seconds, closing", + idle_time, + ) + try: + self._imap_connection.logout() + except (imaplib.IMAP4.error, OSError) as e: + logger.debug("Error closing idle IMAP connection: %s", e) + self._imap_connection = None + + # Check if existing connection is still alive + if self._imap_connection is not None: + try: + # Test connection with NOOP command + self._imap_connection.noop() + logger.debug("Reusing existing IMAP connection") + self._imap_last_used = current_time + return self._imap_connection + except (imaplib.IMAP4.error, OSError): + # Connection is dead, close it and create new one + logger.debug("IMAP connection is dead, creating new one") + try: + self._imap_connection.logout() + except (imaplib.IMAP4.error, OSError) as e: + logger.debug("Error closing dead IMAP connection: %s", e) + self._imap_connection = None + + # Create new connection + try: + imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port) + imap.login(self.username, self.password) + self._imap_connection = imap + self._imap_last_used = current_time + logger.info( + "Successfully connected to IMAP server %s", self.imap_server + ) + return self._imap_connection + except Exception as e: + logger.error("Failed to connect to IMAP server: %s", e) + raise + + def _get_smtp_connection(self) -> smtplib.SMTP: + r"""Establish or reuse SMTP connection with idle timeout. + + Returns: + smtplib.SMTP: Connected SMTP client + """ + if not self.smtp_server or not self.username or not self.password: + raise ValueError( + "SMTP server, username, and password must be provided" + ) + + current_time = time.time() + + # Check if existing connection has exceeded idle timeout + if self._smtp_connection is not None: + idle_time = current_time - self._smtp_last_used + if idle_time > self._connection_idle_timeout: + logger.info( + "SMTP connection idle for %.1f seconds, closing", + idle_time, + ) + try: + self._smtp_connection.quit() + except (smtplib.SMTPException, OSError) as e: + logger.debug("Error closing idle SMTP connection: %s", e) + self._smtp_connection = None + + # Check if existing connection is still alive + if self._smtp_connection is not None: + try: + # Test connection with NOOP command + status = self._smtp_connection.noop() + if status[0] == 250: + logger.debug("Reusing existing SMTP connection") + self._smtp_last_used = current_time + return self._smtp_connection + except (smtplib.SMTPException, OSError): + # Connection is dead, close it and create new one + logger.debug("SMTP connection is dead, creating new one") + try: + self._smtp_connection.quit() + except (smtplib.SMTPException, OSError) as e: + logger.debug("Error closing dead SMTP connection: %s", e) + self._smtp_connection = None + + # Create new connection + try: + smtp = smtplib.SMTP(self.smtp_server, self.smtp_port) + smtp.starttls() + smtp.login(self.username, self.password) + self._smtp_connection = smtp + self._smtp_last_used = current_time + logger.info( + "Successfully connected to SMTP server %s", self.smtp_server + ) + return self._smtp_connection + except Exception as e: + logger.error("Failed to connect to SMTP server: %s", e) + raise + + def fetch_emails( + self, + folder: str = "INBOX", + limit: int = 10, + unread_only: bool = False, + sender_filter: Optional[str] = None, + subject_filter: Optional[str] = None, + ) -> List[Dict]: + r"""Fetch emails from a folder with optional filtering. + + Args: + folder (str): Email folder to search in (default: "INBOX") + limit (int): Maximum number of emails to retrieve (default: 10) + unread_only (bool): If True, only fetch unread + emails (default: False) + sender_filter (str, optional): Filter emails by + sender email address + subject_filter (str, optional): Filter emails by subject content + + Returns: + List[Dict]: List of email dictionaries with metadata + """ + try: + imap = self._get_imap_connection() + imap.select(folder) + + # Build search criteria + search_criteria = [] + if unread_only: + search_criteria.append("UNSEEN") + if sender_filter: + search_criteria.append(f'FROM "{sender_filter}"') + if subject_filter: + search_criteria.append(f'SUBJECT "{subject_filter}"') + + # If no specific criteria, get recent emails + if not search_criteria: + search_criteria.append("ALL") + + search_string = " ".join(search_criteria) + status, messages = imap.search(None, search_string) + + if status != "OK": + raise ConnectionError("Failed to search emails") + + email_ids = messages[0].split() + + # Limit results + if len(email_ids) > limit: + email_ids = email_ids[-limit:] # Get most recent emails + + emails: List[Dict[str, Any]] = [] + for email_id in email_ids: + try: + status, msg_data = imap.fetch(email_id, "(RFC822)") + if status == "OK" and msg_data and len(msg_data) > 0: + # msg_data is a list of tuples, get the first one + msg_tuple = msg_data[0] + if ( + isinstance(msg_tuple, tuple) + and len(msg_tuple) >= 2 + ): + email_body = msg_tuple[1] + # Handle different email body formats + if isinstance(email_body, bytes): + email_message = email.message_from_bytes( + email_body + ) + email_size = len(email_body) + elif isinstance(email_body, str): + email_message = email.message_from_string( + email_body + ) + email_size = len(email_body.encode('utf-8')) + else: + logger.warning( + "Email body is incorrect %s: %s", + email_id, + type(email_body), + ) + continue + + email_dict = { + "id": ( + email_id.decode() + if isinstance(email_id, bytes) + else str(email_id) + ), + "subject": email_message.get("Subject", ""), + "from": email_message.get("From", ""), + "to": email_message.get("To", ""), + "date": email_message.get("Date", ""), + "size": email_size, + } + # Get email body content + body_content = self._extract_email_body( + email_message + ) + email_dict["body"] = body_content + + emails.append(email_dict) + + except (ValueError, UnicodeDecodeError) as e: + logger.warning( + "Failed to process email %s: %s", email_id, e + ) + continue + + logger.info( + "Successfully fetched %d emails from %s", len(emails), folder + ) + return emails + + except (ConnectionError, imaplib.IMAP4.error) as e: + logger.error("Error fetching emails: %s", e) + raise + + def get_email_by_id(self, email_id: str, folder: str = "INBOX") -> Dict: + r"""Retrieve a specific email by ID with full metadata. + + Args: + email_id (str): ID of the email to retrieve + folder (str): Folder containing the email (default: "INBOX") + + Returns: + Dict: Email dictionary with complete metadata + """ + try: + imap = self._get_imap_connection() + imap.select(folder) + + status, msg_data = imap.fetch(email_id, "(RFC822)") + if status != "OK": + raise ConnectionError(f"Failed to fetch email {email_id}") + + msg_tuple = msg_data[0] + if not isinstance(msg_tuple, tuple) or len(msg_tuple) < 2: + raise ConnectionError( + f"Invalid message data format for email {email_id}" + ) + + email_body = msg_tuple[1] + if not isinstance(email_body, bytes): + raise ConnectionError( + f"Email body is not bytes for email {email_id}" + ) + + email_message = email.message_from_bytes(email_body) + + email_dict = { + "id": email_id, + "subject": email_message.get("Subject", ""), + "from": email_message.get("From", ""), + "to": email_message.get("To", ""), + "cc": email_message.get("Cc", ""), + "bcc": email_message.get("Bcc", ""), + "date": email_message.get("Date", ""), + "message_id": email_message.get("Message-ID", ""), + "reply_to": email_message.get("Reply-To", ""), + "in_reply_to": email_message.get("In-Reply-To", ""), + "references": email_message.get("References", ""), + "priority": email_message.get("X-Priority", ""), + "size": len(email_body) + if isinstance(email_body, bytes) + else 0, + } + + # Get email body content + email_dict["body"] = self._extract_email_body(email_message) + + logger.info("Successfully retrieved email %s", email_id) + return email_dict + + except (ConnectionError, imaplib.IMAP4.error) as e: + logger.error("Error retrieving email %s: %s", email_id, e) + raise + + def send_email( + self, + to_recipients: List[str], + subject: str, + body: str, + cc_recipients: Optional[List[str]] = None, + bcc_recipients: Optional[List[str]] = None, + html_body: Optional[str] = None, + ) -> str: + r"""Send an email via SMTP. + + Args: + to_recipients (List[str]): List of recipient email addresses + subject (str): Email subject line + body (str): Plain text email body + cc_recipients (List[str], optional): List of CC + recipient email addresses + bcc_recipients (List[str], optional): List of BCC + recipient email addresses + html_body (str, optional): HTML version of email body + + Returns: + str: Success message + """ + if not self.username: + raise ValueError("Username must be provided for sending emails") + + try: + smtp = self._get_smtp_connection() + + msg = MIMEMultipart('alternative') + msg['From'] = self.username + msg['To'] = ", ".join(to_recipients) + msg['Subject'] = subject + + if cc_recipients: + msg['Cc'] = ", ".join(cc_recipients) + if bcc_recipients: + msg['Bcc'] = ", ".join(bcc_recipients) + + # Add plain text body + msg.attach(MIMEText(body, 'plain')) + + # Add HTML body if provided + if html_body: + msg.attach(MIMEText(html_body, 'html')) + + # Send email + recipients = ( + to_recipients + (cc_recipients or []) + (bcc_recipients or []) + ) + smtp.send_message( + msg, from_addr=self.username, to_addrs=recipients + ) + + logger.info( + "Email sent successfully to %s", ", ".join(to_recipients) + ) + return "Email sent successfully. Message ID: Unknown" + + except (ConnectionError, smtplib.SMTPException) as e: + logger.error("Error sending email: %s", e) + raise + + def reply_to_email( + self, + original_email_id: str, + reply_body: str, + folder: str = "INBOX", + html_body: Optional[str] = None, + ) -> str: + r"""Send a reply to an existing email. + + Args: + original_email_id (str): ID of the email to reply to + reply_body (str): Reply message body + folder (str): Folder containing the original + email (default: "INBOX") + html_body (str, optional): HTML version of reply body + + Returns: + str: Success message + """ + try: + # Get original email details + original_email = self.get_email_by_id(original_email_id, folder) + + # Extract sender from original email + original_from = original_email.get("from", "") + + # Create reply subject + original_subject = original_email.get("subject", "") + if not original_subject.startswith("Re: "): + reply_subject = f"Re: {original_subject}" + else: + reply_subject = original_subject + + # Send reply + result = self.send_email( + to_recipients=[original_from], + subject=reply_subject, + body=reply_body, + html_body=html_body, + ) + + logger.info("Successfully replied to email %s", original_email_id) + return f"Reply sent successfully. {result}" + + except ( + ConnectionError, + imaplib.IMAP4.error, + smtplib.SMTPException, + ) as e: + logger.error( + "Error replying to email %s: %s", original_email_id, e + ) + raise + + def move_email_to_folder( + self, email_id: str, target_folder: str, source_folder: str = "INBOX" + ) -> str: + r"""Move an email to a different folder. + + Args: + email_id (str): ID of the email to move + target_folder (str): Destination folder name + source_folder (str): Source folder name (default: "INBOX") + + Returns: + str: Success message + """ + try: + imap = self._get_imap_connection() + + # Select source folder + imap.select(source_folder) + + # Copy email to target folder + imap.copy(email_id, target_folder) + + # Mark email as deleted in source folder + imap.store(email_id, '+FLAGS', '\\Deleted') + imap.expunge() + + logger.info( + "Successfully moved email %s from %s to %s", + email_id, + source_folder, + target_folder, + ) + return ( + f"Email {email_id} moved from {source_folder} to " + f"{target_folder}" + ) + + except (ConnectionError, imaplib.IMAP4.error) as e: + logger.error("Error moving email %s: %s", email_id, e) + raise + + def delete_email( + self, email_id: str, folder: str = "INBOX", permanent: bool = False + ) -> str: + r"""Delete an email. + + Args: + email_id (str): ID of the email to delete + folder (str): Folder containing the email (default: "INBOX") + permanent (bool): If True, permanently + delete the email (default: False) + + Returns: + str: Success message + """ + try: + imap = self._get_imap_connection() + imap.select(folder) + + if permanent: + # Permanently delete + imap.store(email_id, '+FLAGS', '\\Deleted') + imap.expunge() + action = "permanently deleted" + else: + # Move to trash (soft delete) + try: + imap.copy(email_id, "Trash") + imap.store(email_id, '+FLAGS', '\\Deleted') + imap.expunge() + action = "moved to trash" + except imaplib.IMAP4.error: + # If Trash folder doesn't exist, just mark as deleted + imap.store(email_id, '+FLAGS', '\\Deleted') + imap.expunge() + action = "marked as deleted" + + logger.info("Successfully %s email %s", action, email_id) + return f"Email {email_id} {action}" + + except (ConnectionError, imaplib.IMAP4.error) as e: + logger.error("Error deleting email %s: %s", email_id, e) + raise + + def _extract_email_body( + self, email_message: email.message.Message + ) -> Dict[str, str]: + r"""Extract plain text and HTML body from email message. + + Args: + email_message: Email message object + + Returns: + Dict[str, str]: Dictionary with 'plain' and 'html' body content + """ + body_content = {"plain": "", "html": ""} + + if email_message.is_multipart(): + for part in email_message.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + + # Skip attachments + if "attachment" not in content_disposition: + payload = part.get_payload(decode=True) + if isinstance(payload, bytes): + if content_type == "text/plain": + body_content["plain"] += payload.decode( + 'utf-8', errors='ignore' + ) + elif content_type == "text/html": + body_content["html"] += payload.decode( + 'utf-8', errors='ignore' + ) + elif isinstance(payload, str): + if content_type == "text/plain": + body_content["plain"] += payload + elif content_type == "text/html": + body_content["html"] += payload + else: + content_type = email_message.get_content_type() + payload = email_message.get_payload(decode=True) + if isinstance(payload, bytes): + if content_type == "text/plain": + body_content["plain"] = payload.decode( + 'utf-8', errors='ignore' + ) + elif content_type == "text/html": + body_content["html"] = payload.decode( + 'utf-8', errors='ignore' + ) + elif isinstance(payload, str): + if content_type == "text/plain": + body_content["plain"] = payload + elif content_type == "text/html": + body_content["html"] = payload + + return body_content + + def close(self) -> None: + r"""Close all open connections. + + This method should be called when the toolkit is no longer needed + to properly clean up network connections. + """ + if self._imap_connection is not None: + try: + self._imap_connection.logout() + logger.info("IMAP connection closed") + except (imaplib.IMAP4.error, OSError) as e: + logger.warning("Error closing IMAP connection: %s", e) + finally: + self._imap_connection = None + + if self._smtp_connection is not None: + try: + self._smtp_connection.quit() + logger.info("SMTP connection closed") + except (smtplib.SMTPException, OSError) as e: + logger.warning("Error closing SMTP connection: %s", e) + finally: + self._smtp_connection = None + + def __del__(self) -> None: + r"""Destructor to ensure connections are closed.""" + try: + self.close() + except Exception: + # Silently ignore errors during cleanup to avoid issues + # during interpreter shutdown + pass + + def __enter__(self) -> 'IMAPMailToolkit': + r"""Context manager entry. + + Returns: + IMAPMailToolkit: Self instance + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + r"""Context manager exit, ensuring connections are closed. + + Args: + exc_type: Exception type if an exception occurred + exc_val: Exception value if an exception occurred + exc_tb: Exception traceback if an exception occurred + """ + self.close() + + def get_tools(self) -> List[FunctionTool]: + r"""Get list of tools provided by this toolkit. + + Returns: + List[FunctionTool]: List of available tools + """ + return [ + FunctionTool(self.fetch_emails), + FunctionTool(self.get_email_by_id), + FunctionTool(self.send_email), + FunctionTool(self.reply_to_email), + FunctionTool(self.move_email_to_folder), + FunctionTool(self.delete_email), + ] diff --git a/examples/toolkits/imap_mail_toolkit.py b/examples/toolkits/imap_mail_toolkit.py new file mode 100644 index 0000000000..4ad911eb5f --- /dev/null +++ b/examples/toolkits/imap_mail_toolkit.py @@ -0,0 +1,153 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + + +from camel.agents.chat_agent import ChatAgent +from camel.messages import BaseMessage +from camel.models import ModelFactory +from camel.toolkits import IMAPMailToolkit +from camel.types import ModelPlatformType, ModelType + + +def main() -> None: + r"""Simple example using IMAP Mail Toolkit with a chat agent.""" + + # Example 1: Basic usage (connections auto-managed with idle timeout) + # The toolkit will automatically close idle connections after 5 minutes + # and clean up on object destruction + mail_toolkit = IMAPMailToolkit( + imap_server="imap.gmail.com", + smtp_server="smtp.gmail.com", + username="your.email@gmail.com", + password="your_app_password", + connection_idle_timeout=300.0, # 5 minutes (default) + ) + tools = mail_toolkit.get_tools() + + model = ModelFactory.create( + model_platform=ModelPlatformType.DEFAULT, + model_type=ModelType.DEFAULT, + ) + + agent = ChatAgent( + model=model, + system_message=BaseMessage.make_assistant_message( + role_name="Email Assistant", + content="You are an email assistant. " + "Help users with their emails.", + ), + tools=tools, + ) + + # Fetch emails + print("Fetching recent emails...") + response = agent.step( + BaseMessage.make_user_message( + role_name="User", content="Get my 2 most recent emails" + ) + ) + print(f"Assistant: {response.msgs[0].content}\n") + + # Send email + print("Sending test email...") + response = agent.step( + BaseMessage.make_user_message( + role_name="User", + content="""Send an email to yourself with + subject 'Test' and body 'Hello from CAMEL'""", + ) + ) + print(f"Assistant: {response.msgs[0].content}") + + # Connections will be auto-closed after idle timeout or when + # mail_toolkit is destroyed + + +def main_with_context_manager() -> None: + r"""Example using context manager for explicit cleanup.""" + + # Example 2: Using context manager (recommended for long-running tasks) + # Connections are guaranteed to close when exiting the context + with IMAPMailToolkit( + imap_server="imap.gmail.com", + smtp_server="smtp.gmail.com", + username="your.email@gmail.com", + password="your_app_password", + ) as mail_toolkit: + tools = mail_toolkit.get_tools() + + model = ModelFactory.create( + model_platform=ModelPlatformType.DEFAULT, + model_type=ModelType.DEFAULT, + ) + + agent = ChatAgent( + model=model, + system_message=BaseMessage.make_assistant_message( + role_name="Email Assistant", + content="You are an email assistant.", + ), + tools=tools, + ) + + # Use the agent + response = agent.step( + BaseMessage.make_user_message( + role_name="User", content="Get my recent emails" + ) + ) + print(f"Assistant: {response.msgs[0].content}") + + # Connections automatically closed here + + +if __name__ == "__main__": + main() + +""" +============================================================== +Fetching recent emails... +Assistant: Here are your two most recent emails (newest first): + +1) From: "Example Brand" + ID: 2620 + Date: Tue, 22 Nov 2024 07:07:16 -0600 + Subject: Get an exclusive experience in Dubai + Size: 87,767 bytes + Snippet: "WELCOME TO THE FAMILY HOUSE — A truly + interactive experience... Join raffle on app to + win an exclusive opportunity for you and 10 friends..." + +2) From: "Service Provider" + ID: 2619 + Date: Mon, 21 Nov 2024 03:34:39 -0800 + Subject: Updates to Terms of Service + Size: 19,175 bytes + Snippet: "On December 21, 2024, we're making some changes to + our Terms of Service... You can review the new terms here..." + +Would you like me to open/read either message in full, reply, +archive/move, or delete one of them? If so, tell me which +email (by number or ID) and the action. + +Sending test email... +Assistant: Do you mean send it to your email (user@example.com)? + If yes, I'll send an email with: + +Subject: Test +Body: Hello from CAMEL + +Any CC/BCC or HTML formatting needed? +=============================================================================== +""" diff --git a/test/toolkits/test_imap_mail_toolkit.py b/test/toolkits/test_imap_mail_toolkit.py new file mode 100644 index 0000000000..266a1275f1 --- /dev/null +++ b/test/toolkits/test_imap_mail_toolkit.py @@ -0,0 +1,533 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + +import imaplib +import os +import smtplib +import unittest +from unittest.mock import MagicMock, patch + +from camel.toolkits import IMAPMailToolkit + + +class TestIMAPMailToolkit(unittest.TestCase): + r"""Test cases for IMAP Mail Toolkit.""" + + def setUp(self): + r"""Set up test fixtures.""" + # Mock environment variables + self.env_patcher = patch.dict( + os.environ, + { + 'IMAP_SERVER': 'imap.test.com', + 'SMTP_SERVER': 'smtp.test.com', + 'EMAIL_USERNAME': 'test@test.com', + 'EMAIL_PASSWORD': 'test_password', + }, + ) + self.env_patcher.start() + + # Create toolkit instance + self.toolkit = IMAPMailToolkit() + + def tearDown(self): + r"""Clean up test fixtures.""" + self.env_patcher.stop() + + @patch('camel.toolkits.imap_mail_toolkit.imaplib.IMAP4_SSL') + def test_get_imap_connection_success(self, mock_imap_ssl): + r"""Test successful IMAP connection.""" + # Setup mock + mock_imap = MagicMock() + mock_imap_ssl.return_value = mock_imap + + # Call method + result = self.toolkit._get_imap_connection() + + # Assertions + self.assertEqual(result, mock_imap) + mock_imap_ssl.assert_called_once_with('imap.test.com', 993) + mock_imap.login.assert_called_once_with( + 'test@test.com', 'test_password' + ) + + @patch('camel.toolkits.imap_mail_toolkit.imaplib.IMAP4_SSL') + def test_get_imap_connection_failure(self, mock_imap_ssl): + r"""Test IMAP connection failure.""" + # Setup mock to raise exception + mock_imap_ssl.side_effect = imaplib.IMAP4.error("Connection failed") + + # Call method and expect exception + # (the actual exception, not ConnectionError) + with self.assertRaises(imaplib.IMAP4.error): + self.toolkit._get_imap_connection() + + @patch('camel.toolkits.imap_mail_toolkit.smtplib.SMTP') + def test_get_smtp_connection_success(self, mock_smtp): + r"""Test successful SMTP connection.""" + # Setup mock + mock_smtp_instance = MagicMock() + mock_smtp.return_value = mock_smtp_instance + + # Call method + result = self.toolkit._get_smtp_connection() + + # Assertions + self.assertEqual(result, mock_smtp_instance) + mock_smtp.assert_called_once_with('smtp.test.com', 587) + mock_smtp_instance.starttls.assert_called_once() + mock_smtp_instance.login.assert_called_once_with( + 'test@test.com', 'test_password' + ) + + @patch('camel.toolkits.imap_mail_toolkit.smtplib.SMTP') + def test_get_smtp_connection_failure(self, mock_smtp): + r"""Test SMTP connection failure.""" + # Setup mock to raise exception + mock_smtp.side_effect = smtplib.SMTPException("Connection failed") + + # Call method and expect exception + with self.assertRaises(smtplib.SMTPException): + self.toolkit._get_smtp_connection() + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_fetch_emails_success(self, mock_get_imap): + r"""Test successful email fetching.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + + # Mock search results + mock_imap.search.return_value = ("OK", [b"1"]) + mock_imap.fetch.return_value = ("OK", [(b"1", b"raw_email_data")]) + + # Mock email message + mock_message = MagicMock() + mock_message.get.side_effect = lambda x, default="": { + "Subject": "Test Subject", + "From": "test@example.com", + "To": "recipient@example.com", + "Date": "Wed, 01 Jan 2024 00:00:00 +0000", + }.get(x, default) + + with patch( + 'camel.toolkits.imap_mail_toolkit.email.message_from_bytes', + return_value=mock_message, + ): + with patch.object( + self.toolkit, + '_extract_email_body', + return_value={"plain": "Test body"}, + ): + # Call method + result = self.toolkit.fetch_emails(limit=1) + + # Assertions + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["subject"], "Test Subject") + self.assertEqual(result[0]["from"], "test@example.com") + mock_imap.select.assert_called_once_with("INBOX") + mock_imap.search.assert_called_once() + mock_imap.fetch.assert_called_once() + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_fetch_emails_with_filters(self, mock_get_imap): + r"""Test email fetching with filters.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + mock_imap.search.return_value = ("OK", [b"1 2 3"]) + mock_imap.fetch.return_value = ("OK", [(b"1", b"raw_email_data")]) + + # Mock email message + mock_message = MagicMock() + mock_message.get.return_value = "Test" + + with patch( + 'camel.toolkits.imap_mail_toolkit.email.message_from_bytes', + return_value=mock_message, + ): + with patch.object( + self.toolkit, + '_extract_email_body', + return_value={"plain": "Test body"}, + ): + # Call method with filters + result = self.toolkit.fetch_emails( + unread_only=True, + sender_filter="test@example.com", + subject_filter="Test Subject", + ) + + # Assertions + self.assertIsInstance(result, list) + # Check that search was called with proper criteria + search_call = mock_imap.search.call_args[0] + search_string = search_call[1] + self.assertIn("UNSEEN", search_string) + self.assertIn("FROM", search_string) + self.assertIn("SUBJECT", search_string) + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_fetch_emails_search_failure(self, mock_get_imap): + r"""Test email fetching with search failure.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + mock_imap.search.return_value = ("NO", [b"Error"]) + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.fetch_emails() + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_get_email_by_id_success(self, mock_get_imap): + r"""Test successful email retrieval by ID.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + mock_imap.fetch.return_value = ("OK", [(b"1", b"raw_email_data")]) + + # Mock email message + mock_message = MagicMock() + mock_message.get.side_effect = lambda x, default="": { + "Subject": "Test Subject", + "From": "test@example.com", + "To": "recipient@example.com", + "Date": "Wed, 01 Jan 2024 00:00:00 +0000", + "Message-ID": "test-id@example.com", + "In-Reply-To": "", + "References": "", + "X-Priority": "3", + }.get(x, default) + + with patch( + 'camel.toolkits.imap_mail_toolkit.email.message_from_bytes', + return_value=mock_message, + ): + with patch.object( + self.toolkit, + '_extract_email_body', + return_value={"plain": "Test body"}, + ): + # Call method + result = self.toolkit.get_email_by_id("123") + + # Assertions + self.assertEqual(result["id"], "123") + self.assertEqual(result["subject"], "Test Subject") + self.assertEqual(result["from"], "test@example.com") + mock_imap.select.assert_called_once_with("INBOX") + mock_imap.fetch.assert_called_once_with("123", "(RFC822)") + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_get_email_by_id_fetch_failure(self, mock_get_imap): + r"""Test email retrieval with fetch failure.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + mock_imap.fetch.return_value = ("NO", [b"Error"]) + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.get_email_by_id("123") + + @patch.object(IMAPMailToolkit, '_get_smtp_connection') + def test_send_email_success(self, mock_get_smtp): + r"""Test successful email sending.""" + # Setup mock + mock_smtp = MagicMock() + mock_get_smtp.return_value = mock_smtp + + # Call method + result = self.toolkit.send_email( + to_recipients=["recipient@example.com"], + subject="Test Subject", + body="Test Body", + html_body="

Test HTML

", + ) + + # Assertions + self.assertIn("Email sent successfully", result) + mock_smtp.send_message.assert_called_once() + mock_smtp.quit.assert_called_once() + + @patch.object(IMAPMailToolkit, '_get_smtp_connection') + def test_send_email_with_cc_bcc(self, mock_get_smtp): + r"""Test email sending with CC and BCC.""" + # Setup mock + mock_smtp = MagicMock() + mock_get_smtp.return_value = mock_smtp + + # Call method + result = self.toolkit.send_email( + to_recipients=["recipient@example.com"], + subject="Test Subject", + body="Test Body", + cc_recipients=["cc@example.com"], + bcc_recipients=["bcc@example.com"], + ) + + # Assertions + self.assertIn("Email sent successfully", result) + # Check that send_message was called with all recipients + call_args = mock_smtp.send_message.call_args + self.assertIn("recipient@example.com", call_args[1]["to_addrs"]) + self.assertIn("cc@example.com", call_args[1]["to_addrs"]) + self.assertIn("bcc@example.com", call_args[1]["to_addrs"]) + + @patch.object(IMAPMailToolkit, '_get_smtp_connection') + def test_send_email_failure(self, mock_get_smtp): + r"""Test email sending failure.""" + # Setup mock to raise exception + mock_get_smtp.side_effect = ConnectionError("SMTP connection failed") + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.send_email( + to_recipients=["recipient@example.com"], + subject="Test Subject", + body="Test Body", + ) + + @patch.object(IMAPMailToolkit, 'get_email_by_id') + @patch.object(IMAPMailToolkit, 'send_email') + def test_reply_to_email_success(self, mock_send_email, mock_get_email): + r"""Test successful email reply.""" + # Setup mocks + mock_get_email.return_value = { + "subject": "Original Subject", + "from": "original@example.com", + "message_id": "original-id@example.com", + } + mock_send_email.return_value = "Reply sent successfully" + + # Call method + result = self.toolkit.reply_to_email( + original_email_id="123", reply_body="Reply content" + ) + + # Assertions + self.assertIn("Reply sent successfully", result) + mock_get_email.assert_called_once_with("123", "INBOX") + mock_send_email.assert_called_once() + + @patch.object(IMAPMailToolkit, 'get_email_by_id') + def test_reply_to_email_no_original(self, mock_get_email): + r"""Test email reply when original email not found.""" + # Setup mock to raise exception + mock_get_email.side_effect = ConnectionError("Email not found") + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.reply_to_email( + original_email_id="123", reply_body="Reply content" + ) + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_move_email_to_folder_success(self, mock_get_imap): + r"""Test successful email moving.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + + # Call method + result = self.toolkit.move_email_to_folder( + email_id="123", target_folder="Archive", source_folder="INBOX" + ) + + # Assertions + self.assertIn("Email 123 moved from INBOX to Archive", result) + mock_imap.select.assert_called_once_with("INBOX") + mock_imap.copy.assert_called_once_with("123", "Archive") + mock_imap.store.assert_called_once_with("123", '+FLAGS', '\\Deleted') + mock_imap.expunge.assert_called_once() + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_move_email_to_folder_failure(self, mock_get_imap): + r"""Test email moving failure.""" + # Setup mock to raise exception + mock_get_imap.side_effect = ConnectionError("IMAP connection failed") + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.move_email_to_folder( + email_id="123", target_folder="Archive" + ) + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_delete_email_success(self, mock_get_imap): + r"""Test successful email deletion.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + + # Call method + result = self.toolkit.delete_email("123", permanent=False) + + # Assertions + self.assertIn("Email 123 moved to trash", result) + mock_imap.select.assert_called_once_with("INBOX") + mock_imap.store.assert_called_once_with("123", '+FLAGS', '\\Deleted') + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_delete_email_permanent(self, mock_get_imap): + r"""Test permanent email deletion.""" + # Setup mock + mock_imap = MagicMock() + mock_get_imap.return_value = mock_imap + + # Call method + result = self.toolkit.delete_email("123", permanent=True) + + # Assertions + self.assertIn("Email 123 permanently deleted", result) + mock_imap.select.assert_called_once_with("INBOX") + mock_imap.store.assert_called_once_with("123", '+FLAGS', '\\Deleted') + mock_imap.expunge.assert_called_once() + + @patch.object(IMAPMailToolkit, '_get_imap_connection') + def test_delete_email_failure(self, mock_get_imap): + r"""Test email deletion failure.""" + # Setup mock to raise exception + mock_get_imap.side_effect = ConnectionError("IMAP connection failed") + + # Call method and expect exception + with self.assertRaises(ConnectionError): + self.toolkit.delete_email("123") + + def test_extract_email_body_plain_text(self): + r"""Test email body extraction for plain text.""" + # Create mock email message + mock_message = MagicMock() + mock_message.is_multipart.return_value = False + mock_message.get_content_type.return_value = "text/plain" + mock_message.get_payload.return_value = b"Plain text content" + + # Call method + result = self.toolkit._extract_email_body(mock_message) + + # Assertions + self.assertEqual(result["plain"], "Plain text content") + self.assertEqual(result["html"], "") + + def test_extract_email_body_html(self): + r"""Test email body extraction for HTML.""" + # Create mock email message + mock_message = MagicMock() + mock_message.is_multipart.return_value = False + mock_message.get_content_type.return_value = "text/html" + mock_message.get_payload.return_value = b"

HTML content

" + + # Call method + result = self.toolkit._extract_email_body(mock_message) + + # Assertions + self.assertEqual(result["plain"], "") + self.assertEqual(result["html"], "

HTML content

") + + def test_extract_email_body_multipart(self): + r"""Test email body extraction for multipart message.""" + # Create mock email message + mock_message = MagicMock() + mock_message.is_multipart.return_value = True + + # Create mock parts + mock_part1 = MagicMock() + mock_part1.get_content_type.return_value = "text/plain" + mock_part1.get.return_value = "inline" # Not an attachment + mock_part1.get_payload.return_value = b"Plain text content" + + mock_part2 = MagicMock() + mock_part2.get_content_type.return_value = "text/html" + mock_part2.get.return_value = "inline" # Not an attachment + mock_part2.get_payload.return_value = b"

HTML content

" + + # Mock the main message to have no content disposition + mock_message.get.return_value = None + mock_message.walk.return_value = [mock_message, mock_part1, mock_part2] + + # Call method + result = self.toolkit._extract_email_body(mock_message) + + # Assertions + self.assertEqual(result["plain"], "Plain text content") + self.assertEqual(result["html"], "

HTML content

") + + def test_get_tools(self): + r"""Test getting available tools.""" + # Call method + tools = self.toolkit.get_tools() + + # Assertions + self.assertIsInstance(tools, list) + self.assertEqual(len(tools), 6) # 6 public methods + + # Check that all expected tools are present + tool_names = [tool.func.__name__ for tool in tools] + expected_tools = [ + 'fetch_emails', + 'get_email_by_id', + 'send_email', + 'reply_to_email', + 'move_email_to_folder', + 'delete_email', + ] + + for expected_tool in expected_tools: + self.assertIn(expected_tool, tool_names) + + def test_toolkit_initialization_with_credentials(self): + r"""Test toolkit initialization with direct credentials.""" + # Create toolkit with direct credentials + toolkit = IMAPMailToolkit( + imap_server="custom.imap.com", + smtp_server="custom.smtp.com", + username="custom@example.com", + password="custom_password", + ) + + # Assertions + self.assertEqual(toolkit.imap_server, "custom.imap.com") + self.assertEqual(toolkit.smtp_server, "custom.smtp.com") + self.assertEqual(toolkit.username, "custom@example.com") + self.assertEqual(toolkit.password, "custom_password") + + def test_toolkit_initialization_missing_credentials(self): + r"""Test toolkit initialization with missing credentials.""" + # Clear environment variables + with patch.dict(os.environ, {}, clear=True): + # Create toolkit without credentials + with self.assertRaises(ValueError): + IMAPMailToolkit() + + def test_toolkit_initialization_partial_credentials(self): + r"""Test toolkit initialization with partial credentials.""" + # Clear environment variables and set only some + with patch.dict( + os.environ, + { + 'IMAP_SERVER': 'imap.test.com', + 'EMAIL_USERNAME': 'test@test.com', + # Missing SMTP_SERVER and EMAIL_PASSWORD + }, + clear=True, + ): + # Create toolkit with partial credentials + with self.assertRaises(ValueError): + IMAPMailToolkit() + + +if __name__ == '__main__': + unittest.main()