from datetime import datetime, timedelta
from typing import Tuple
import pandas as pd
from geoalchemy2.functions import ST_Intersects
from prefect import flow, get_run_logger, task
from sqlalchemy import Table, and_, not_, or_, select
from sqlalchemy.sql import Select
from src.exceptions import MonitorfishHealthError
from src.generic_tasks import extract, read_query_task
from src.processing import join_on_multiple_keys
from src.shared_tasks.alerts import (
extract_silenced_alerts,
filter_alerts,
load_alerts,
make_alerts,
)
from src.shared_tasks.infrastructure import get_table
from src.shared_tasks.positions import add_vessel_identifier
from src.shared_tasks.risk_factors import extract_current_risk_factors
from src.shared_tasks.vessels import add_vessel_id, add_vessels_columns
@task
[docs]
def get_dates(
    days_without_far: int,
) -> Tuple[datetime, datetime, datetime, datetime, float]:
    """
    Computes the reference dates used throughout the flow.

    All returned datetimes are naive and derived from `datetime.utcnow()`.

    Args:
        days_without_far (int): number of days to go back from today's midnight in
          order to obtain the start of the monitored period.

    Returns:
        Tuple[datetime, datetime, datetime, datetime, float]: in this order :

          - `days_without_far` days ago at 00:00 (beginning of the day) in UTC (1)
          - Yesterday at 8pm in UTC
          - Today at 00:00 (beginning of the day) in UTC
          - Current datetime in UTC (2)
          - The number of hours that separate (1) and (2)
    """
    now = datetime.utcnow()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    period_start = midnight - timedelta(days=days_without_far)
    hours_since_period_start = (now - period_start).total_seconds() / 3600

    return (
        period_start,
        # 8pm the day before is 4 hours before today's midnight
        midnight - timedelta(hours=4),
        midnight,
        now,
        hours_since_period_start,
    )
@task
[docs]
def make_positions_at_sea_query(
    positions_table: Table,
    facade_areas_table: Table,
    from_date: datetime,
    to_date: datetime,
    states_to_monitor_iso2: list = None,
    vessels_table: Table = None,
    minimum_length: float = None,
    eez_areas_table: Table = None,
    eez_to_monitor_iso3: list = None,
    only_fishing_positions: bool = False,
    exclude_vessels_with_logbook_exemptions: bool = False,
) -> Select:
    """
    Generates the `sqlalchemy.Select` statement to run in order to get the positions of
    vessels that were at sea (i.e. those that emitted at least one VMS position outside
    of a port) between the designated dates and matching the designated flag states.

    Optional filters are only applied when the corresponding argument is truthy, so
    `None`, an empty list or a `minimum_length` of `0` all mean "no filter".

    Args:
        positions_table (Table): `sqlalchemy.Table` representing `positions`
        facade_areas_table (Table): `sqlalchemy.Table` representing `facade_areas`
        from_date (datetime): Start of the time interval to query (inclusive), in UTC
        to_date (datetime): End of the time interval to query (exclusive), in UTC
        states_to_monitor_iso2 (list, optional): If provided, only vessels of the given
          flag_states will be queried. Defaults to None.
        vessels_table (Table, optional): `sqlalchemy.Table` representing `vessels`. Must
          be provided if `minimum_length` is given or if
          `exclude_vessels_with_logbook_exemptions` is `True`. Defaults to None.
        minimum_length (float, optional): If provided, only vessels longer than the
          given value will be queried (only applies to french vessels). Defaults to
          None.
        eez_areas_table (Table, optional): `sqlalchemy.Table` representing `eez_areas`.
          Must be provided if `eez_to_monitor_iso3` is given. Defaults to None.
        eez_to_monitor_iso3 (list, optional): If provided, only VMS emission in the
          designated EEZ areas will be considered. Defaults to None.
        only_fishing_positions (bool, optional): if `True`, only positions which were
          detected as being in fishing operation will be considered.
          Defaults to `False`.
        exclude_vessels_with_logbook_exemptions (bool, optional): if `True`, vessels
          with an 'Exempté' logbook_equipment_status will be excluded.
          Defaults to `False`.

    Raises:
        ValueError: If `minimum_length` is given or
          `exclude_vessels_with_logbook_exemptions` is `True` and the `vessels_table`
          is not provided.
        ValueError: If `eez_to_monitor_iso3` is given and the `eez_areas_table`
          is not provided.

    Returns:
        Select: `Select` statement representing a SQL query
    """
    # Outer join : positions that intersect no facade are kept, with a null `facade`
    from_table = positions_table.join(
        facade_areas_table,
        ST_Intersects(positions_table.c.geometry, facade_areas_table.c.geometry),
        isouter=True,
    )

    if minimum_length or exclude_vessels_with_logbook_exemptions:
        # Explicit check rather than `assert`, which is stripped under `python -O`
        if not isinstance(vessels_table, Table):
            raise ValueError(
                (
                    "The vessels_table must be provided when "
                    "filtering on a minimum_length or excluding vessels with "
                    "logbook exemptions"
                )
            )

        # Default (inner) join : positions whose cfr matches no row of `vessels` are
        # left out of the result
        from_table = from_table.join(
            vessels_table,
            positions_table.c.internal_reference_number == vessels_table.c.cfr,
        )

    if eez_to_monitor_iso3:
        # Explicit check rather than `assert`, which is stripped under `python -O`
        if not isinstance(eez_areas_table, Table):
            raise ValueError(
                ("The eez_table must be provided when " "filtering on a eez_to_monitor")
            )

        from_table = from_table.join(
            eez_areas_table,
            ST_Intersects(positions_table.c.geometry, eez_areas_table.c.geom),
        )

    q = (
        select(
            positions_table.c.internal_reference_number.label("cfr"),
            positions_table.c.external_reference_number.label(
                "external_immatriculation"
            ),
            positions_table.c.ircs,
            positions_table.c.vessel_name,
            positions_table.c.flag_state,
            positions_table.c.date_time,
            positions_table.c.latitude,
            positions_table.c.longitude,
            facade_areas_table.c.facade,
        )
        .select_from(from_table)
        .where(
            and_(
                positions_table.c.date_time >= from_date,
                positions_table.c.date_time < to_date,
                positions_table.c.internal_reference_number.isnot(None),
                not_(positions_table.c.is_at_port),
            )
        )
    )

    if exclude_vessels_with_logbook_exemptions:
        # The explicit null test keeps vessels without any logbook_equipment_status,
        # for which the `!=` comparison alone would not be true in SQL
        q = q.where(
            or_(
                vessels_table.c.logbook_equipment_status != "Exempté",
                vessels_table.c.logbook_equipment_status.is_(None),
            )
        )

    if only_fishing_positions:
        q = q.where(positions_table.c.is_fishing)

    if states_to_monitor_iso2:
        q = q.where(positions_table.c.flag_state.in_(states_to_monitor_iso2))

    if minimum_length:
        # The length criterion only constrains french vessels
        q = q.where(
            or_(
                vessels_table.c.length >= minimum_length,
                positions_table.c.flag_state != "FR",
            )
        )

    if eez_to_monitor_iso3:
        q = q.where(eez_areas_table.c["ISO_SOV1"].in_(eez_to_monitor_iso3))

    return q
@task
@task
[docs]
def concat(
    positions_at_sea_yesterday_everywhere: pd.DataFrame,
    positions_at_sea_yesterday_in_french_eez: pd.DataFrame,
) -> pd.DataFrame:
    """
    Stacks the two input `DataFrame` on top of each other, in the order of the
    arguments, and renumbers the index of the result from 0.

    Args:
        positions_at_sea_yesterday_everywhere (pd.DataFrame)
        positions_at_sea_yesterday_in_french_eez (pd.DataFrame)

    Returns:
        pd.DataFrame
    """
    frames = (
        positions_at_sea_yesterday_everywhere,
        positions_at_sea_yesterday_in_french_eez,
    )
    return pd.concat(frames, ignore_index=True)
@task
[docs]
def get_vessels_at_sea(positions_at_sea: pd.DataFrame, min_days: int) -> pd.DataFrame:
    """
    Returns a DataFrame with the vessels present in the input `positions_at_sea`
    DataFrame which were at sea on at least `min_days` days. Must have columns :

      - `cfr`
      - `external_immatriculation`
      - `ircs`
      - `vessel_name`
      - `facade`
      - `flag_state`
      - `date_time`
      - `latitude`
      - `longitude`

    A vessel is identified by its (`cfr`, `ircs`, `external_immatriculation`)
    triplet. Missing identifiers are treated as regular values, so that a vessel with
    a null `ircs` or `external_immatriculation` is not silently left out.

    Args:
        positions_at_sea (pd.DataFrame): DataFrame of positions of vessels at sea. It
          is not modified.
        min_days (int): minimum number of days at sea. Vessels at sea less than
          `min_days` days are excluded from the result.

    Returns:
        pd.DataFrame: vessels of the input that were at sea on at least `min_days`
        different days, one row per vessel holding the data of its most recent
        position, with `date_time` renamed `triggering_behaviour_datetime_utc`.
    """
    vessel_keys = ["cfr", "ircs", "external_immatriculation"]

    # Work on a copy so that the caller's DataFrame does not get the helper columns
    positions_at_sea = positions_at_sea.copy(deep=True)
    positions_at_sea["date"] = positions_at_sea.date_time.map(lambda d: d.date())

    # dropna=False : with the default dropna=True, rows having a null value in any of
    # the keys belong to no group, so the corresponding vessels would never be
    # returned, whatever the number of days they spent at sea.
    positions_at_sea["days_at_sea"] = positions_at_sea.groupby(
        vessel_keys, dropna=False
    )["date"].transform("nunique")

    positions_at_sea = positions_at_sea.loc[
        positions_at_sea.days_at_sea >= min_days
    ].reset_index(drop=True)

    # Sorting by descending date_time then taking the first row of each group keeps
    # the most recent position of each vessel
    vessels_at_sea = (
        positions_at_sea.sort_values("date_time", ascending=False)
        .groupby(vessel_keys, dropna=False)
        .head(1)[
            [
                "cfr",
                "external_immatriculation",
                "ircs",
                "vessel_name",
                "flag_state",
                "facade",
                "date_time",
                "latitude",
                "longitude",
            ]
        ]
        .rename(
            columns={
                "date_time": "triggering_behaviour_datetime_utc",
            }
        )
        .reset_index(drop=True)
    )

    return vessels_at_sea
@task
[docs]
def get_vessels_with_missing_fars(
    vessels_at_sea: pd.DataFrame,
    vessels_that_emitted_fars: set,
    max_share_of_vessels_with_missing_fars: float = 0.5,
) -> pd.DataFrame:
    """
    Filters `vessels_at_sea` to keep only rows whose `cfr` is NOT in
    `vessels_that_emitted_fars`.

    Args:
        vessels_at_sea (pd.DataFrame): `DataFrame` of vessels at sea. Must have a
          `cfr` column.
        vessels_that_emitted_fars (set): `set` cfrs of vessels that emitted
          `FAR` reports
        max_share_of_vessels_with_missing_fars (float, optional): If the share of
          `vessels_at_sea` that are not in `vessels_that_emitted_fars` is greater than
          this value, it is assumed that there is a breakdown in the data pipeline and
          a `MonitorfishHealthError` is raised. Defaults to 0.5.

    Raises:
        MonitorfishHealthError: raised if the share of vessels with missing fars is
          greater than `max_share_of_vessels_with_missing_fars`

    Returns:
        pd.DataFrame: Filtered version of `vessels_at_sea` with only those that are
        not in `vessels_that_emitted_fars`
    """
    logger = get_run_logger()

    vessels_with_missing_fars = vessels_at_sea.loc[
        ~vessels_at_sea.cfr.isin(vessels_that_emitted_fars)
    ].reset_index(drop=True)

    # max(..., 1) guards against a division by zero when no vessel is at sea, in
    # which case the share is 0
    share_of_vessels_with_missing_fars = len(vessels_with_missing_fars) / max(
        len(vessels_at_sea), 1
    )

    logger.info(
        (
            f"Out of {len(vessels_at_sea)} vessels at sea, "
            f"{len(vessels_with_missing_fars)} sent no FAR "
            f"({share_of_vessels_with_missing_fars:.0%})."
        )
    )

    # Explicit comparison rather than `assert` : assertions are stripped when Python
    # runs with -O, which would silently disable this health check.
    if share_of_vessels_with_missing_fars > max_share_of_vessels_with_missing_fars:
        raise MonitorfishHealthError(
            (
                f"More than {max_share_of_vessels_with_missing_fars:.0%} of the "
                "`vessels_at_sea` are absent from `vessels_that_emitted_fars`. It is "
                "likely that there is a logbook data breakdown."
            )
        )

    return vessels_with_missing_fars
@task
[docs]
def merge_risk_factor(
    vessels_with_missing_fars: pd.DataFrame, current_risk_factors: pd.DataFrame
) -> pd.DataFrame:
    """
    Left-joins `current_risk_factors` onto `vessels_with_missing_fars` by calling
    `join_on_multiple_keys` with the vessel identifiers
    ["cfr", "external_immatriculation", "ircs"] as `or_join_keys`.

    Args:
        vessels_with_missing_fars (pd.DataFrame): left side of the join
        current_risk_factors (pd.DataFrame): right side of the join

    Returns:
        pd.DataFrame: result of the join
    """
    identifier_columns = ["cfr", "external_immatriculation", "ircs"]
    vessels_with_risk_factors = join_on_multiple_keys(
        vessels_with_missing_fars,
        current_risk_factors,
        or_join_keys=identifier_columns,
        how="left",
    )
    return vessels_with_risk_factors
@flow(name="Monitorfish - Missing FAR alerts")
[docs]
def missing_far_alerts_flow(
    alert_type: str,
    name: str,
    states_iso2_to_monitor_everywhere: list,
    states_iso2_to_monitor_in_french_eez: list,
    max_share_of_vessels_with_missing_fars: float,
    minimum_length: float,
    only_raise_if_route_shows_fishing: bool,
    days_without_far: int,
):
    """
    Builds and loads alerts for vessels that were at sea during the monitored period
    and are absent from the set of vessels that emitted FAR reports.

    The monitored positions span from `days_without_far` days ago at 00:00 to
    yesterday at 8pm, as computed by `get_dates`. Two sets of positions are queried
    and concatenated : one filtered on `states_iso2_to_monitor_everywhere`, the other
    filtered on `states_iso2_to_monitor_in_french_eez` and restricted to the "FRA"
    EEZ.

    Args:
        alert_type (str): passed to `make_alerts` and `extract_silenced_alerts`, and
          to `load_alerts` as `alert_config_name`
        name (str): passed to `make_alerts`
        states_iso2_to_monitor_everywhere (list): flag states used to filter the
          positions of the first query (no EEZ restriction)
        states_iso2_to_monitor_in_french_eez (list): flag states used to filter the
          positions of the second query (restricted to the "FRA" EEZ)
        max_share_of_vessels_with_missing_fars (float): threshold passed to
          `get_vessels_with_missing_fars`
        minimum_length (float): minimum vessel length passed to both position queries
        only_raise_if_route_shows_fishing (bool): passed to both position queries as
          `only_fishing_positions`
        days_without_far (int): number of days of the monitored period, also used as
          the minimum number of days at sea (`min_days`) for a vessel to be kept
    """
    # Infras
    districts_table = get_table("districts")
    positions_table = get_table("positions")
    facade_areas_table = get_table("facade_areas_subdivided")
    eez_areas_table = get_table("eez_areas")
    vessels_table = get_table("vessels")
    # Extract
    (
        period_start_at_zero_hours,
        yesterday_at_eight_pm,
        today_at_zero_hours,
        utcnow,
        period_start_hours_from_now,
    ) = get_dates(days_without_far)
    # Only this first query excludes vessels with a logbook exemption
    positions_at_sea_yesterday_everywhere_query = make_positions_at_sea_query(
        positions_table=positions_table,
        facade_areas_table=facade_areas_table,
        from_date=period_start_at_zero_hours,
        to_date=yesterday_at_eight_pm,
        states_to_monitor_iso2=states_iso2_to_monitor_everywhere,
        vessels_table=vessels_table,
        minimum_length=minimum_length,
        only_fishing_positions=only_raise_if_route_shows_fishing,
        exclude_vessels_with_logbook_exemptions=True,
    )
    # NOTE(review): unlike the query above, this one does not set
    # `exclude_vessels_with_logbook_exemptions` - confirm this is intended
    positions_at_sea_yesterday_in_french_eez_query = make_positions_at_sea_query(
        positions_table=positions_table,
        facade_areas_table=facade_areas_table,
        from_date=period_start_at_zero_hours,
        to_date=yesterday_at_eight_pm,
        states_to_monitor_iso2=states_iso2_to_monitor_in_french_eez,
        vessels_table=vessels_table,
        minimum_length=minimum_length,
        eez_areas_table=eez_areas_table,
        eez_to_monitor_iso3=["FRA"],
        only_fishing_positions=only_raise_if_route_shows_fishing,
    )
    # The extractions below are launched with `.submit()`
    positions_at_sea_yesterday_in_french_eez = read_query_task.submit(
        "monitorfish_remote", positions_at_sea_yesterday_in_french_eez_query
    )
    positions_at_sea_yesterday_everywhere = read_query_task.submit(
        "monitorfish_remote", positions_at_sea_yesterday_everywhere_query
    )
    # NOTE(review): `extract_vessels_that_emitted_fars` is neither defined nor
    # imported in the visible part of this file - presumably defined elsewhere
    # in the module, to be confirmed.
    # Declarations are searched up to the current datetime, fishing operations up to
    # today at 00:00.
    vessels_that_emitted_fars = extract_vessels_that_emitted_fars.submit(
        declaration_min_datetime_utc=period_start_at_zero_hours,
        declaration_max_datetime_utc=utcnow,
        fishing_operation_min_datetime_utc=period_start_at_zero_hours,
        fishing_operation_max_datetime_utc=today_at_zero_hours,
    )
    current_risk_factors = extract_current_risk_factors.submit()
    # Transform
    positions_at_sea = concat(
        positions_at_sea_yesterday_everywhere,
        positions_at_sea_yesterday_in_french_eez,
    )
    # A vessel is kept only if it was at sea on at least `days_without_far` distinct
    # days
    vessels_at_sea = get_vessels_at_sea(positions_at_sea, min_days=days_without_far)
    vessels_with_missing_fars = get_vessels_with_missing_fars(
        vessels_at_sea,
        vessels_that_emitted_fars,
        max_share_of_vessels_with_missing_fars,
    )
    # Enrich the vessels with the data needed to build the alerts
    vessels_with_missing_fars = add_vessel_identifier(vessels_with_missing_fars)
    vessels_with_missing_fars = merge_risk_factor(
        vessels_with_missing_fars, current_risk_factors
    )
    vessels_with_missing_fars = add_vessel_id(vessels_with_missing_fars, vessels_table)
    vessels_with_missing_fars = add_vessels_columns(
        vessels_with_missing_fars,
        vessels_table,
        districts_table=districts_table,
        districts_columns_to_add=["dml"],
    )
    alerts = make_alerts(
        vessels_with_missing_fars,
        alert_type,
        name,
        natinf_code=27689,
        threat="Obligations déclaratives",
        threat_characterization="FAR (JPE)",
    )
    # Silenced alerts are extracted with `number_of_hours` set to the number of hours
    # elapsed since the start of the monitored period
    silenced_alerts = extract_silenced_alerts.submit(
        alert_type, number_of_hours=period_start_hours_from_now
    )
    alert_without_silenced = filter_alerts(alerts, silenced_alerts)
    # Load
    load_alerts(alert_without_silenced, alert_config_name=alert_type)