Source code for pipeline.src.flows.suspicions_of_under_declaration_alerts

from prefect import flow, task

from src.entities.alerts import AlertType
from src.generic_tasks import extract
from src.shared_tasks.alerts import (
    extract_active_reportings,
    extract_silenced_alerts,
    filter_alerts,
    load_alerts,
    make_alerts,
)


@task
[docs] def extract_suspicions_of_under_declaration(): return extract( db_name="monitorfish_remote", query_filepath="monitorfish/suspicions_of_under_declaration.sql", )
@flow(name="Monitorfish - Suspicions of under-declaration")
[docs] def suspicions_of_under_declaration_alerts_flow(): vessels_with_suspicions_of_under_declaration = ( extract_suspicions_of_under_declaration.submit() ) silenced_alerts = extract_silenced_alerts.submit( AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value, # 8 days, to cover the date range analyzed in # `extract_suspicions_of_under_declaration` number_of_hours=192, ) active_reportings = extract_active_reportings.submit( AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value ) alerts = make_alerts( vessels_with_suspicions_of_under_declaration, AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value, "Suspicion de sous-déclaration", natinf_code=27689, threat="Obligations déclaratives", threat_characterization="FAR (JPE)", ) filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings) # Load load_alerts( filtered_alerts, alert_config_name=AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value, )