from datetime import datetime
from typing import List
import pandas as pd
from prefect import flow, task
from prefect.deployments import run_deployment
from src.entities.alerts import PositionAlertSpecification
from src.flows.position_alert import position_alert_flow
from src.generic_tasks import extract
from src.helpers.alerts import position_alert_specification_must_run_now
from src.shared_tasks.dates import get_utcnow
@task
@task
[docs]
def to_position_alert_specifications(
position_alerts: pd.DataFrame,
) -> List[PositionAlertSpecification]:
return [
PositionAlertSpecification(**position_alert)
for position_alert in position_alerts.to_dict(orient="records")
]
@task
[docs]
def get_alerts_that_must_run_now(
alert_specifications: List[PositionAlertSpecification],
now: datetime,
) -> List[PositionAlertSpecification]:
return [
a
for a in alert_specifications
if position_alert_specification_must_run_now(alert_spec=a, now=now)
]
@flow(name="Monitorfish - Position alerts")
[docs]
def position_alerts_flow():
positions_alerts = extract_position_alerts()
now = get_utcnow()
position_alert_specifications = to_position_alert_specifications(positions_alerts)
position_alert_specifications_to_run = get_alerts_that_must_run_now(
alert_specifications=position_alert_specifications, now=now
)
for position_alert_specification in position_alert_specifications_to_run:
run_deployment(
name=f"{position_alert_flow.name}/{position_alert_flow.name}",
parameters=dict(
position_alert_id=position_alert_specification.id,
name=position_alert_specification.name,
description=position_alert_specification.description,
natinf_code=position_alert_specification.natinf_code,
threat=position_alert_specification.threat,
threat_characterization=position_alert_specification.threat_characterization,
track_analysis_depth=position_alert_specification.track_analysis_depth,
only_fishing_positions=position_alert_specification.only_fishing_positions,
gears=position_alert_specification.gears,
species=position_alert_specification.species,
species_catch_areas=position_alert_specification.species_catch_areas,
administrative_areas=position_alert_specification.administrative_areas,
regulatory_areas=position_alert_specification.regulatory_areas,
min_depth=position_alert_specification.min_depth,
flag_states_iso2=position_alert_specification.flag_states_iso2,
vessel_ids=position_alert_specification.vessel_ids,
district_codes=position_alert_specification.district_codes,
producer_organizations=position_alert_specification.producer_organizations,
),
timeout=0,
)