import datetime
import logging
import os
import re
from email.message import EmailMessage
from typing import Callable, Iterable, List, Tuple
import jinja2
import pandas as pd
import pytz
import requests
from prefect import flow, get_run_logger, task
from config import (
BACKOFFICE_REGULATION_URL,
CNSP_FRANCE_EMAIL_ADDRESS,
EMAIL_STYLESHEETS_LOCATION,
EMAIL_TEMPLATES_LOCATION,
PROXIES,
)
from src.generic_tasks import extract
from src.helpers.emails import create_html_email, send_email
from src.processing import get_matched_groups, try_get_factory
from src.shared_tasks.dates import get_utcnow
####################################### Helpers #######################################
[docs]
def make_html_hyperlinks(
    urls: Iterable, link_texts: Iterable, logger: logging.Logger = None
) -> List[str]:
    """
    Returns a list of html strings of links like <a href="url">link_text</a> for
    the input `urls` and `link_texts`.

    Falsy urls (`None`, empty string...) are replaced by the placeholder `"#"`.
    If the two inputs do not have the same number of elements, a warning is logged
    and the output is truncated to the length of the shortest input.

    Args:
        urls (Iterable): Iterable of urls. Any iterable is accepted, including
          one-shot iterables such as generators.
        link_texts (Iterable): Iterable of link texts
        logger (logging.Logger, optional): Logger used to emit the length mismatch
          warning. Defaults to the logger of this module.

    Returns:
        List[str]: `list` of html links
    """
    # Materialize both inputs once: `len` is not defined on generic iterables
    # (generators, iterators, `zip` objects...) and a one-shot iterable could not
    # be measured and then iterated anyway.
    urls = list(urls)
    link_texts = list(link_texts)

    if len(urls) != len(link_texts):
        if logger is None:
            # `getLogger` returns a logger attached to the logging hierarchy, so
            # the warning goes through the handlers configured by the application.
            logger = logging.getLogger(__name__)
        logger.warning(
            (
                "urls and text_links do not match in length. The output list will "
                "be truncated to the shortest of the two sequences"
            )
        )

    null_url = "#"
    return [
        f'<a href="{url or null_url}">{link_text}</a>'
        for url, link_text in zip(urls, link_texts)
    ]
################################### Tasks and flows ###################################
@task
@task
@task
[docs]
def add_article_id(regulations: pd.DataFrame, url_column: str) -> pd.DataFrame:
    """
    Appends to `regulations` the named groups captured by the Legipeche URL
    pattern (a single `article_id` group) when applied to each value of
    `url_column`.

    The pattern expects URLs shaped like
    `http(s)://legipeche.metier.e2.rie.gouv.fr/<slug>-a<digits>.html<anything>`,
    `<digits>` being captured as `article_id`.

    The extraction itself is delegated to `get_matched_groups`; the value
    produced for non-matching URLs is decided by that helper (expected to be
    `None` - to be confirmed against its implementation).

    Args:
        regulations (pd.DataFrame): DataFrame of regulations
        url_column (str): Name of the column containing URLs of regulation pages

    Returns:
        pd.DataFrame: new DataFrame made of the input columns followed by the
        extracted column(s)
    """
    legipeche_url_pattern = re.compile(
        r"^https?://legipeche\.metier\."
        r"e2\.rie\.gouv\.fr/"
        r"(?:[a-zA-Z0-9-]*)"
        r"-a(?P<article_id>\d+)"
        r"\.html"
        r".*$"
    )

    extracted_groups = regulations[url_column].apply(
        get_matched_groups, regex=legipeche_url_pattern
    )

    # Column-wise concatenation: the input DataFrame itself is not modified.
    return pd.concat([regulations, extracted_groups], axis=1)
@task
@task
[docs]
def get_modified_regulations(
    legipeche_regulations: pd.DataFrame, monitorfish_regulations: pd.DataFrame
) -> pd.DataFrame:
    """
    Selects the Legipeche documents that changed on pages Monitorfish refers to.

    A row of `legipeche_regulations` is kept when all of the following hold :

      - its `document_url` is present in only one of the `previous` and `latest`
        extraction occurences (document added or removed)
      - its `article_id` is present in both occurences (the page itself already
        existed and still exists)
      - its `article_id` is one of the non null `article_id` of
        `monitorfish_regulations`

    Args:
        legipeche_regulations (pd.DataFrame): must have columns
          `extraction_occurence`, `article_id` and `document_url`
        monitorfish_regulations (pd.DataFrame): must have an `article_id` column

    Returns:
        pd.DataFrame: filtered rows of `legipeche_regulations`, with a fresh
        index
    """

    def distinct_values(occurence: str, column: str) -> set:
        # Distinct values of `column` among rows of the given extraction occurence
        in_occurence = legipeche_regulations.extraction_occurence == occurence
        return set(legipeche_regulations.loc[in_occurence, column])

    stable_article_ids = distinct_values("latest", "article_id") & distinct_values(
        "previous", "article_id"
    )
    changed_document_urls = distinct_values(
        "latest", "document_url"
    ) ^ distinct_values("previous", "document_url")
    referenced_article_ids = set(monitorfish_regulations.article_id.dropna())

    document_changed = legipeche_regulations.document_url.isin(changed_document_urls)
    page_is_stable = legipeche_regulations.article_id.isin(stable_article_ids)
    page_is_referenced = legipeche_regulations.article_id.isin(
        referenced_article_ids
    )

    selection = document_changed & page_is_stable & page_is_referenced
    return legipeche_regulations[selection].reset_index(drop=True)
@task
@task
[docs]
def get_missing_references(monitorfish_regulations: pd.DataFrame) -> pd.DataFrame:
    """
    Lists the `monitorfish_regulations` whose `reference` is null.

    Only the `law_type`, `topic` and `zone` columns are kept. Rows are sorted on
    these three columns, which are then renamed with French display labels.

    Args:
        monitorfish_regulations (pd.DataFrame): monitorfish_regulations. Must have
          columns :

            - `reference`
            - `law_type`
            - `topic`
            - `zone`

    Returns:
        pd.DataFrame: Filtered, sorted and relabelled copy of the input, with a
        fresh index.
    """
    kept_columns = ["law_type", "topic", "zone"]
    display_labels = {
        "law_type": "Type de réglementation",
        "topic": "Thématique",
        "zone": "Zone",
    }

    has_no_reference = monitorfish_regulations.reference.isna()
    without_reference = monitorfish_regulations.loc[
        has_no_reference, kept_columns
    ].copy(deep=True)

    ordered = without_reference.sort_values(kept_columns)
    labelled = ordered.rename(columns=display_labels)
    return labelled.reset_index(drop=True)
@task
[docs]
def get_unknown_links(
    monitorfish_regulations: pd.DataFrame,
    legipeche_regulations: pd.DataFrame,
) -> set:
    """
    Collects the non null urls of `monitorfish_regulations` whose `article_id` is
    not one of the non null `article_id` found in the `latest` extraction
    occurence of `legipeche_regulations`.

    This covers both urls with an `article_id` absent from Legipeche and urls
    with a null `article_id` (a null value never matches a known id).

    Args:
        monitorfish_regulations (pd.DataFrame): must have columns `url` and
          `article_id`
        legipeche_regulations (pd.DataFrame): must have columns
          `extraction_occurence` and `article_id`

    Returns:
        set: subset of `monitorfish_regulations.url`
    """
    logger = get_run_logger()

    from_latest_extraction = legipeche_regulations.extraction_occurence == "latest"
    legipeche_article_ids = set(
        legipeche_regulations.loc[from_latest_extraction, "article_id"].dropna()
    )

    has_url = monitorfish_regulations.url.notnull()
    is_known_article = monitorfish_regulations.article_id.isin(legipeche_article_ids)
    unknown_links = set(monitorfish_regulations.loc[has_url & ~is_known_article, "url"])

    logger.info(
        (
            f"Out of {monitorfish_regulations.url.dropna().nunique()} distincts urls "
            f"in monitorfish_regulation, {len(unknown_links)} were not found in the"
            " legipeche table."
        )
    )
    return unknown_links
@task
[docs]
def get_dead_links(
    monitorfish_regulations: pd.DataFrame, unknown_links: set, proxies: dict
) -> pd.DataFrame:
    """
    Performs a GET request on each of the `unknown_links` and returns the
    `monitorfish_regulations` that reference a link which could not be fetched.

    Each url is first requested without explicit proxies. On a timeout or a
    connection error, it is requested a second time with `proxies`. A url is
    considered dead if the first attempt fails with any other error (HTTP error
    status included) or if the attempt with proxies fails with any error.

    Side effect: the `http_proxy`, `https_proxy`, `HTTP_PROXY` and `HTTPS_PROXY`
    environment variables are removed from the current process.

    Args:
        monitorfish_regulations (pd.DataFrame): must have columns `url` and
          `reference`
        unknown_links (set): `set` of urls to test (i.e. urls not found when
          scraping Legipeche)
        proxies (dict): proxies to use for the second attempt

    Returns:
        pd.DataFrame: rows of `monitorfish_regulations` with a non null
        `reference` and a `url` found to be dead, with a fresh index
    """
    logger = get_run_logger()

    # Presumably so that `requests` does not pick up proxy settings from the
    # environment on the first attempt - proxies are only given explicitly on
    # the retry.
    for proxy_variable in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
        os.environ.pop(proxy_variable, None)

    def probe(url, **request_options):
        # Raises if the url cannot be fetched or answers with an HTTP error status
        response = requests.get(url, timeout=10, **request_options)
        response.raise_for_status()

    logger.info(
        (
            f"Performing requests on {len(unknown_links)} unknown urls to check "
            "whether they are dead links..."
        )
    )

    dead_links_urls = []
    for unknown_link in unknown_links:
        try:
            logger.info(f"Testing {unknown_link}")
            probe(unknown_link)
        except (requests.Timeout, requests.ConnectionError) as e:
            # Network level failure: give the url a second chance through proxies
            try:
                logger.info(
                    (
                        f"{unknown_link} fails with error {repr(e)}. "
                        "Retrying with proxies..."
                    )
                )
                probe(unknown_link, proxies=proxies)
            except Exception as e:
                logger.info(
                    (
                        f"{unknown_link} with proxies fails with error {repr(e)}. "
                        "Adding to dead links"
                    )
                )
                dead_links_urls.append(unknown_link)
        except Exception as e:
            logger.info(
                (f"{unknown_link} fails with error {repr(e)}. " "Adding to dead links")
            )
            dead_links_urls.append(unknown_link)

    # null references are missing_references, not dead_links
    url_is_dead = monitorfish_regulations.url.isin(dead_links_urls)
    has_reference = monitorfish_regulations.reference.notnull()
    return monitorfish_regulations[url_is_dead & has_reference].reset_index(drop=True)
@task
@task
[docs]
def get_outdated_references(
    monitorfish_regulations: pd.DataFrame, now: datetime.datetime
) -> pd.DataFrame:
    """
    Keeps the `monitorfish_regulations` whose `end_date` is strictly before `now`.

    Args:
        monitorfish_regulations (pd.DataFrame): DataFrame of Monitorfish regulations.
          Must have at least a `end_date` column.
        now (datetime.datetime): reference datetime the `end_date` values are
          compared to

    Returns:
        pd.DataFrame: Subset of `monitorfish_regulations`, with a fresh index
    """
    is_expired = monitorfish_regulations.end_date < now
    expired_regulations = monitorfish_regulations[is_expired]
    return expired_regulations.reset_index(drop=True)
@task
@task
[docs]
def get_main_template() -> jinja2.environment.Template:
    """
    Loads `regulations_checkup/main.jinja` from `EMAIL_TEMPLATES_LOCATION`.

    Returns:
        jinja2.environment.Template: template built from the content of the file
    """
    template_path = EMAIL_TEMPLATES_LOCATION / "regulations_checkup/main.jinja"
    with open(template_path, "r") as template_file:
        template_source = template_file.read()
    return jinja2.Template(template_source)
@task
[docs]
def get_body_template() -> jinja2.environment.Template:
    """
    Loads `regulations_checkup/body.jinja` from `EMAIL_TEMPLATES_LOCATION`.

    Returns:
        jinja2.environment.Template: template built from the content of the file
    """
    template_path = EMAIL_TEMPLATES_LOCATION / "regulations_checkup/body.jinja"
    with open(template_path, "r") as template_file:
        template_source = template_file.read()
    return jinja2.Template(template_source)
@task
[docs]
def get_style() -> str:
    """
    Reads the `splendid.css` stylesheet from `EMAIL_STYLESHEETS_LOCATION`.

    Returns:
        str: content of the stylesheet
    """
    stylesheet_path = EMAIL_STYLESHEETS_LOCATION / "splendid.css"
    with open(stylesheet_path) as stylesheet_file:
        return stylesheet_file.read()
@task
[docs]
def render_body(
    body_template: jinja2.environment.Template,
    previous_extraction_datetime_utc: datetime.datetime,
    latest_extraction_datetime_utc: datetime.datetime,
    missing_references: pd.DataFrame,
    modified_regulations: pd.DataFrame,
    dead_links: pd.DataFrame,
    outdated_references: pd.DataFrame,
    backoffice_regulation_url: str,
    utcnow: datetime.datetime,
) -> str:
    """
    Renders email body as html string.

    The template always receives the two extraction datetimes, the verification
    date (date of `utcnow`, formatted as `dd/mm/YYYY`) and the backoffice url.
    For each of the four input tables, an html rendering of the table and its
    number of rows are added to the template variables only if the table is not
    empty.

    Args:
        body_template (jinja2.environment.Template): template of the body
        previous_extraction_datetime_utc (datetime.datetime): passed as is to the
          template
        latest_extraction_datetime_utc (datetime.datetime): passed as is to the
          template
        missing_references (pd.DataFrame): table to render
        modified_regulations (pd.DataFrame): table to render
        dead_links (pd.DataFrame): table to render
        outdated_references (pd.DataFrame): table to render
        backoffice_regulation_url (str): passed as is to the template
        utcnow (datetime.datetime): datetime of the verification

    Returns:
        str: output of `body_template.render`
    """
    email_content = {
        "previous_extraction_datetime_utc": previous_extraction_datetime_utc,
        "latest_extraction_datetime_utc": latest_extraction_datetime_utc,
        "verification_date": utcnow.date().strftime("%d/%m/%Y"),
        "backoffice_regulation_url": backoffice_regulation_url,
    }

    # (template variable of the table, template variable of its row count, table)
    optional_sections = (
        ("missing_references", "n_missing_references", missing_references),
        ("dead_links", "n_dead_links", dead_links),
        ("modified_regulations", "n_modified_regulations", modified_regulations),
        ("outdated_references", "n_outdated_references", outdated_references),
    )

    for table_key, count_key, table in optional_sections:
        n_rows = len(table)
        if n_rows == 0:
            continue
        # Cell contents are not escaped: they are inserted as raw html.
        email_content[table_key] = table.to_html(
            index=False, justify="center", escape=False
        )
        email_content[count_key] = n_rows

    return body_template.render(email_content)
@task
[docs]
def render_main(
    main_template: jinja2.environment.Template, style: str, body: str
) -> str:
    """
    Renders `main_template` with the `style` and `body` template variables.

    Args:
        main_template (jinja2.environment.Template): main template of the email
        style (str): value of the `style` template variable
        body (str): value of the `body` template variable

    Returns:
        str: rendered template
    """
    rendered_email = main_template.render(body=body, style=style)
    return rendered_email
@task
[docs]
def get_recipients() -> List[str]:
    """
    Returns the list of email addresses the checkup email must be sent to, i.e.
    a single element list containing `CNSP_FRANCE_EMAIL_ADDRESS`.

    Returns:
        List[str]: recipients of the email

    Raises:
        AssertionError: if `CNSP_FRANCE_EMAIL_ADDRESS` is `None`
    """
    # Explicit check rather than an `assert` statement: assertions are removed
    # when Python runs with `-O`, which would let `[None]` through as recipients.
    # `AssertionError` is still the exception raised, so callers are unaffected.
    if CNSP_FRANCE_EMAIL_ADDRESS is None:
        error_message = "CNSP_FRANCE_EMAIL_ADDRESS environment variable is not set."
        logging.error(error_message)
        raise AssertionError(error_message)

    return [CNSP_FRANCE_EMAIL_ADDRESS]
@task
[docs]
def create_message(html: str, recipients: List[str]) -> EmailMessage:
    """
    Builds the checkup email with `create_html_email`, using a fixed subject.

    Args:
        html (str): html content of the email
        recipients (List[str]): passed as the `to` argument of
          `create_html_email`

    Returns:
        EmailMessage: the message returned by `create_html_email`
    """
    return create_html_email(
        to=recipients,
        subject="[Monitorfish] Suivi des modifications Legipêche dans Monitorfish",
        html=html,
    )
@task
[docs]
def send_message(msg: EmailMessage):
    """
    Hands `msg` over to `send_email`.

    Args:
        msg (EmailMessage): message to send
    """
    send_email(msg)
@flow(name="Monitorfish - Regulations checkup")
[docs]
def regulations_checkup_flow(
    proxies: dict = PROXIES,
    backoffice_regulation_url: str = BACKOFFICE_REGULATION_URL,
    get_utcnow_fn=get_utcnow,
    get_dead_links_fn: Callable = get_dead_links,
    send_message_fn: Callable = send_message,
):
    """
    Checks Monitorfish regulations against Legipeche data and emails the result.

    The flow extracts Monitorfish and Legipeche regulations, computes four
    tables (missing references, modified regulations, dead links, outdated
    references), renders them in an html email and sends it.

    Args:
        proxies (dict): proxies forwarded to `get_dead_links_fn`
        backoffice_regulation_url (str): url inserted in the email body
        get_utcnow_fn: callable returning the current datetime; called once,
          without argument
        get_dead_links_fn (Callable): callable used to detect dead links, called
          with the Monitorfish regulations, the unknown links and `proxies`
        send_message_fn (Callable): callable used to send the email, called with
          the message

        The three callables are injectable, presumably so that tests can
        substitute them - to be confirmed.

    Returns:
        The message created by `create_message`.
    """
    # Extract data
    raw_monitorfish_regulations = extract_monitorfish_regulations.submit()
    raw_legipeche_regulations = extract_legipeche_regulations.submit()
    utcnow = get_utcnow_fn()

    # Extract output templates
    main_template = get_main_template()
    body_template = get_body_template()
    style = get_style()

    # Transform data
    monitorfish_regulations = add_article_id(
        raw_monitorfish_regulations, url_column="url"
    )
    legipeche_regulations = add_article_id(
        raw_legipeche_regulations, url_column="page_url"
    )

    missing_references = get_missing_references(monitorfish_regulations)

    modified_regulations = transform_modified_regulations(
        get_modified_regulations(legipeche_regulations, monitorfish_regulations),
        monitorfish_regulations,
    )

    (
        previous_extraction_datetime_utc,
        latest_extraction_datetime_utc,
    ) = get_extraction_datetimes(legipeche_regulations)

    unknown_links = get_unknown_links(
        monitorfish_regulations=monitorfish_regulations,
        legipeche_regulations=legipeche_regulations,
    )
    dead_links = format_dead_links(
        get_dead_links_fn(monitorfish_regulations, unknown_links, proxies)
    )

    outdated_references = format_outdated_references(
        get_outdated_references(monitorfish_regulations, utcnow)
    )

    # Render email
    body = render_body(
        body_template=body_template,
        previous_extraction_datetime_utc=previous_extraction_datetime_utc,
        latest_extraction_datetime_utc=latest_extraction_datetime_utc,
        missing_references=missing_references,
        modified_regulations=modified_regulations,
        dead_links=dead_links,
        outdated_references=outdated_references,
        backoffice_regulation_url=backoffice_regulation_url,
        utcnow=utcnow,
    )
    html = render_main(main_template=main_template, style=style, body=body)

    # Send
    msg = create_message(html, get_recipients())
    send_message_fn(msg)
    return msg