Source code for pipeline.src.flows.foreign_fmcs

import pandas as pd
from prefect import flow, get_run_logger, task

from config import CNSP_SIP_DEPARTMENT_EMAIL
from src.generic_tasks import extract, load


@task
def extract_foreign_fmcs_contacts() -> pd.DataFrame:
    """
    Pull the raw foreign FMC contacts out of Poseidon.

    The ``fmc/foreign_fmcs_contacts.sql`` query is run via the shared
    ``extract`` helper, with ``"fmc"`` as its first argument.

    Returns:
        pd.DataFrame: one row per contact, as returned by the query.
    """
    query_filepath = "fmc/foreign_fmcs_contacts.sql"
    return extract("fmc", query_filepath)
@task
def transform_foreign_fmcs_contacts(
    foreign_fmcs_contacts: pd.DataFrame,
) -> pd.DataFrame:
    """
    Build one row per country, with that country's contact emails gathered
    into an array. Any ``CNSP_SIP_DEPARTMENT_EMAIL`` entry is excluded.

    Args:
        foreign_fmcs_contacts (pd.DataFrame): contacts extracted from Poseidon,
          with ``country_code_iso3``, ``country_name`` and ``email_address``
          columns.

    Returns:
        pd.DataFrame: columns ``country_code_iso3``, ``country_name`` and
        ``email_addresses`` (array of unique emails). Countries that have no
        remaining email get a missing value, because of the left merge.
    """
    # Country reference: the first row seen for each ISO3 code. This is
    # computed before the email filtering so that countries whose only
    # contact is the CNSP address (or a null email) are still listed.
    countries = (
        foreign_fmcs_contacts.groupby("country_code_iso3")[
            ["country_code_iso3", "country_name"]
        ]
        .head(1)
        .reset_index(drop=True)
    )

    is_not_cnsp_email = (
        foreign_fmcs_contacts["email_address"] != CNSP_SIP_DEPARTMENT_EMAIL
    )
    usable_contacts = foreign_fmcs_contacts[is_not_cnsp_email].dropna(
        subset=["email_address"]
    )

    # Unique emails per country, kept in order of first appearance.
    emails_by_country = (
        usable_contacts.groupby("country_code_iso3")["email_address"]
        .unique()
        .rename("email_addresses")
        .reset_index()
    )

    return countries.merge(emails_by_country, on="country_code_iso3", how="left")
@task
def load_foreign_fmcs(foreign_fmcs: pd.DataFrame) -> None:
    """
    Load the transformed foreign FMCs into ``public.foreign_fmcs`` on the
    ``monitorfish_remote`` database.

    The call uses ``how="replace"`` with ``replace_with_truncate=True``. This
    presumably empties the existing table rather than dropping it; confirm
    in ``src.generic_tasks.load``. ``email_addresses`` is declared as a
    Postgres array column, so it is presumably converted to a PG array
    during the load.

    Args:
        foreign_fmcs (pd.DataFrame): output of
          ``transform_foreign_fmcs_contacts``.
    """
    load(
        foreign_fmcs,
        table_name="foreign_fmcs",
        schema="public",
        db_name="monitorfish_remote",
        logger=get_run_logger(),
        how="replace",
        replace_with_truncate=True,
        pg_array_columns=["email_addresses"],
    )
@flow(name="Monitorfish - Foreign FMCs")
def foreign_fmcs_flow(extract_foreign_fmcs_contacts_task=extract_foreign_fmcs_contacts):
    """
    Extract, transform and load the foreign FMC contacts.

    Args:
        extract_foreign_fmcs_contacts_task: task called to obtain the raw
          contacts. Defaults to ``extract_foreign_fmcs_contacts``. It is
          presumably a parameter so that a stub extractor can be injected,
          e.g. in tests.
    """
    raw_contacts = extract_foreign_fmcs_contacts_task()
    fmcs = transform_foreign_fmcs_contacts(raw_contacts)
    load_foreign_fmcs(fmcs)