from datetime import datetime
from typing import List
import pandas as pd
import requests
from prefect import get_run_logger, task
from config import (
PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE,
REPORTING_ARCHIVING_ENDPOINT_TEMPLATE,
)
from src.db_config import create_engine
from src.entities.alerts import AlertType
from src.generic_tasks import extract, load
from src.processing import (
df_to_dict_series,
join_on_multiple_keys,
left_isin_right_by_decreasing_priority,
)
from src.utils import delete_rows, get_table
@task
@task
@task
@task
@task
[docs]
def archive_reporting(id: int):
    """
    Archives a reporting by sending a PUT request to the reporting archiving
    endpoint.

    Args:
        id (int): identifier of the reporting, substituted as `reporting_id` in
          `REPORTING_ARCHIVING_ENDPOINT_TEMPLATE`.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status code.
    """
    run_logger = get_run_logger()
    archiving_url = REPORTING_ARCHIVING_ENDPOINT_TEMPLATE.format(reporting_id=id)
    run_logger.info(f"Archiving reporting {id}.")
    # Surface HTTP errors (4xx / 5xx) as exceptions so that the task run fails.
    requests.put(archiving_url).raise_for_status()
@task
[docs]
def validate_pending_alert(id: int):
    """
    Validates a pending alert by sending a PUT request to the pending alert
    validation endpoint.

    Args:
        id (int): identifier of the pending alert, substituted as
          `pending_alert_id` in `PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE`.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status code.
    """
    run_logger = get_run_logger()
    validation_url = PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE.format(
        pending_alert_id=id
    )
    run_logger.info(f"Validating pending alert {id}.")
    # Surface HTTP errors (4xx / 5xx) as exceptions so that the task run fails.
    requests.put(validation_url).raise_for_status()
@task
[docs]
def make_alerts(
    vessels_in_alert: pd.DataFrame,
    alert_type: str,
    name: str,
    *,
    natinf_code: int,
    threat: str,
    threat_characterization: str,
    alert_id: int | None = None,
    description: str | None = None,
) -> pd.DataFrame:
    """
    Generates alerts from the input `vessels_in_alert`, which must contain the
    following columns :

      - `cfr`
      - `external_immatriculation`
      - `ircs`
      - `vessel_id`
      - `vessel_identifier`
      - `vessel_name`
      - `facade`
      - `dml`
      - `flag_state`
      - `risk_factor`
      - `triggering_behaviour_datetime_utc`
      - and optionally, `depth`, `latitude` and `longitude`

    If `latitude` and `longitude` are not columns of the input, they are added and
    filled with null values in the result.

    If `depth` is a column of the input, it is added to the `value` attributes.

    The input DataFrame is not modified : the alerts are built on a deep copy.

    Args:
        vessels_in_alert (pd.DataFrame): `DataFrame` of vessels for which to
          create an alert.
        alert_type (str): `type` to specify in the built alerts. Must be a valid
          `AlertType` value.
        name (str): name of the alert.
        natinf_code (int): natinf code associated with the alert.
        threat (str): threat associated with the alert.
        threat_characterization (str): threat characterization associated with
          the alert.
        alert_id (int | None): `alert_id` to specify in the built alerts,
          defaults to None. When given, it is also appended (as `/<alert_id>`) to
          the `alert_config_name`.
        description (str | None): description of the alert, defaults to None.

    Returns:
        pd.DataFrame: `DataFrame` of alerts, with columns `vessel_name`,
        `internal_reference_number`, `external_reference_number`, `ircs`,
        `flag_state`, `vessel_id`, `vessel_identifier`,
        `triggering_behaviour_datetime_utc`, `creation_date`, `latitude`,
        `longitude`, `value` and `alert_config_name`.

    Raises:
        AssertionError: if `name` is None.
        ValueError: if `alert_type` is not a valid `AlertType` value (raised by
          the enum lookup, assuming `AlertType` is a standard `Enum`).
    """
    # Local import : `timezone` is only needed here and is not part of the
    # module-level imports.
    from datetime import timezone

    assert name is not None

    alerts = vessels_in_alert.copy(deep=True)
    alerts = alerts.rename(
        columns={
            "cfr": "internal_reference_number",
            "external_immatriculation": "external_reference_number",
        }
    )

    # `datetime.utcnow()` is deprecated since Python 3.12. Dropping the tzinfo
    # keeps the value naive, exactly as `utcnow()` produced it.
    alerts["creation_date"] = datetime.now(timezone.utc).replace(tzinfo=None)

    for coordinate in ("latitude", "longitude"):
        if coordinate not in alerts:
            alerts[coordinate] = None

    # Normalize / validate the alert type through the enum.
    alert_type = AlertType(alert_type).value
    alerts["type"] = alert_type
    alert_type_suffix = f"/{alert_id}" if alert_id is not None else ""
    alerts["alert_config_name"] = alert_type + alert_type_suffix
    alerts["name"] = name

    # Columns that make up the `value` attribute of each alert. The order of this
    # list is the order in which columns are handed to `df_to_dict_series`.
    value_cols = ["seaFront", "type", "riskFactor", "dml", "name"]

    if "depth" in alerts.columns:
        value_cols.append("depth")

    if description is not None:
        alerts["description"] = description
        value_cols.append("description")

    alerts["natinfCode"] = natinf_code
    value_cols.append("natinfCode")

    alerts["threat"] = threat
    value_cols.append("threat")

    alerts["threatCharacterization"] = threat_characterization
    value_cols.append("threatCharacterization")

    if alert_id is not None:
        alerts["alertId"] = alert_id
        value_cols.append("alertId")

    # `facade` and `risk_factor` are renamed only for the `value` payload; the
    # `alerts` DataFrame itself keeps its column names.
    alerts["value"] = df_to_dict_series(
        alerts.rename(
            columns={
                "facade": "seaFront",
                "risk_factor": "riskFactor",
            }
        )[value_cols]
    )

    return alerts[
        [
            "vessel_name",
            "internal_reference_number",
            "external_reference_number",
            "ircs",
            "flag_state",
            "vessel_id",
            "vessel_identifier",
            "triggering_behaviour_datetime_utc",
            "creation_date",
            "latitude",
            "longitude",
            "value",
            "alert_config_name",
        ]
    ]
@task
[docs]
def filter_alerts(
    alerts: pd.DataFrame,
    vessels_with_silenced_alerts: pd.DataFrame,
    vessels_with_active_reportings: pd.DataFrame = None,
) -> pd.DataFrame:
    """
    Removes from `alerts` the alerts of vessels that have a silenced alert still in
    effect and, when `vessels_with_active_reportings` is given as a DataFrame, the
    alerts of vessels that appear in it.

    An alert is kept when, after left-joining `vessels_with_silenced_alerts` on the
    vessel identifiers, its `silenced_before_date` is null or its
    `triggering_behaviour_datetime_utc` is later than `silenced_before_date`.

    All input DataFrames must have columns :

      - internal_reference_number
      - external_reference_number
      - ircs

    In addition, the `alerts` DataFrame must have columns :

      - vessel_id
      - vessel_name
      - vessel_identifier
      - flag_state
      - triggering_behaviour_datetime_utc
      - creation_date
      - latitude
      - longitude
      - value
      - alert_config_name

    and the `vessels_with_silenced_alerts` DataFrame must have a
    `silenced_before_date` column.

    Args:
        alerts (pd.DataFrame): positions alerts.
        vessels_with_silenced_alerts (pd.DataFrame): vessels with silenced alerts.
        vessels_with_active_reportings (pd.DataFrame): vessels with active
          reportings. Ignored unless it is a DataFrame. Defaults to None.

    Returns:
        pd.DataFrame: the remaining alerts, without the
        `triggering_behaviour_datetime_utc` and `silenced_before_date` columns,
        sorted by `internal_reference_number` and with a fresh index.
    """
    identifier_columns = [
        "internal_reference_number",
        "external_reference_number",
        "ircs",
    ]
    output_columns = [
        "vessel_name",
        "internal_reference_number",
        "external_reference_number",
        "ircs",
        "flag_state",
        "vessel_id",
        "vessel_identifier",
        "creation_date",
        "latitude",
        "longitude",
        "value",
        "alert_config_name",
    ]

    with_silencing = join_on_multiple_keys(
        alerts,
        vessels_with_silenced_alerts,
        or_join_keys=identifier_columns,
        how="left",
    )

    # Keep alerts that were never silenced, or that were triggered after the end
    # of the silencing period.
    not_silenced = with_silencing["silenced_before_date"].isna()
    triggered_after_silencing = (
        with_silencing["triggering_behaviour_datetime_utc"]
        > with_silencing["silenced_before_date"]
    )
    remaining = with_silencing.loc[
        not_silenced | triggered_after_silencing, output_columns
    ]

    if isinstance(vessels_with_active_reportings, pd.DataFrame):
        has_active_reporting = left_isin_right_by_decreasing_priority(
            remaining[identifier_columns],
            vessels_with_active_reportings[identifier_columns],
        )
        remaining = remaining.loc[~has_active_reporting]

    return remaining.sort_values("internal_reference_number").reset_index(drop=True)
@task
[docs]
def load_alerts(alerts: pd.DataFrame, alert_config_name: str):
    """
    Updates the `pending_alerts` that have the specified `alert_config_name` by:

      - deleting alerts in the `pending_alerts` table of the specified
        `alert_config_name`
      - inserting alerts of the `alerts` dataframe into the `pending_alerts` table

    Both steps use the same connection, obtained from `e.begin()`.

    Args:
        alerts (pd.DataFrame): Alerts to load into the `pending_alerts` table
        alert_config_name (str): Name that uniquely identifies the set of parameters
          used for the flow run

    Raises:
        ValueError: if `alert_config_name` is not a non-empty `str`.
    """
    # Explicit check rather than `assert` : assertions are stripped when Python
    # runs with `-O`, which would let an invalid name reach the delete step.
    if not (alert_config_name and isinstance(alert_config_name, str)):
        raise ValueError(
            "alert_config_name must be a non null `str`, "
            f"got {alert_config_name} instead."
        )

    schema = "public"
    table_name = "pending_alerts"
    logger = get_run_logger()

    e = create_engine("monitorfish_remote")
    with e.begin() as connection:
        table = get_table(
            table_name=table_name, schema=schema, conn=connection, logger=logger
        )

        # This cannot be done by using the `upsert` mode of the `load` function
        # because when the input DataFrame is empty, rows in the `pending_alerts`
        # table that correspond to the designated `alert_config_name` must be
        # deleted, which does not happen if there is not at least one row in the
        # DataFrame that contains the information of which `alert_config_name`
        # needs to be deleted.
        delete_rows(
            table=table,
            id_column="alert_config_name",
            ids_to_delete=[alert_config_name],
            connection=connection,
            logger=logger,
        )

        load(
            alerts,
            table_name=table_name,
            schema=schema,
            logger=logger,
            how="append",
            jsonb_columns=["value"],
            nullable_integer_columns=["vessel_id"],
            connection=connection,
        )