from datetime import datetime, timedelta
from typing import List, Tuple
import duckdb
import pandas as pd
from geoalchemy2.functions import ST_Intersects
from prefect import flow, task
from sqlalchemy import Table, and_, not_, or_, select
from sqlalchemy.sql import Select
from src.entities.alerts import (
AdminAreasSpecification,
AdminAreasSpecWithTable,
AdministrativeAreaType,
AreasTableMetadata,
GearSpecification,
RegulatoryAreaSpecification,
SpeciesSpecification,
)
from src.generic_tasks import extract, read_query_task
from src.processing import join_on_multiple_keys
from src.shared_tasks.alerts import (
extract_silenced_alerts,
filter_alerts,
load_alerts,
make_alerts,
)
from src.shared_tasks.dates import get_utcnow
from src.shared_tasks.infrastructure import get_table
from src.shared_tasks.positions import add_depth, add_vessel_identifier
from src.shared_tasks.risk_factors import extract_current_risk_factors
from src.shared_tasks.vessels import add_vessel_id, add_vessels_columns
@task
@task
[docs]
def to_admin_areas_spec_with_table(
spec: AdminAreasSpecification, table_metadata: AreasTableMetadata, table: Table
) -> AdminAreasSpecWithTable:
return AdminAreasSpecWithTable(
area_type=spec.areaType,
areas=spec.areas,
metadata=table_metadata,
table=table,
)
[docs]
def to_regulatory_area_filter(
spec: RegulatoryAreaSpecification, regulations_table: Table
):
filters = []
if spec.lawType:
filters.append(regulations_table.c.law_type == spec.lawType)
if spec.topic:
filters.append(regulations_table.c.topic == spec.topic)
if spec.zone:
filters.append(regulations_table.c.zone == spec.zone)
if len(filters) == 0:
raise ValueError("Cannot set regulatory area filter without any criterion")
return and_(*filters)
@task
[docs]
def make_vessels_query(
*,
vessels_table: Table,
prod_org_memberships_table: Table | None,
vessel_ids: list | None,
district_codes: list | None,
producer_organizations: list | None,
) -> Select:
from_table = vessels_table
filter_conditions = []
if vessel_ids:
filter_conditions.append(vessels_table.c.id.in_(vessel_ids))
if district_codes:
filter_conditions.append(vessels_table.c.district_code.in_(district_codes))
if producer_organizations:
assert isinstance(prod_org_memberships_table, Table)
from_table = from_table.join(
prod_org_memberships_table,
from_table.c.cfr == prod_org_memberships_table.c.internal_reference_number,
)
filter_conditions.append(
prod_org_memberships_table.c.organization_name.in_(producer_organizations)
)
return (
select(
vessels_table.c.id,
vessels_table.c.cfr,
vessels_table.c.external_immatriculation,
vessels_table.c.ircs,
)
.select_from(from_table)
.where(and_(*filter_conditions))
)
@task
[docs]
def get_sets_of_identifiers(vessels: pd.DataFrame) -> Tuple[set, set, set]:
cfrs = set(vessels.cfr.dropna())
external_immatriculations = set(
vessels.loc[
(vessels.cfr.isna())
& (vessels.external_immatriculation.notnull())
& (vessels.external_immatriculation != "-"),
"external_immatriculation",
]
)
ircss = set(
vessels.loc[
(vessels.cfr.isna())
& (
(vessels.external_immatriculation.isna())
| (vessels.external_immatriculation == "-")
)
& (vessels.ircs.notnull()),
"ircs",
]
)
return (
cfrs,
external_immatriculations,
ircss,
)
@task
[docs]
def merge_sets_of_identifiers(
cfrs_with_species_min_weight: set | None,
cfrs_with_gears: set | None,
vessels_cfrs: set | None,
vessels_external_immats: set | None,
vessels_ircss: set | None,
) -> Tuple[set | None, set | None, set | None]:
external_immats = vessels_external_immats
ircss = vessels_ircss
# When a species or a gear condition is given, fitlering on an identifier other
# than CFR is excluded.
if cfrs_with_species_min_weight is not None or cfrs_with_gears is not None:
external_immats = None
ircss = None
cfr_sets = []
if vessels_cfrs is not None:
cfr_sets.append(vessels_cfrs)
if cfrs_with_species_min_weight is not None:
cfr_sets.append(cfrs_with_species_min_weight)
if cfrs_with_gears is not None:
cfr_sets.append(cfrs_with_gears)
if len(cfr_sets) == 0:
cfrs = None
else:
cfrs = set.intersection(*cfr_sets)
return cfrs, external_immats, ircss
@task
[docs]
def make_positions_in_alert_query(
*,
positions_table: Table,
facades_table: Table,
track_analysis_depth: float,
now: datetime,
regulations_table: Table | None = None,
only_fishing_positions: bool = True,
flag_states_iso2: List[str] | None = None,
regulatory_areas: List[RegulatoryAreaSpecification] | None = None,
admin_areas_specs_with_tables: List[AdminAreasSpecWithTable] | None = None,
cfrs: set | None = None,
external_immats: set | None = None,
ircss: set | None = None,
) -> Select:
start_date = now - timedelta(hours=track_analysis_depth)
from_tables = positions_table.join(
facades_table,
ST_Intersects(positions_table.c.geometry, facades_table.c.geometry),
isouter=True,
)
filter_conditions = [
positions_table.c.date_time > start_date,
positions_table.c.date_time < now,
or_(
positions_table.c.internal_reference_number.isnot(None),
positions_table.c.external_reference_number.isnot(None),
positions_table.c.ircs.isnot(None),
),
]
if regulatory_areas:
from_tables = from_tables.join(
regulations_table,
ST_Intersects(
positions_table.c.geometry,
regulations_table.c.geometry,
),
)
regulatory_area_specs = [
to_regulatory_area_filter(spec=a, regulations_table=regulations_table)
for a in regulatory_areas
]
filter_conditions.append(or_(*regulatory_area_specs))
if admin_areas_specs_with_tables:
for admin_areas_spec_with_table in admin_areas_specs_with_tables:
admin_areas_table = admin_areas_spec_with_table.table
geom_column = admin_areas_spec_with_table.metadata.geometry_column
filter_column = admin_areas_spec_with_table.metadata.filter_column
from_tables = from_tables.join(
admin_areas_table,
ST_Intersects(
positions_table.c.geometry, admin_areas_table.c.get(geom_column)
),
)
filter_conditions.append(
admin_areas_table.c.get(filter_column).in_(
admin_areas_spec_with_table.areas
)
)
if cfrs is not None or external_immats is not None or ircss is not None:
vessel_filters = []
if cfrs is not None:
vessel_filters.append(
positions_table.c.internal_reference_number.in_(sorted(cfrs))
)
if external_immats is not None:
vessel_filters.append(
and_(
positions_table.c.internal_reference_number == None, # noqa: E711
positions_table.c.external_reference_number.in_(
sorted(external_immats)
),
)
)
if ircss is not None:
vessel_filters.append(
and_(
positions_table.c.internal_reference_number == None, # noqa: E711
positions_table.c.ircs.in_(sorted(ircss)),
)
)
filter_conditions.append(or_(*vessel_filters))
q = (
select(
positions_table.c.id,
positions_table.c.internal_reference_number.label("cfr"),
positions_table.c.external_reference_number.label(
"external_immatriculation"
),
positions_table.c.ircs,
positions_table.c.vessel_name,
positions_table.c.flag_state,
positions_table.c.date_time,
positions_table.c.latitude,
positions_table.c.longitude,
facades_table.c.facade,
)
.select_from(from_tables)
.where(and_(*filter_conditions))
)
if only_fishing_positions:
q = q.where(positions_table.c.is_fishing)
else:
q = q.where(not_(positions_table.c.is_at_port))
if flag_states_iso2:
q = q.where(positions_table.c.flag_state.in_(flag_states_iso2))
return q
@task
@task
@task
[docs]
def get_vessels_with_species_min_weight(
vessels_species: pd.DataFrame, species_spec: List[SpeciesSpecification]
) -> set:
species_min_weight_dict = (
pd.DataFrame(species_spec).groupby("species")["minWeight"].min().to_dict()
)
df = vessels_species.assign(
min_weight=vessels_species.species.map(species_min_weight_dict).fillna(0)
)
return set(df.query("weight >= min_weight").cfr)
@task
[docs]
def get_vessels_with_gears(
vessels_gears: pd.DataFrame, gears: List[GearSpecification]
) -> set:
vessels_gears = vessels_gears.copy(deep=True)
gears_min_mesh = pd.DataFrame(gears).set_index("gear")["minMesh"].to_dict()
gears_max_mesh = pd.DataFrame(gears).set_index("gear")["maxMesh"].to_dict()
vessels_gears["min_mesh"] = vessels_gears.gear.map(gears_min_mesh)
vessels_gears["max_mesh"] = vessels_gears.gear.map(gears_max_mesh)
vessels_gears = vessels_gears[vessels_gears.gear.isin(gears_max_mesh)].reset_index(
drop=True
)
vessels = duckdb.query(
"""
SELECT DISTINCT cfr
FROM vessels_gears
WHERE
(mesh >= min_mesh OR min_mesh IS NULL) AND
(mesh <= max_mesh OR max_mesh IS NULL)
"""
).to_df()
return set(vessels.cfr)
@task
[docs]
def filter_on_depth(positions_in_alert: pd.DataFrame, min_depth: float) -> pd.DataFrame:
# Positions depth is assumed to be negative below sea level.
# Deeper = more negative.
return positions_in_alert[positions_in_alert.depth <= -min_depth]
@task
[docs]
def merge_risk_factor(
positions_in_alert: pd.DataFrame, current_risk_factors: pd.DataFrame
) -> pd.DataFrame:
return join_on_multiple_keys(
positions_in_alert,
current_risk_factors,
how="left",
or_join_keys=["cfr", "external_immatriculation", "ircs"],
)
@task
[docs]
def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame:
"""
Returns a `DataFrame` of unique vessels in alert from the input `DataFrame` of
positions in alert.
For each vessel, the date of the most recent position is used as
`creation_datetime` for the alert.
"""
vessels_in_alerts = (
positions_in_alert.sort_values("date_time", ascending=False)
.groupby(["cfr", "ircs", "external_immatriculation"], dropna=False)
.head(1)
.rename(
columns={
"date_time": "triggering_behaviour_datetime_utc",
}
)
.reset_index(drop=True)
)
return vessels_in_alerts
@flow(name="Monitorfish - Position alert")
[docs]
def position_alert_flow(
position_alert_id: int,
name: str,
description: str,
natinf_code: int,
threat_characterization: str,
threat: str,
track_analysis_depth: float = 12.0,
only_fishing_positions: bool = True,
gears: List[GearSpecification] | None = None,
species: List[SpeciesSpecification] | None = None,
species_catch_areas: List[str] | None = None,
administrative_areas: List[AdminAreasSpecification] | None = None,
regulatory_areas: List[RegulatoryAreaSpecification] | None = None,
min_depth: float | None = None,
flag_states_iso2: List[str] | None = None,
vessel_ids: List[int] | None = None,
district_codes: List[str] | None = None,
producer_organizations: List[str] | None = None,
):
now = get_utcnow()
if administrative_areas:
administrative_areas_table_metadatas = [
to_admin_areas_table_metadata(a) for a in administrative_areas
]
table_names = [t.table_name for t in administrative_areas_table_metadatas]
administrative_areas_tables = get_table.map(table_names)
admin_areas_specs_with_tables = to_admin_areas_spec_with_table.map(
spec=administrative_areas,
table_metadata=administrative_areas_table_metadatas,
table=administrative_areas_tables,
)
else:
admin_areas_specs_with_tables = None
positions_table = get_table.submit("positions")
vessels_table = get_table.submit("vessels")
districts_table = get_table.submit("districts")
facades_table = get_table.submit("facade_areas_subdivided")
if regulatory_areas:
regulations_table = get_table.submit("regulations")
else:
regulations_table = None
if species:
vessels_species = extract_vessels_with_species_onboard.submit(
species, species_catch_areas
)
cfrs_with_species_min_weight = get_vessels_with_species_min_weight.submit(
vessels_species=vessels_species, species_spec=species
)
else:
cfrs_with_species_min_weight = None
if gears:
vessels_current_gears = extract_vessels_current_gears.submit()
cfrs_with_gears = get_vessels_with_gears.submit(vessels_current_gears, gears)
else:
cfrs_with_gears = None
if vessel_ids or district_codes or producer_organizations:
if producer_organizations:
prod_org_memberships_table = get_table("producer_organization_memberships")
else:
prod_org_memberships_table = None
vessels_query = make_vessels_query(
vessels_table=vessels_table,
prod_org_memberships_table=prod_org_memberships_table,
vessel_ids=vessel_ids,
district_codes=district_codes,
producer_organizations=producer_organizations,
)
vessels = read_query_task("monitorfish_remote", vessels_query)
(
vessels_cfrs,
vessels_external_immats,
vessels_ircss,
) = get_sets_of_identifiers(vessels)
else:
vessels_cfrs = None
vessels_external_immats = None
vessels_ircss = None
cfrs, external_immats, ircss = merge_sets_of_identifiers(
cfrs_with_species_min_weight,
cfrs_with_gears,
vessels_cfrs,
vessels_external_immats,
vessels_ircss,
)
positions_query = make_positions_in_alert_query(
positions_table=positions_table,
facades_table=facades_table,
track_analysis_depth=track_analysis_depth,
now=now,
regulations_table=regulations_table,
only_fishing_positions=only_fishing_positions,
flag_states_iso2=flag_states_iso2,
regulatory_areas=regulatory_areas,
admin_areas_specs_with_tables=admin_areas_specs_with_tables,
cfrs=cfrs,
external_immats=external_immats,
ircss=ircss,
)
current_risk_factors = extract_current_risk_factors.submit()
positions_in_alert = read_query_task.submit("monitorfish_remote", positions_query)
if min_depth:
positions_in_alert = add_depth(positions_in_alert)
positions_in_alert = filter_on_depth(positions_in_alert, min_depth)
positions_in_alert = add_vessel_identifier(positions_in_alert)
positions_in_alert = merge_risk_factor(positions_in_alert, current_risk_factors)
vessels_in_alert = get_vessels_in_alert(positions_in_alert)
vessels_in_alert = add_vessel_id(vessels_in_alert, vessels_table)
vessels_in_alert = add_vessels_columns(
vessels_in_alert,
vessels_table,
districts_table=districts_table,
districts_columns_to_add=["dml"],
)
alerts = make_alerts(
vessels_in_alert,
alert_type="POSITION_ALERT",
alert_id=position_alert_id,
name=name,
description=description,
natinf_code=natinf_code,
threat_characterization=threat_characterization,
threat=threat,
)
silenced_alerts = extract_silenced_alerts.submit(
alert_type="POSITION_ALERT",
number_of_hours=track_analysis_depth,
alert_id=position_alert_id,
)
alert_without_silenced = filter_alerts(alerts, silenced_alerts)
load_alerts(alert_without_silenced, f"POSITION_ALERT/{position_alert_id}")