Source code for pipeline.src.deployments

from dataclasses import dataclass, field
from typing import List

from prefect import Flow
from prefect.client.schemas.objects import (
    ConcurrencyLimitConfig,
    ConcurrencyLimitStrategy,
)
from prefect.runner.storage import LocalStorage
from prefect.schedules import Schedule

from config import (
    ERS_FILES_LOCATION,
    HOST_ENV_FILE_LOCATION,
    IS_INTEGRATION,
    LOGBOOK_FILES_GID,
    MAX_FISHING_SPEED_THRESHOLD,
    MIN_FISHING_SPEED_THRESHOLD,
    MINIMUM_CONSECUTIVE_POSITIONS,
    MINIMUM_MINUTES_OF_EMISSION_AT_SEA,
    PNO_TEST_MODE,
    PREFECT_API_URL,
    TEST_MODE,
    WEEKLY_CONTROL_REPORT_EMAIL_TEST_MODE,
)
from src.flows.activity_visualizations import activity_visualizations_flow
from src.flows.admin_areas import admin_areas_flow
from src.flows.anchorages import anchorages_flow
from src.flows.beacons import beacons_flow
from src.flows.control_anteriority import control_anteriority_flow
from src.flows.control_units import control_units_flow
from src.flows.controls import controls_flow
from src.flows.controls_open_data import controls_open_data_flow
from src.flows.current_segments import current_segments_flow
from src.flows.distribute_pnos import distribute_pnos_flow
from src.flows.districts import districts_flow
from src.flows.email_actions_to_units import email_actions_to_units_flow
from src.flows.enrich_logbook import enrich_logbook_flow
from src.flows.enrich_positions import enrich_positions_flow
from src.flows.facade_areas import facade_areas_flow
from src.flows.fao_areas import fao_areas_flow
from src.flows.fishing_gear_codes import fishing_gear_codes_flow
from src.flows.foreign_fmcs import foreign_fmcs_flow
from src.flows.infractions import infractions_flow
from src.flows.init_infraction_threat_characterization import (
    init_infraction_threat_characterization_flow,
)
from src.flows.init_pno_types import init_pno_types_flow
from src.flows.init_species_groups import init_species_groups_flow
from src.flows.last_positions import last_positions_flow
from src.flows.missing_dep_alerts import missing_dep_alerts_flow
from src.flows.missing_far_alerts import missing_far_alerts_flow
from src.flows.missing_trip_numbers import missing_trip_numbers_flow
from src.flows.missions import missions_flow
from src.flows.notify_beacon_malfunctions import notify_beacon_malfunctions_flow
from src.flows.ports import ports_flow
from src.flows.position_alert import position_alert_flow
from src.flows.position_alerts import position_alerts_flow
from src.flows.recompute_controls_segments import recompute_controls_segments_flow
from src.flows.refresh_materialized_view import refresh_materialized_view_flow
from src.flows.regulations import regulations_flow
from src.flows.regulations_checkup import regulations_checkup_flow
from src.flows.regulations_open_data import regulations_open_data_flow
from src.flows.risk_elements import risk_elements_flow
from src.flows.risk_factors import risk_factors_flow
from src.flows.sales_and_logbook import sales_and_logbook_flow
from src.flows.scrape_legipeche import scrape_legipeche_flow
from src.flows.species import species_flow
from src.flows.suspicions_of_under_declaration_alerts import (
    suspicions_of_under_declaration_alerts_flow,
)
from src.flows.trips_snapshot import trips_snapshot_flow
from src.flows.update_beacon_malfunctions import update_beacon_malfunctions_flow
from src.flows.validate_pending_alerts import validate_pending_alerts_flow
from src.flows.vessel_profiles import vessel_profiles_flow
from src.flows.vessels import vessels_flow

################################# List flows to deploy ################################


[docs] def default_concurrency_limit() -> ConcurrencyLimitConfig: return ConcurrencyLimitConfig( limit=1, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW )
@dataclass
[docs] class FlowAndSchedules:
[docs] flow: Flow
[docs] schedules: List[Schedule] = None
[docs] concurrency_limit: ConcurrencyLimitConfig = field( default_factory=default_concurrency_limit )
[docs] flows_to_deploy = [ FlowAndSchedules( flow=activity_visualizations_flow, schedules=[Schedule(cron="0 20 1 * *")] ), FlowAndSchedules( flow=admin_areas_flow, schedules=[Schedule(cron="43 4 * * 1,2,3,4,5")] ), FlowAndSchedules(flow=anchorages_flow), FlowAndSchedules( flow=beacons_flow, schedules=[Schedule(cron="4,14,24,34,44,54 * * * *")] ), FlowAndSchedules( flow=control_anteriority_flow, schedules=[Schedule(cron="5 * * * *")] ), FlowAndSchedules(flow=control_units_flow, schedules=[Schedule(cron="12 8 * * *")]), FlowAndSchedules(flow=controls_flow), FlowAndSchedules( flow=controls_open_data_flow, schedules=[Schedule(cron="15 3 * * 5")] ), FlowAndSchedules( flow=current_segments_flow, schedules=[Schedule(cron="2,22,42 * * * *")] ), FlowAndSchedules( flow=distribute_pnos_flow, schedules=[ Schedule( cron="* * * * *", parameters={ "test_mode": PNO_TEST_MODE, "is_integration": IS_INTEGRATION, "start_hours_ago": 120, "end_hours_ago": 0, }, ) ], ), FlowAndSchedules(flow=districts_flow), FlowAndSchedules( flow=email_actions_to_units_flow, schedules=[ Schedule( cron="0 5 * * 1", parameters={ "start_days_ago": 7, "end_days_ago": 1, "test_mode": WEEKLY_CONTROL_REPORT_EMAIL_TEST_MODE, "is_integration": IS_INTEGRATION, }, ) ], ), FlowAndSchedules( flow=enrich_logbook_flow, schedules=[ Schedule( cron="1,6,11,16,21,26,31,36,41,46,51,56 * * * *", parameters={ "start_hours_ago": 6, "end_hours_ago": 0, "minutes_per_chunk": 480, "recompute_all": False, }, ) ], ), FlowAndSchedules( flow=enrich_positions_flow, schedules=[ Schedule( cron="1-59 * * * *", parameters={ "start_hours_ago": 7, "end_hours_ago": 0, "minutes_per_chunk": 420, "chunk_overlap_minutes": 0, "minimum_consecutive_positions": MINIMUM_CONSECUTIVE_POSITIONS, "minimum_minutes_of_emission_at_sea": MINIMUM_MINUTES_OF_EMISSION_AT_SEA, "min_fishing_speed_threshold": MIN_FISHING_SPEED_THRESHOLD, "max_fishing_speed_threshold": MAX_FISHING_SPEED_THRESHOLD, "recompute_all": False, }, ), Schedule( cron="0 * * * *", parameters={ "start_hours_ago": 24, "end_hours_ago": 0, "minutes_per_chunk": 1440, "chunk_overlap_minutes": 0, "minimum_consecutive_positions": MINIMUM_CONSECUTIVE_POSITIONS, "minimum_minutes_of_emission_at_sea": MINIMUM_MINUTES_OF_EMISSION_AT_SEA, "min_fishing_speed_threshold": MIN_FISHING_SPEED_THRESHOLD, "max_fishing_speed_threshold": MAX_FISHING_SPEED_THRESHOLD, "recompute_all": True, }, ), ], ), FlowAndSchedules(flow=species_flow), FlowAndSchedules(flow=facade_areas_flow), FlowAndSchedules(flow=fao_areas_flow), FlowAndSchedules(flow=fishing_gear_codes_flow), FlowAndSchedules(flow=foreign_fmcs_flow, schedules=[Schedule(cron="37 10 * * *")]), FlowAndSchedules(flow=infractions_flow, schedules=[Schedule(cron="1 8 * * *")]), FlowAndSchedules(flow=init_infraction_threat_characterization_flow), FlowAndSchedules(flow=init_pno_types_flow), FlowAndSchedules(flow=init_species_groups_flow), FlowAndSchedules( flow=last_positions_flow, schedules=[ Schedule( cron=( "0,2,4,6,8,10,12,14,16,18,20,22,24,26,28,30," "32,34,36,38,40,42,44,46,48,50,52,54,56,58 " "* * * *" ), parameters={"minutes": 1440, "action": "update"}, ), ], ), FlowAndSchedules( flow=sales_and_logbook_flow, schedules=[Schedule(cron="0,5,10,15,20,25,30,35,40,45,50,55 * * * *")], ), FlowAndSchedules( flow=missing_dep_alerts_flow, schedules=[Schedule(cron="5,25,45 * * * *")] ), FlowAndSchedules( flow=missing_far_alerts_flow, schedules=[ Schedule( cron="45 6 * * *", parameters={ "alert_type": "MISSING_FAR_ALERT", "name": "FAR manquant en 24h", "states_iso2_to_monitor_everywhere": ["FR"], "states_iso2_to_monitor_in_french_eez": ["BE", "VE"], "max_share_of_vessels_with_missing_fars": 0.5, "minimum_length": 12.0, "only_raise_if_route_shows_fishing": True, "days_without_far": 1, }, ), Schedule( cron="55 6 * * *", parameters={ "alert_type": "MISSING_FAR_48_HOURS_ALERT", "name": "FAR manquant en 48h", "states_iso2_to_monitor_everywhere": ["FR"], "states_iso2_to_monitor_in_french_eez": ["BE", "VE"], "max_share_of_vessels_with_missing_fars": 0.5, "minimum_length": 12.0, "only_raise_if_route_shows_fishing": True, "days_without_far": 2, }, ), ], ), FlowAndSchedules( flow=missing_trip_numbers_flow, schedules=[Schedule(cron="4,14,24,34,44,54 * * * *")], ), FlowAndSchedules( flow=missions_flow, schedules=[ Schedule( cron="6 4 * * *", parameters={"number_of_months": 200}, ) ], ), FlowAndSchedules( flow=notify_beacon_malfunctions_flow, schedules=[ Schedule( cron=( "1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31," "33,35,37,39,41,43,45,47,49,51,53,55,57,59 " "* * * *" ), parameters={ "test_mode": TEST_MODE, "is_integration": IS_INTEGRATION, }, ), ], ), FlowAndSchedules(flow=ports_flow), FlowAndSchedules( flow=position_alert_flow, concurrency_limit=ConcurrencyLimitConfig( limit=3, collision_strategy=ConcurrencyLimitStrategy.ENQUEUE, ), ), FlowAndSchedules( flow=position_alerts_flow, schedules=[ Schedule( cron="1,11,21,31,41,51 * * * *", ) ], ), FlowAndSchedules(flow=recompute_controls_segments_flow), FlowAndSchedules( flow=refresh_materialized_view_flow, schedules=[ Schedule( cron="20 4 * * *", parameters={ "view_name": "analytics_controls_full_data", }, ), ], ), FlowAndSchedules( flow=regulations_checkup_flow, schedules=[Schedule(cron="5 6 * * 1,2,3,4,5")], ), FlowAndSchedules( flow=regulations_open_data_flow, schedules=[Schedule(cron="18 1 * * 5")], ), FlowAndSchedules( flow=regulations_flow, schedules=[Schedule(cron="2,32 * * * *")], ), FlowAndSchedules( flow=risk_elements_flow, schedules=[Schedule(cron="30 3 * * *")], ), FlowAndSchedules( flow=risk_factors_flow, schedules=[Schedule(cron="3,23,43 * * * *")], ), FlowAndSchedules( flow=scrape_legipeche_flow, schedules=[Schedule(cron="15 5 * * 1,2,3,4,5")], ), FlowAndSchedules( flow=suspicions_of_under_declaration_alerts_flow, schedules=[Schedule(cron="57 6 * * *")], ), FlowAndSchedules( flow=trips_snapshot_flow, schedules=[Schedule(cron="37 4 * * *")], ), FlowAndSchedules( flow=update_beacon_malfunctions_flow, schedules=[Schedule(cron="6,16,26,36,46,56 * * * *")], ), FlowAndSchedules( flow=validate_pending_alerts_flow, schedules=[ Schedule( cron="50 6 * * *", parameters={"alert_type": "MISSING_FAR_ALERT"}, ) ], ), FlowAndSchedules( flow=vessel_profiles_flow, schedules=[Schedule(cron="51 * * * *")], ), FlowAndSchedules( flow=vessels_flow, schedules=[Schedule(cron="2 2,5,8,11,14,20,23 * * *")], ), ]
[docs] deployments = []
for flow_to_deploy in flows_to_deploy: # Ensure flow name unicity among all projects orchestrated by Prefect 3 assert flow_to_deploy.flow.name[:14] == "Monitorfish - "
[docs] deployment = flow_to_deploy.flow.to_deployment( name=flow_to_deploy.flow.name, schedules=flow_to_deploy.schedules, concurrency_limit=flow_to_deploy.concurrency_limit, tags=["monitorfish"], )
deployment.job_variables = { "env": {"PREFECT_API_URL": PREFECT_API_URL}, "volumes": [ f"{HOST_ENV_FILE_LOCATION}:/home/monitorfish-pipeline/pipeline/.env" ], "auto_remove": True, "image_pull_policy": "IfNotPresent", } deployment.work_pool_name = "monitorfish" deployment.storage = LocalStorage("/home/monitorfish-pipeline/pipeline") if deployment.name == "Monitorfish - Sales and Logbook": deployment.job_variables["container_create_kwargs"] = { "group_add": [LOGBOOK_FILES_GID] } deployment.job_variables["volumes"].append( f"{ERS_FILES_LOCATION}:{ERS_FILES_LOCATION}" ) deployments.append(deployment)