Source code for pipeline.src.flows.regulations_checkup

import datetime
import logging
import os
import re
from email.message import EmailMessage
from typing import Callable, Iterable, List, Tuple

import jinja2
import pandas as pd
import pytz
import requests
from prefect import flow, get_run_logger, task

from config import (
    BACKOFFICE_REGULATION_URL,
    CNSP_FRANCE_EMAIL_ADDRESS,
    EMAIL_STYLESHEETS_LOCATION,
    EMAIL_TEMPLATES_LOCATION,
    PROXIES,
)
from src.generic_tasks import extract
from src.helpers.emails import create_html_email, send_email
from src.processing import get_matched_groups, try_get_factory
from src.shared_tasks.dates import get_utcnow

####################################### Helpers #######################################






################################### Tasks and flows ###################################


@task
def extract_monitorfish_regulations() -> pd.DataFrame:
    """
    Extracts regulatory references from the monitorfish `regulations` table.

    The result of the `monitorfish/regulations_references.sql` query is exploded
    on its `regulatory_references` column, so the output has one line per
    regulatory reference: a regulated zone with several references yields
    several lines.

    Three columns are derived from each regulatory reference, then the
    `regulatory_references` column itself is dropped:

      - `url`: looked up with `try_get_factory("url", error_value=None)`
      - `reference`: looked up with `try_get_factory("reference", error_value=None)`
      - `end_date`: looked up under the `endDate` key, then parsed:
        `"infinite"` becomes `datetime.datetime(9999, 12, 31)`, ISO strings are
        parsed and stripped of their timezone info, values that cannot be
        parsed are logged and replaced by `None`, and missing values are left
        untouched (`na_action="ignore"`).

    The remaining columns are whatever the query returns (per the original
    documentation: `law_type`, `topic` and `zone` — confirm against the SQL
    file).

    Returns:
        pd.DataFrame: DataFrame of regulatory references
    """
    logger = get_run_logger()

    def parse_datetime_string(s):
        # "infinite" is mapped to the largest representable date.
        if s == "infinite":
            return datetime.datetime(9999, 12, 31)
        try:
            return datetime.datetime.fromisoformat(s).replace(tzinfo=None)
        except (ValueError, TypeError) as error:
            logger.error(error)
            logger.error(f"Could not parse datetime string {s}, returning `None`")
            return None

    regulations = extract(
        "monitorfish_remote",
        query_filepath="monitorfish/regulations_references.sql",
    )
    regulations = regulations.explode("regulatory_references", ignore_index=True)

    references = regulations.regulatory_references
    regulations["url"] = references.map(try_get_factory("url", error_value=None))
    regulations["reference"] = references.map(
        try_get_factory("reference", error_value=None)
    )

    raw_end_dates = references.map(try_get_factory("endDate", error_value=None))
    regulations["end_date"] = raw_end_dates.map(
        parse_datetime_string, na_action="ignore"
    )

    return regulations.drop(columns=["regulatory_references"])
@task
def extract_legipeche_regulations() -> pd.DataFrame:
    """
    Extracts Legipeche regulations by running the `monitorfish/legipeche.sql`
    query against the `monitorfish_remote` database.

    Per the original documentation (not verifiable from this function alone),
    the queried table is filled by the `Scrape Legipeche` flow and the result
    has one line per document, with columns `extraction_datetime_utc`,
    `extraction_occurence`, `page_title`, `page_url`, `document_title` and
    `document_url`.

    Returns:
        pd.DataFrame: DataFrame of Legipeche regulations.
    """
    legipeche_regulations = extract(
        "monitorfish_remote",
        query_filepath="monitorfish/legipeche.sql",
    )
    return legipeche_regulations
@task
def add_article_id(regulations: pd.DataFrame, url_column: str) -> pd.DataFrame:
    """
    Appends the columns extracted from `url_column` by the Legipeche URL regex
    (its only named group is `article_id`) to the `regulations` DataFrame.

    The extraction itself is delegated to `get_matched_groups`, applied to each
    value of `url_column`. Rows whose URL does not follow the Legipeche URL
    schema are expected to get an `article_id` of `None` (behavior of
    `get_matched_groups` — confirm there).

    Args:
        regulations (pd.DataFrame): DataFrame of regulations
        url_column (str): Name of the column containing URLs of regulation pages

    Returns:
        pd.DataFrame: new DataFrame made of the input columns followed by the
        extracted column(s); the input is not modified
    """
    legipeche_regex = re.compile(
        r"^https?://legipeche\.metier\."
        r"e2\.rie\.gouv\.fr/"
        r"(?:[a-zA-Z0-9-]*)"
        r"-a(?P<article_id>\d+)"
        r"\.html"
        r".*$"
    )

    matched_groups = regulations[url_column].apply(
        get_matched_groups, regex=legipeche_regex
    )
    return pd.concat([regulations, matched_groups], axis=1)
@task
def get_extraction_datetimes(legipeche_regulations: pd.DataFrame) -> Tuple[str, str]:
    """
    Returns the extraction datetimes of the `previous` and `latest` Legipeche
    extraction occurences, formatted as Europe/Paris strings.

    For each occurence, the `extraction_datetime_utc` of the first matching row
    is taken, interpreted as a naive UTC datetime, converted to Europe/Paris and
    formatted as "%d/%m/%Y %H:%M".

    Args:
        legipeche_regulations (pd.DataFrame): DataFrame of legipeche extractions.
          Must have `extraction_occurence` and `extraction_datetime_utc` columns.

    Returns:
        Tuple[str, str]: formatted extraction datetimes of the `previous` and
        `latest` legipeche extractions, in that order

    Raises:
        IndexError: if one of the two occurences has no row in the input.
    """

    def first_extraction_datetime(occurence: str):
        # First `extraction_datetime_utc` found for the given occurence.
        is_occurence = legipeche_regulations.extraction_occurence == occurence
        return legipeche_regulations.loc[
            is_occurence, "extraction_datetime_utc"
        ].iloc[0]

    def to_paris_datetime_string(naive_dt_utc: datetime.datetime) -> str:
        # Tag the naive value as UTC, shift it to Paris time, then format it.
        aware_dt_utc = pytz.UTC.localize(naive_dt_utc)
        paris_dt = aware_dt_utc.astimezone(pytz.timezone("Europe/Paris"))
        return paris_dt.strftime("%d/%m/%Y %H:%M")

    previous_dt = first_extraction_datetime("previous")
    latest_dt = first_extraction_datetime("latest")

    return to_paris_datetime_string(previous_dt), to_paris_datetime_string(latest_dt)
@task
def get_modified_regulations(
    legipeche_regulations: pd.DataFrame, monitorfish_regulations: pd.DataFrame
) -> pd.DataFrame:
    """
    Filters `legipeche_regulations` down to the documents that were added to or
    removed from a Legipeche page that is referenced in Monitorfish.

    A row is kept when all of the following hold:

      - its `document_url` is present in only one of the `previous` and
        `latest` extraction occurences (document added or removed)
      - its `article_id` is present in both occurences (the page itself was
        neither created nor deleted)
      - its `article_id` is one of the non-null `article_id` values of
        `monitorfish_regulations`

    Args:
        legipeche_regulations (pd.DataFrame): Legipeche documents. Must have
          `extraction_occurence`, `article_id` and `document_url` columns.
        monitorfish_regulations (pd.DataFrame): Monitorfish regulatory
          references. Must have an `article_id` column.

    Returns:
        pd.DataFrame: filtered DataFrame of Legipeche regulations, with a fresh
        index
    """
    is_latest = legipeche_regulations.extraction_occurence == "latest"
    is_previous = legipeche_regulations.extraction_occurence == "previous"

    article_ids = legipeche_regulations.article_id
    document_urls = legipeche_regulations.document_url

    # Pages present in both extractions.
    stable_article_ids = set(article_ids[is_latest]) & set(article_ids[is_previous])

    # Documents present in exactly one of the two extractions.
    modified_document_urls = set(document_urls[is_latest]) ^ set(
        document_urls[is_previous]
    )

    referenced_article_ids = set(monitorfish_regulations.article_id.dropna())

    keep = (
        document_urls.isin(modified_document_urls)
        & article_ids.isin(stable_article_ids)
        & article_ids.isin(referenced_article_ids)
    )
    return legipeche_regulations[keep].reset_index(drop=True)
@task
def transform_modified_regulations(
    modified_regulations: pd.DataFrame, monitorfish_regulations: pd.DataFrame
) -> pd.DataFrame:
    """
    Formats `modified_regulations` into a DataFrame suitable for printing in an
    email.

    The two inputs are merged on `article_id` (default `pd.merge` join), each
    row is labelled as a document addition (`extraction_occurence` equal to
    'latest') or a document removal (any other value), and the regulatory
    reference and the document are turned into HTML hyperlinks with
    `make_html_hyperlinks`. The output is sorted on all its columns.

    Args:
        modified_regulations (pd.DataFrame): DataFrame with columns :

          - `article_id`
          - `extraction_occurence`, having values 'previous' and 'latest'
          - `document_title`
          - `document_url`

        monitorfish_regulations (pd.DataFrame): DataFrame with columns :

          - `article_id`
          - `url` (url of the regulatory reference in Monitorfish)
          - `reference` (name of the regulatory reference in Monitorfish)
          - `law_type`
          - `topic`
          - `zone`

    Returns:
        pd.DataFrame: formatted DataFrame of regulation modifications, with
        columns `Type de réglementation`, `Thématique`, `Zone`,
        `Référence réglementaire`, `Modification` and `Document`
    """
    logger = get_run_logger()

    output_columns = [
        "law_type",
        "topic",
        "zone",
        "Référence réglementaire",
        "Modification",
        "Document",
    ]
    french_labels = {
        "law_type": "Type de réglementation",
        "topic": "Thématique",
        "zone": "Zone",
    }

    def describe_modification(occurence):
        # Anything that is not the latest extraction counts as a removal.
        if occurence == "latest":
            return "Ajout de document"
        return "Suppression de document"

    merged = pd.merge(monitorfish_regulations, modified_regulations, on="article_id")
    merged["Modification"] = merged.extraction_occurence.map(describe_modification)
    merged["Référence réglementaire"] = make_html_hyperlinks(
        merged.url, merged.reference, logger=logger
    )
    merged["Document"] = make_html_hyperlinks(
        merged.document_url, merged.document_title, logger=logger
    )

    formatted = merged[output_columns].sort_values(output_columns)
    formatted = formatted.rename(columns=french_labels)
    return formatted.reset_index(drop=True)
@task
def get_missing_references(monitorfish_regulations: pd.DataFrame) -> pd.DataFrame:
    """
    Returns the regulated zones of `monitorfish_regulations` whose `reference`
    is null, formatted for printing.

    Only the `law_type`, `topic` and `zone` columns are kept. Rows are sorted on
    these three columns, which are then renamed to their French labels.

    Args:
        monitorfish_regulations (pd.DataFrame): monitorfish_regulations. Must
          have columns :

          - `reference`
          - `law_type`
          - `topic`
          - `zone`

    Returns:
        pd.DataFrame: Filtered and formatted copy of the input, with columns
        `Type de réglementation`, `Thématique` and `Zone`.
    """
    zone_columns = ["law_type", "topic", "zone"]
    has_no_reference = monitorfish_regulations.reference.isna()

    missing = monitorfish_regulations.loc[has_no_reference, zone_columns]
    missing = missing.copy(deep=True)
    missing = missing.sort_values(zone_columns)
    missing = missing.rename(
        columns={
            "law_type": "Type de réglementation",
            "topic": "Thématique",
            "zone": "Zone",
        }
    )
    return missing.reset_index(drop=True)
@task @task @task @task
def get_outdated_references(
    monitorfish_regulations: pd.DataFrame, now: datetime.datetime
) -> pd.DataFrame:
    """
    Returns the rows of `monitorfish_regulations` whose `end_date` is strictly
    before `now`.

    Args:
        monitorfish_regulations (pd.DataFrame): DataFrame of Monitorfish
          regulations. Must have at least an `end_date` column.
        now (datetime.datetime): datetime the `end_date` values are compared to

    Returns:
        pd.DataFrame: Subset of `monitorfish_regulations`, with a fresh index
    """
    is_outdated = monitorfish_regulations.end_date < now
    outdated = monitorfish_regulations[is_outdated]
    return outdated.reset_index(drop=True)
@task
def format_outdated_references(outdated_references: pd.DataFrame) -> pd.DataFrame:
    """
    Formats outdated regulatory references for printing in an email.

    Rows are sorted on `law_type`, `topic` and `zone`, the regulatory reference
    is turned into an HTML hyperlink (from `url` and `reference`) with
    `make_html_hyperlinks`, and columns are renamed to their French labels.

    Args:
        outdated_references (pd.DataFrame): DataFrame with columns `law_type`,
          `topic`, `zone`, `url`, `reference` and `end_date`.

    Returns:
        pd.DataFrame: new DataFrame with columns `Type de réglementation`,
        `Thématique`, `Zone`, `Référence réglementaire` and
        `Date de fin de validité`.
    """
    logger = get_run_logger()

    french_labels = {
        "law_type": "Type de réglementation",
        "topic": "Thématique",
        "zone": "Zone",
        "end_date": "Date de fin de validité",
        "ref": "Référence réglementaire",
    }
    output_columns = [
        "Type de réglementation",
        "Thématique",
        "Zone",
        "Référence réglementaire",
        "Date de fin de validité",
    ]

    ordered = outdated_references.sort_values(["law_type", "topic", "zone"])
    # Hyperlinks are built from the sorted rows so they stay aligned with them.
    ordered = ordered.assign(
        ref=make_html_hyperlinks(ordered.url, ordered.reference, logger=logger)
    )
    formatted = ordered.rename(columns=french_labels)[output_columns]
    return formatted.reset_index(drop=True).copy(deep=True)
@task
def get_main_template() -> jinja2.environment.Template:
    """
    Loads the main Jinja template of the regulations checkup email.

    The file is read from `regulations_checkup/main.jinja` under
    `EMAIL_TEMPLATES_LOCATION`. It is decoded explicitly as UTF-8 so that the
    result does not depend on the locale of the machine running the flow.

    Returns:
        jinja2.environment.Template: template built from the file content
    """
    template_path = EMAIL_TEMPLATES_LOCATION / "regulations_checkup/main.jinja"
    with open(template_path, "r", encoding="utf-8") as f:
        return jinja2.Template(f.read())
@task
def get_body_template() -> jinja2.environment.Template:
    """
    Loads the body Jinja template of the regulations checkup email.

    The file is read from `regulations_checkup/body.jinja` under
    `EMAIL_TEMPLATES_LOCATION`. It is decoded explicitly as UTF-8 so that the
    result does not depend on the locale of the machine running the flow.

    Returns:
        jinja2.environment.Template: template built from the file content
    """
    template_path = EMAIL_TEMPLATES_LOCATION / "regulations_checkup/body.jinja"
    with open(template_path, "r", encoding="utf-8") as f:
        return jinja2.Template(f.read())
@task
def get_style() -> str:
    """
    Loads the stylesheet used in the regulations checkup email.

    The file is read from `splendid.css` under `EMAIL_STYLESHEETS_LOCATION`. It
    is decoded explicitly as UTF-8 so that the result does not depend on the
    locale of the machine running the flow.

    Returns:
        str: content of the stylesheet
    """
    with open(EMAIL_STYLESHEETS_LOCATION / "splendid.css", encoding="utf-8") as f:
        return f.read()
@task
def render_body(
    body_template: jinja2.environment.Template,
    previous_extraction_datetime_utc: datetime.datetime,
    latest_extraction_datetime_utc: datetime.datetime,
    missing_references: pd.DataFrame,
    modified_regulations: pd.DataFrame,
    dead_links: pd.DataFrame,
    outdated_references: pd.DataFrame,
    backoffice_regulation_url: str,
    utcnow: datetime.datetime,
) -> str:
    """
    Renders email body as html string.

    The template always receives `previous_extraction_datetime_utc`,
    `latest_extraction_datetime_utc` (both passed through unchanged),
    `verification_date` (date of `utcnow` formatted as "%d/%m/%Y") and
    `backoffice_regulation_url`.

    For each of the four input tables that is not empty, the template also
    receives the table rendered as HTML under the name of the argument
    (e.g. `dead_links`) and its number of rows under that name prefixed with
    `n_` (e.g. `n_dead_links`). Empty tables add nothing to the context.
    Cell contents are not HTML-escaped, so that hyperlinks stored in the cells
    are rendered as links.

    Args:
        body_template (jinja2.environment.Template): template of the email body
        previous_extraction_datetime_utc (datetime.datetime): previous
          Legipeche extraction datetime
        latest_extraction_datetime_utc (datetime.datetime): latest Legipeche
          extraction datetime
        missing_references (pd.DataFrame): regulated zones without reference
        modified_regulations (pd.DataFrame): modified Legipeche documents
        dead_links (pd.DataFrame): dead links
        outdated_references (pd.DataFrame): outdated regulatory references
        backoffice_regulation_url (str): URL handed over to the template
        utcnow (datetime.datetime): datetime of the verification

    Returns:
        str: rendered email body
    """
    email_content = {
        "previous_extraction_datetime_utc": previous_extraction_datetime_utc,
        "latest_extraction_datetime_utc": latest_extraction_datetime_utc,
        "verification_date": utcnow.date().strftime("%d/%m/%Y"),
        "backoffice_regulation_url": backoffice_regulation_url,
    }

    sections = (
        ("missing_references", missing_references),
        ("dead_links", dead_links),
        ("modified_regulations", modified_regulations),
        ("outdated_references", outdated_references),
    )
    for section_name, table in sections:
        n_rows = len(table)
        if n_rows == 0:
            continue
        email_content[section_name] = table.to_html(
            index=False, justify="center", escape=False
        )
        email_content[f"n_{section_name}"] = n_rows

    return body_template.render(email_content)
@task
def render_main(
    main_template: jinja2.environment.Template, style: str, body: str
) -> str:
    """
    Renders the whole email as html string.

    Args:
        main_template (jinja2.environment.Template): main template of the email
        style (str): stylesheet, passed to the template as `style`
        body (str): rendered email body, passed to the template as `body`

    Returns:
        str: rendered email
    """
    html = main_template.render(style=style, body=body)
    return html
@task
def get_recipients() -> List[str]:
    """
    Returns the list of recipients of the regulations checkup email, which
    contains only `CNSP_FRANCE_EMAIL_ADDRESS`.

    The check is done with an explicit test rather than an `assert` statement:
    `assert` statements are removed when Python runs with `-O`, in which case a
    missing address would silently produce `[None]`.

    Returns:
        List[str]: email addresses of the recipients

    Raises:
        AssertionError: if `CNSP_FRANCE_EMAIL_ADDRESS` is `None`. The exception
          type is kept for callers that already handle it.
    """
    if CNSP_FRANCE_EMAIL_ADDRESS is None:
        error_message = "CNSP_FRANCE_EMAIL_ADDRESS environment variable is not set."
        logging.error(error_message)
        raise AssertionError(error_message)

    return [CNSP_FRANCE_EMAIL_ADDRESS]
@task
def create_message(html: str, recipients: List[str]) -> EmailMessage:
    """
    Builds the regulations checkup email with `create_html_email`.

    Args:
        html (str): html content of the email
        recipients (List[str]): email addresses the message is addressed to

    Returns:
        EmailMessage: message ready to be sent
    """
    return create_html_email(
        to=recipients,
        subject="[Monitorfish] Suivi des modifications Legipêche dans Monitorfish",
        html=html,
    )
@task
def send_message(msg: EmailMessage):
    """
    Sends `msg` with `send_email`.

    Args:
        msg (EmailMessage): message to send
    """
    send_email(msg)
@flow(name="Monitorfish - Regulations checkup")
def regulations_checkup_flow(
    proxies: dict = PROXIES,
    backoffice_regulation_url: str = BACKOFFICE_REGULATION_URL,
    get_utcnow_fn=get_utcnow,
    get_dead_links_fn: Callable = get_dead_links,
    send_message_fn: Callable = send_message,
):
    """
    Checks Monitorfish regulatory references and emails a report.

    The report lists regulated zones without a regulatory reference, documents
    added to or removed from referenced Legipeche pages between the previous
    and the latest Legipeche extraction, dead links, and regulatory references
    whose end date has passed.

    Args:
        proxies (dict): passed as third argument to `get_dead_links_fn`
        backoffice_regulation_url (str): URL handed over to the email body
          template
        get_utcnow_fn: callable returning the datetime of the verification
        get_dead_links_fn (Callable): callable invoked with the Monitorfish
          regulations, the unknown links and `proxies`; its result is formatted
          with `format_dead_links`
        send_message_fn (Callable): callable invoked with the email message

    Returns:
        The message returned by `create_message`.
    """
    # Extract data
    monitorfish_extraction = extract_monitorfish_regulations.submit()
    legipeche_extraction = extract_legipeche_regulations.submit()
    utcnow = get_utcnow_fn()

    # Extract output templates
    main_template = get_main_template()
    body_template = get_body_template()
    style = get_style()

    # Transform data
    monitorfish_regulations = add_article_id(monitorfish_extraction, url_column="url")
    legipeche_regulations = add_article_id(legipeche_extraction, url_column="page_url")

    missing_references = get_missing_references(monitorfish_regulations)

    modified_documents = get_modified_regulations(
        legipeche_regulations, monitorfish_regulations
    )
    modified_regulations = transform_modified_regulations(
        modified_documents, monitorfish_regulations
    )

    (
        previous_extraction_datetime_utc,
        latest_extraction_datetime_utc,
    ) = get_extraction_datetimes(legipeche_regulations)

    unknown_links = get_unknown_links(
        monitorfish_regulations=monitorfish_regulations,
        legipeche_regulations=legipeche_regulations,
    )
    raw_dead_links = get_dead_links_fn(monitorfish_regulations, unknown_links, proxies)
    dead_links = format_dead_links(raw_dead_links)

    raw_outdated_references = get_outdated_references(monitorfish_regulations, utcnow)
    outdated_references = format_outdated_references(raw_outdated_references)

    # Render email
    body = render_body(
        body_template=body_template,
        previous_extraction_datetime_utc=previous_extraction_datetime_utc,
        latest_extraction_datetime_utc=latest_extraction_datetime_utc,
        missing_references=missing_references,
        modified_regulations=modified_regulations,
        dead_links=dead_links,
        outdated_references=outdated_references,
        backoffice_regulation_url=backoffice_regulation_url,
        utcnow=utcnow,
    )
    html = render_main(
        main_template=main_template,
        style=style,
        body=body,
    )

    # Send
    recipients = get_recipients()
    msg = create_message(html, recipients)
    send_message_fn(msg)

    return msg