import email
import io
import mimetypes
import smtplib
from email.message import EmailMessage
from logging import Logger
from mimetypes import guess_type
from pathlib import Path
from smtplib import (
SMTPDataError,
SMTPHeloError,
SMTPNotSupportedError,
SMTPRecipientsRefused,
SMTPSenderRefused,
)
from time import sleep
from typing import List, Tuple, Union
import pypdf
from config import (
MONITORFISH_EMAIL_ADDRESS,
MONITORFISH_EMAIL_SERVER_PORT,
MONITORFISH_EMAIL_SERVER_URL,
MONITORFISH_FAX_DOMAIN,
MONITORFISH_FAX_SERVER_PORT,
MONITORFISH_FAX_SERVER_URL,
MONITORFISH_SMS_DOMAIN,
MONITORFISH_SMS_SERVER_PORT,
MONITORFISH_SMS_SERVER_URL,
)
from src.entities.communication_means import CommunicationMeans
mimetypes.add_type(
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".docx",
strict=True,
)
mimetypes.add_type(
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xlsx",
strict=True,
)
[docs]
def create_html_email(
to: Union[str, List[str]],
subject: str,
html: str,
from_: str = MONITORFISH_EMAIL_ADDRESS,
cc: Union[str, List[str]] = None,
bcc: Union[str, List[str]] = None,
images: List[Path] = None,
attachments: List[Tuple[str, bytes]] = None,
reply_to: str = None,
) -> EmailMessage:
"""
Creates a `email.EmailMessage` with the defined parameters.
Args:
to (Union[str, List[str]]): email address or list of email addresses of
recipient(s)
subject (str): Subject of the email.
html (str): html representation of the email's content.
from_ (str, optional): `From` field. Defaults to env var
`MONITORFISH_EMAIL_ADDRESS`.
cc (Union[str, List[str]], optional): `Cc` field with optional email address
(or list of email addresses) of copied recipient(s). Defaults to None.
bcc (Union[str, List[str]], optional): `Bcc` field with optional email address
(or list of email addresses) of hidden copied recipient(s). Defaults to None.
from_ (str, optional): `From` field. Defaults to MONITORFISH_EMAIL_ADDRESS env
var.
images (List[Path], optional): List of `Path` to images on the server's file
system to attach to the email. These images can be displayed in the html body
of the email by referencing them in the `src` attribute of an `<img>` tag as
`cid:<image_name>`, where `<image_name>` is the image file's name.
For example: `/a/b/c/my_image_123.png` can be included in the html message
as :
`<img src="cid:my_image_123.png">` in the html message.
Defaults to None.
attachments (List[Tuple[str, bytes]], optional): `list` of attachments to add
to the email. Elements of the list must be pairs of (filename, content).
Defaults to None.
reply_to (str, optional): if given, added as `Reply-To` header. Defaults to
None.
Returns:
EmailMessage
"""
if isinstance(to, list):
to = ", ".join(to)
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = from_
msg["To"] = to
msg["Date"] = email.utils.formatdate(localtime=True)
if cc:
if isinstance(cc, list):
cc = ", ".join(cc)
msg["Cc"] = cc
if bcc:
if isinstance(bcc, list):
bcc = ", ".join(bcc)
msg["Bcc"] = bcc
if reply_to:
msg["Reply-To"] = reply_to
msg.set_content(html, subtype="html")
if images:
msg.make_related()
for image in images:
(mimetype, _) = guess_type(image)
(maintype, subtype) = mimetype.split("/")
with open(image, "rb") as f:
img_data = f.read()
img = EmailMessage(policy=msg.policy)
img.set_content(
img_data,
maintype=maintype,
subtype=subtype,
filename=image.name,
cid=f"<{image.name}>",
)
img.replace_header(
"Content-Disposition", f'inline; filename="{image.name}"'
)
msg.attach(img)
if attachments:
for filename, filebytes in attachments:
(mimetype, _) = guess_type(filename)
try:
(maintype, subtype) = mimetype.split("/")
except Exception:
maintype = "application"
subtype = "octet-stream"
msg.add_attachment(
filebytes,
maintype=maintype,
subtype=subtype,
filename=filename,
)
return msg
[docs]
def create_sms_email(
to: Union[str, List[str]],
text: str,
from_: str = MONITORFISH_EMAIL_ADDRESS,
) -> EmailMessage:
"""
Creates a `email.EmailMessage` with the defined parameters, configured to be sent
as an SMS.
Args:
to (Union[str, List[str]]): phone number or list of phone numbers of
recipient(s)
text (str): text of the SMS message
from_ (str, optional): `From` field. Defaults to MONITORFISH_EMAIL_ADDRESS env
var.
Returns:
EmailMessage
"""
assert isinstance(to, (str, list))
assert MONITORFISH_SMS_DOMAIN
if isinstance(to, str):
to = f"{to}@{MONITORFISH_SMS_DOMAIN}"
if isinstance(to, list):
to = [f"{phone_number}@{MONITORFISH_SMS_DOMAIN}" for phone_number in to]
to = ", ".join(to)
msg = EmailMessage()
msg["From"] = from_
msg["To"] = to
msg.set_content(text)
return msg
[docs]
def create_fax_email(
to: Union[str, List[str]],
pdf: bytes,
from_: str = MONITORFISH_EMAIL_ADDRESS,
) -> EmailMessage:
"""
Creates a `email.EmailMessage` with the defined parameters.
Args:
to (Union[str, List[str]]): email address or list of email addresses of
recipient(s)
pdf (bytes): `bytes` pdf object
from_ (str, optional): `From` field. Defaults to MONITORFISH_EMAIL_ADDRESS env
var.
Returns:
EmailMessage
"""
assert isinstance(to, (str, list))
assert MONITORFISH_FAX_DOMAIN
if isinstance(to, str):
to = f"{to}@{MONITORFISH_FAX_DOMAIN}"
if isinstance(to, list):
to = [f"{phone_number}@{MONITORFISH_FAX_DOMAIN}" for phone_number in to]
to = ", ".join(to)
msg = EmailMessage()
msg["Subject"] = "FAX"
msg["From"] = from_
msg["To"] = to
msg.add_attachment(
pdf,
maintype="application",
subtype="octet-stream",
filename="FAX.pdf",
)
return msg
[docs]
def send_email(msg: EmailMessage) -> dict:
"""
Sends input email using the contents of `From` header as sender and `To`, `Cc`
and `Bcc` headers as recipients.
This method will return normally if the mail is accepted for at least
one recipient. It returns a dictionary, with one entry for each
recipient that was refused. Each entry contains a tuple of the SMTP
error code and the accompanying error message sent by the server, like :
{ "three@three.org" : ( 550 ,"User unknown" ) }
Args:
msg (EmailMessage): `email.message.EmailMessage` to send.
Returns:
dict: {email_address : (error_code, error_message)} for all recipients that
were refused.
Raises:
SMTPHeloError: The server didn't reply properly to the helo greeting.
SMTPRecipientsRefused: The server rejected ALL recipients (no mail was sent).
SMTPSenderRefused: The server didn't accept the from_addr.
SMTPDataError: The server replied with an unexpected error code (other than a
refusal of a recipient).
SMTPNotSupportedError: The mail_options parameter includes 'SMTPUTF8' but the
SMTPUTF8 extension is not supported by the server.
ValueError: if there is more than one set of 'Resent-' headers
"""
assert MONITORFISH_EMAIL_SERVER_URL is not None
assert MONITORFISH_EMAIL_SERVER_PORT is not None
with smtplib.SMTP(
host=MONITORFISH_EMAIL_SERVER_URL, port=MONITORFISH_EMAIL_SERVER_PORT
) as server:
send_errors = server.send_message(msg)
return send_errors
[docs]
def send_sms(msg: EmailMessage) -> dict:
"""
Same as `send_email`, using sms server.
"""
assert MONITORFISH_SMS_SERVER_URL is not None
assert MONITORFISH_SMS_SERVER_PORT is not None
with smtplib.SMTP(
host=MONITORFISH_SMS_SERVER_URL, port=MONITORFISH_SMS_SERVER_PORT
) as server:
send_errors = server.send_message(msg)
return send_errors
[docs]
def send_fax(msg: EmailMessage) -> dict:
"""
Same as `send_email`, using fax server.
"""
assert MONITORFISH_FAX_SERVER_URL is not None
assert MONITORFISH_FAX_SERVER_PORT is not None
with smtplib.SMTP(
host=MONITORFISH_FAX_SERVER_URL, port=MONITORFISH_FAX_SERVER_PORT
) as server:
send_errors = server.send_message(msg)
return send_errors
[docs]
def send_email_or_sms_or_fax_message(
msg: EmailMessage,
communication_means: CommunicationMeans,
is_integration: bool,
logger: Logger,
) -> dict:
send_functions = {
CommunicationMeans.EMAIL: send_email,
CommunicationMeans.SMS: send_sms,
CommunicationMeans.FAX: send_fax,
}
send = send_functions[communication_means]
addr_fields = [f for f in (msg["To"], msg["Bcc"], msg["Cc"]) if f is not None]
addressees = [a[1] for a in email.utils.getaddresses(addr_fields)]
try:
try:
if is_integration:
logger.info(f"(Mock) Sending {communication_means.value.lower()}.")
send_errors = {}
else:
logger.info(f"Sending {communication_means.value.lower()}.")
send_errors = send(msg)
except (SMTPHeloError, SMTPDataError):
# Retry
logger.warning("Message not sent, retrying...")
sleep(10)
send_errors = send(msg)
except SMTPHeloError:
send_errors = {
addr: (
None,
"The server didn't reply properly to the helo greeting.",
)
for addr in addressees
}
logger.error(str(send_errors))
except SMTPRecipientsRefused:
# All recipients were refused
send_errors = {
addr: (
None,
"The server rejected ALL recipients (no mail was sent)",
)
for addr in addressees
}
logger.error(str(send_errors))
except SMTPSenderRefused:
send_errors = {
addr: (None, "The server didn't accept the from_addr.")
for addr in addressees
}
logger.error(str(send_errors))
except SMTPDataError:
send_errors = {
addr: (
None,
(
"The server replied with an unexpected error code "
"(other than a refusal of a recipient)."
),
)
for addr in addressees
}
logger.error(str(send_errors))
except SMTPNotSupportedError:
send_errors = {
addr: (
None,
(
"The mail_options parameter includes 'SMTPUTF8' but the SMTPUTF8 "
"extension is not supported by the server."
),
)
for addr in addressees
}
logger.error(str(send_errors))
except ValueError:
send_errors = {
addr: (
None,
"there is more than one set of 'Resent-' headers.",
)
for addr in addressees
}
logger.error(str(send_errors))
except Exception as e:
send_errors = {addr: (None, f"Other error: {e}") for addr in addressees}
logger.error(str(send_errors))
match communication_means:
case CommunicationMeans.SMS:
suffix = f"@{MONITORFISH_SMS_DOMAIN}"
case CommunicationMeans.FAX:
suffix = f"@{MONITORFISH_FAX_DOMAIN}"
case CommunicationMeans.EMAIL:
suffix = ""
send_errors = {k.removesuffix(suffix): v for k, v in send_errors.items()}
if send_errors:
logger.error(send_errors)
return send_errors
[docs]
def resize_pdf_to_A4(pdf: bytes) -> bytes:
A4_w = pypdf.PaperSize.A4.width
A4_h = pypdf.PaperSize.A4.height
pdf = pypdf.PdfReader(io.BytesIO(pdf))
writer = pypdf.PdfWriter()
for page in pdf.pages:
page.scale_to(width=A4_w, height=A4_h)
page.cropbox = pypdf.generic.RectangleObject((0, 0, A4_w, A4_h))
writer.add_page(page)
for page in writer.pages:
page.compress_content_streams()
buf = io.BytesIO()
writer.write(buf)
buf.seek(0)
return buf.read()