from datetime import datetime
from typing import Callable, Optional, Tuple
import numpy as np
import pandas as pd
import requests
from prefect import flow, get_run_logger, task, unmapped
from config import (
BACKEND_API_KEY,
BEACON_MALFUNCTIONS_ENDPOINT,
BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_PORT,
BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_SEA,
)
from src.entities.beacon_malfunctions import (
BeaconMalfunctionNotificationType,
BeaconMalfunctionStage,
BeaconMalfunctionVesselStatus,
BeaconStatus,
EndOfMalfunctionReason,
)
from src.generic_tasks import extract, load
from src.processing import join_on_multiple_keys
from src.shared_tasks.control_flow import filter_results
from src.shared_tasks.dates import get_utcnow, make_timedelta
from src.shared_tasks.healthcheck import (
assert_last_positions_flow_health,
get_monitorfish_healthcheck,
)
@task
@task
@task
@task
@task
[docs]
def get_last_emissions_of_vessels_that_should_emit(
    vessels_that_should_emit: pd.DataFrame, last_positions: pd.DataFrame
) -> pd.DataFrame:
    """
    Join `vessels_that_should_emit` and `last_positions` using `cfr`, `ircs` and
    `external_immatriculation` as join keys, using the `join_on_multiple_keys` logic,
    then keep a single row per `beacon_number` : the one with the most recent
    `last_position_datetime_utc`.

    `last_positions` of a given vessel that were emitted before the vessel's beacon
    `logging_datetime_utc` are not taken into account : the result therefore only
    includes each vessel's last emission **with its current beacon**. This is done to
    avoid generating beacon malfunctions on a given vessel from emission data of its
    previous beacon.

    Concretely, when a beacon's most recent position is not strictly after its
    `logging_datetime_utc` (or when either date is missing), its
    `last_position_datetime_utc` is set to null. The other columns of that row are
    left unchanged.

    Args:
        vessels_that_should_emit (pd.DataFrame): DataFrame of vessels that should emit
        last_positions (pd.DataFrame): DataFrame of last positions

    Returns:
        pd.DataFrame: last emissions of the input vessels with their current beacon,
        one row per `beacon_number`
    """
    vessels_with_positions = join_on_multiple_keys(
        vessels_that_should_emit,
        last_positions,
        or_join_keys=["cfr", "ircs", "external_immatriculation"],
        how="left",
        coalesce_common_columns=False,
    )

    # Most recent emission first, so that `head(1)` keeps each beacon's latest
    # one. Rows without any position (null datetime) are sorted last.
    latest_first = vessels_with_positions.sort_values(
        "last_position_datetime_utc", ascending=False
    )
    last_emissions = latest_first.groupby("beacon_number").head(1)
    last_emissions = last_emissions.reset_index(drop=True)

    # Emissions dated before the current beacon was logged belong to a previous
    # beacon: they are discarded by nulling the datetime.
    emitted_with_current_beacon = (
        last_emissions.last_position_datetime_utc
        > last_emissions.logging_datetime_utc
    )
    last_emissions["last_position_datetime_utc"] = (
        last_emissions.last_position_datetime_utc.where(emitted_with_current_beacon)
    )
    return last_emissions
@task
[docs]
def get_new_malfunctions(
    last_emissions: pd.DataFrame,
    known_malfunctions: pd.DataFrame,
    satellite_operators_statuses: pd.DataFrame,
    malfunction_datetime_utc_threshold_at_sea: datetime,
    malfunction_datetime_utc_threshold_at_port: datetime,
) -> pd.DataFrame:
    """
    Select, among `last_emissions`, the beacons that should be put in malfunction.

    Beacons already present in `known_malfunctions` (matched on `beacon_number`) and
    beacons whose `satellite_operator_id` is not flagged `operator_is_up` in
    `satellite_operators_statuses` are ignored. A remaining row becomes a new
    malfunction when any of the following holds:

      - `is_manual` is True
      - `is_at_port` is True and `last_position_datetime_utc` is before
        `malfunction_datetime_utc_threshold_at_port`
      - `is_at_port` is False and `last_position_datetime_utc` is before
        `malfunction_datetime_utc_threshold_at_sea`

    Rows with a null `last_position_datetime_utc` or a null `is_at_port` only
    qualify through `is_manual`.

    Args:
        last_emissions (pd.DataFrame): last emission of each beacon
        known_malfunctions (pd.DataFrame): ongoing malfunctions, with a
          `beacon_number` column
        satellite_operators_statuses (pd.DataFrame): `satellite_operator_id` and
          `operator_is_up` columns
        malfunction_datetime_utc_threshold_at_sea (datetime): emissions older than
          this are malfunctions for beacons at sea
        malfunction_datetime_utc_threshold_at_port (datetime): emissions older than
          this are malfunctions for beacons at port

    Returns:
        pd.DataFrame: the selected rows with a fresh integer index, with
        `last_position_datetime_utc` renamed `malfunction_start_date_utc` and
        without the `is_manual` and `satellite_operator_id` columns.
    """
    is_up = satellite_operators_statuses["operator_is_up"] == True  # noqa: E712
    operators_up = set(
        satellite_operators_statuses.loc[is_up, "satellite_operator_id"].values
    )

    already_in_malfunction = last_emissions["beacon_number"].isin(
        known_malfunctions["beacon_number"]
    )
    candidates = last_emissions.loc[~already_in_malfunction].copy(deep=True)

    # Emissions of beacons whose satellite operator is down are not conclusive
    operator_up = candidates["satellite_operator_id"].isin(operators_up)
    candidates = candidates.loc[operator_up].copy(deep=True)

    # `== True` / `== False` on purpose: null values match neither branch
    last_seen = candidates["last_position_datetime_utc"]
    manual = candidates["is_manual"] == True  # noqa: E712
    silent_at_port = (candidates["is_at_port"] == True) & (  # noqa: E712
        last_seen < malfunction_datetime_utc_threshold_at_port
    )
    silent_at_sea = (candidates["is_at_port"] == False) & (  # noqa: E712
        last_seen < malfunction_datetime_utc_threshold_at_sea
    )

    selected = candidates.loc[manual | silent_at_port | silent_at_sea]
    selected = selected.rename(
        columns={"last_position_datetime_utc": "malfunction_start_date_utc"}
    ).reset_index(drop=True)
    return selected.drop(columns=["is_manual", "satellite_operator_id"])
@task
[docs]
def get_ended_malfunction_ids(
    last_emissions_of_vessels_that_should_emit: pd.DataFrame,
    known_malfunctions: pd.DataFrame,
    malfunction_datetime_utc_threshold_at_sea: datetime,
) -> Tuple[list, list, list, list]:
    """
    Find the ids of `known_malfunctions` that should be closed, in four groups.

    A known malfunction whose `beacon_number` is absent from
    `last_emissions_of_vessels_that_should_emit` is no longer required to emit.

    Among the others (inner join on `beacon_number`), those with `is_manual` False
    and a `last_position_datetime_utc` at or after
    `malfunction_datetime_utc_threshold_at_sea` have restarted emitting. They are
    split according to `beacon_status` and `vessel_status`. Note that the at-sea
    threshold is used whatever the vessel status.

    Args:
        last_emissions_of_vessels_that_should_emit (pd.DataFrame): last emission of
          each beacon that should emit
        known_malfunctions (pd.DataFrame): ongoing malfunctions, with `id`,
          `beacon_number` and `vessel_status` columns
        malfunction_datetime_utc_threshold_at_sea (datetime): emissions at or after
          this datetime count as restarted

    Returns:
        Tuple[list, list, list, list]: malfunction ids, without duplicates and in no
        particular order, of:

          - `ACTIVATED` beacons that restarted emitting, with a `vessel_status`
            other than `AT_PORT`
          - `ACTIVATED` beacons that restarted emitting, with a `vessel_status` of
            `AT_PORT`
          - beacons no longer required to emit
          - non-`ACTIVATED` beacons that restarted emitting
    """
    last_emissions = last_emissions_of_vessels_that_should_emit

    still_required = known_malfunctions.beacon_number.isin(
        last_emissions.beacon_number
    )
    ids_not_required_to_emit = set(known_malfunctions.loc[~still_required, "id"])

    merged = pd.merge(
        known_malfunctions,
        last_emissions,
        on="beacon_number",
        how="inner",
    )
    has_restarted = (merged.is_manual == False) & (  # noqa: E712
        merged.last_position_datetime_utc >= malfunction_datetime_utc_threshold_at_sea
    )
    restarted = merged.loc[has_restarted].reset_index(drop=True)

    activated_value = BeaconStatus.ACTIVATED.value
    at_port_value = BeaconMalfunctionVesselStatus.AT_PORT.value
    activated = restarted.beacon_status == activated_value
    not_activated = restarted.beacon_status != activated_value
    at_port = restarted.vessel_status == at_port_value
    not_at_port = restarted.vessel_status != at_port_value

    ids_unsupervised_restarted_emitting = set(restarted.loc[not_activated, "id"])
    ids_at_port_restarted_emitting = set(restarted.loc[activated & at_port, "id"])
    ids_not_at_port_restarted_emitting = set(
        restarted.loc[activated & not_at_port, "id"]
    )

    return (
        list(ids_not_at_port_restarted_emitting),
        list(ids_at_port_restarted_emitting),
        list(ids_not_required_to_emit),
        list(ids_unsupervised_restarted_emitting),
    )
@task
[docs]
def prepare_new_beacon_malfunctions(new_malfunctions: pd.DataFrame) -> pd.DataFrame:
    """
    Shape newly detected malfunctions into rows of the `beacon_malfunctions` table.

    - `vessel_status` is derived from `is_at_port` (`AT_PORT` if True, `AT_SEA`
      otherwise)
    - `stage` is set to `INITIAL_ENCOUNTER`, `malfunction_end_date_utc` to null and
      `vessel_status_last_modification_date_utc` to the current (naive) UTC datetime
    - `notification_requested` depends on the (`vessel_status`, `beacon_status`)
      pair. Pairs other than `ACTIVATED` / `UNSUPERVISED` beacons give a null value.
    - identifier columns are renamed and the table's columns are selected in order

    The input DataFrame is not modified.

    Args:
        new_malfunctions (pd.DataFrame): new malfunctions, as returned by
          `get_new_malfunctions`

    Returns:
        pd.DataFrame: new malfunctions with the columns of `beacon_malfunctions`
    """
    from datetime import timezone

    # Work on a copy: the column assignments below would otherwise mutate the
    # caller's DataFrame.
    new_malfunctions = new_malfunctions.copy(deep=True)

    # False -> 0 -> AT_SEA, True -> 1 -> AT_PORT.
    # NOTE(review): `astype(int)` would raise if `is_at_port` contains nulls.
    new_malfunctions["vessel_status"] = np.choose(
        new_malfunctions.is_at_port.astype(int),
        [
            BeaconMalfunctionVesselStatus.AT_SEA.value,
            BeaconMalfunctionVesselStatus.AT_PORT.value,
        ],
    )
    new_malfunctions["stage"] = BeaconMalfunctionStage.INITIAL_ENCOUNTER.value
    new_malfunctions["malfunction_end_date_utc"] = pd.NaT
    # Same naive UTC datetime as the deprecated `datetime.utcnow()`
    new_malfunctions["vessel_status_last_modification_date_utc"] = datetime.now(
        timezone.utc
    ).replace(tzinfo=None)

    notification_to_send = {
        (BeaconMalfunctionVesselStatus.AT_SEA.value, BeaconStatus.ACTIVATED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_SEA_INITIAL_NOTIFICATION.value
        ),
        (BeaconMalfunctionVesselStatus.AT_PORT.value, BeaconStatus.ACTIVATED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_PORT_INITIAL_NOTIFICATION.value
        ),
        (BeaconMalfunctionVesselStatus.AT_SEA.value, BeaconStatus.UNSUPERVISED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_SEA_INITIAL_NOTIFICATION_UNSUPERVISED_BEACON.value
        ),
        (
            BeaconMalfunctionVesselStatus.AT_PORT.value,
            BeaconStatus.UNSUPERVISED.value,
        ): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_PORT_INITIAL_NOTIFICATION_UNSUPERVISED_BEACON.value
        ),
    }
    new_malfunctions["notification_requested"] = (
        new_malfunctions[["vessel_status", "beacon_status"]]
        .apply(lambda row: tuple(row), axis=1)
        .map(notification_to_send)
    )

    new_malfunctions = new_malfunctions.rename(
        columns={
            "cfr": "internal_reference_number",
            "external_immatriculation": "external_reference_number",
            "beacon_status": "beacon_status_at_malfunction_creation",
        }
    )

    ordered_columns = [
        "internal_reference_number",
        "external_reference_number",
        "ircs",
        "vessel_name",
        "flag_state",
        "vessel_identifier",
        "vessel_status",
        "stage",
        "malfunction_start_date_utc",
        "malfunction_end_date_utc",
        "vessel_status_last_modification_date_utc",
        "vessel_id",
        "notification_requested",
        "latitude",
        "longitude",
        "beacon_number",
        "beacon_status_at_malfunction_creation",
    ]
    return new_malfunctions.loc[:, ordered_columns]
@task
[docs]
def load_new_beacon_malfunctions(new_beacon_malfunctions: pd.DataFrame):
    """
    Append `new_beacon_malfunctions` to the `public.beacon_malfunctions` table of
    the `monitorfish_remote` database, through the `load` helper and with the
    current run's logger.

    Args:
        new_beacon_malfunctions (pd.DataFrame): rows to append
    """
    logger = get_run_logger()
    load(
        new_beacon_malfunctions,
        table_name="beacon_malfunctions",
        schema="public",
        db_name="monitorfish_remote",
        logger=logger,
        how="append",
    )
@task
[docs]
def update_beacon_malfunction(
    beacon_malfunction_id: int,
    *,
    new_stage: Optional[BeaconMalfunctionStage] = None,
    new_vessel_status: Optional[BeaconMalfunctionVesselStatus] = None,
    end_of_malfunction_reason: Optional[EndOfMalfunctionReason] = None,
) -> int:
    """Update a `beacon_malfunction` stage or vessel status through the backend API.

    - Exactly one of `new_stage` or `new_vessel_status` must be provided
    - `end_of_malfunction_reason` must be provided if `new_stage` is provided and is
      equal to `ARCHIVED`
    - `end_of_malfunction_reason` cannot be provided when `new_stage` is provided
      and is different from `ARCHIVED`
    - `end_of_malfunction_reason` cannot be provided when `new_vessel_status` is
      provided

    The update is sent as a JSON body in a PUT request to
    `BEACON_MALFUNCTIONS_ENDPOINT + str(beacon_malfunction_id)`.

    Args:
        beacon_malfunction_id (int): id of the beacon_malfunction to update
        new_stage (BeaconMalfunctionStage, optional): stage to move the beacon
          malfunction to. Defaults to None.
        new_vessel_status (BeaconMalfunctionVesselStatus, optional): vessel_status to
          move the beacon malfunction to. Defaults to None.
        end_of_malfunction_reason (EndOfMalfunctionReason, optional): reason that led
          to the archiving of the malfunction. Defaults to None.

    Returns:
        int: `beacon_malfunction_id`, once the update request succeeded

    Raises:
        ValueError: in the following cases :
          - both `new_stage` and `new_vessel_status` are provided
          - both `new_stage` and `new_vessel_status` are null
          - `new_stage` is `ARCHIVED` and no `end_of_malfunction_reason` is
            provided
          - an `end_of_malfunction_reason` is provided along with a
            `new_vessel_status`
          - an `end_of_malfunction_reason` is provided along with a `new_stage`
            other than `ARCHIVED`
        requests.HTTPError: if the backend responds with an error status
    """
    # Explicit checks rather than `assert`: assertions are removed when Python runs
    # with optimizations (-O), which would silently disable this validation.
    updates_stage = new_vessel_status is None and isinstance(
        new_stage, BeaconMalfunctionStage
    )
    updates_vessel_status = new_stage is None and isinstance(
        new_vessel_status, BeaconMalfunctionVesselStatus
    )
    if not (updates_stage or updates_vessel_status):
        raise ValueError(
            "Exactly one of new_stage or new_vessel_status must be provided"
        )

    payload = {}

    if updates_stage:
        payload["stage"] = new_stage.value
        if new_stage is BeaconMalfunctionStage.ARCHIVED:
            if not isinstance(end_of_malfunction_reason, EndOfMalfunctionReason):
                raise ValueError(
                    "Cannot end a malfunction without "
                    "giving an end_of_malfunction_reason"
                )
            payload["endOfBeaconMalfunctionReason"] = end_of_malfunction_reason.value
        elif end_of_malfunction_reason is not None:
            raise ValueError(
                "Cannot give a `EndOfBeaconMalfunctionReason` for a new_stage "
                "other than `ARCHIVED`."
            )

    if updates_vessel_status:
        if end_of_malfunction_reason is not None:
            raise ValueError(
                "Unexpected argument end_of_malfunction_reason "
                "when updating vessel_status"
            )
        payload["vesselStatus"] = new_vessel_status.value

    # After validation, `payload` always holds either `stage` or `vesselStatus`.
    url = BEACON_MALFUNCTIONS_ENDPOINT + str(beacon_malfunction_id)
    headers = {
        "Accept": "application/json, text/plain",
        "Content-Type": "application/json;charset=UTF-8",
        "X-API-KEY": BACKEND_API_KEY,
    }
    r = requests.put(url=url, json=payload, headers=headers)
    r.raise_for_status()
    return beacon_malfunction_id
@task
[docs]
def request_notification(
    beacon_malfunction_id: int,
    requested_notification: BeaconMalfunctionNotificationType,
):
    """Ask the backend to send a notification about a beacon malfunction.

    Sends a PUT request, with no body, to
    `BEACON_MALFUNCTIONS_ENDPOINT + "<beacon_malfunction_id>/<notification value>"`.

    Args:
        beacon_malfunction_id (int): id of the beacon malfunction to notify about
        requested_notification (BeaconMalfunctionNotificationType): type of
          notification to send

    Raises:
        ValueError: if `requested_notification` is not a
          `BeaconMalfunctionNotificationType`
        requests.HTTPError: if the backend responds with an error status
    """
    # Explicit check rather than `assert`, which is removed under `python -O`.
    if not isinstance(requested_notification, BeaconMalfunctionNotificationType):
        raise ValueError(
            (
                "Expected BeaconMalfunctionNotificationType, "
                f"got {requested_notification} instead."
            )
        )

    url = (
        BEACON_MALFUNCTIONS_ENDPOINT
        + f"{str(beacon_malfunction_id)}/{requested_notification.value}"
    )
    headers = {"X-API-KEY": BACKEND_API_KEY}
    r = requests.put(url=url, headers=headers)
    r.raise_for_status()
@flow(name="Monitorfish - Beacons malfunctions")
[docs]
def update_beacon_malfunctions_flow(
    max_hours_without_emission_at_sea: int = BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_SEA,
    max_hours_without_emission_at_port: int = BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_PORT,
    extract_satellite_operators_statuses_fn: Callable = extract_satellite_operators_statuses,
):
    """
    Detect and load new beacon malfunctions, then archive the known malfunctions
    that should be closed.

    Args:
        max_hours_without_emission_at_sea (int): a beacon at sea whose last emission
          is older than `now` minus this many hours is in malfunction
        max_hours_without_emission_at_port (int): a beacon at port whose last
          emission is older than `now` minus this many hours is in malfunction
        extract_satellite_operators_statuses_fn (Callable): task whose `.submit()`
          extracts the satellite operators statuses (presumably overridable for
          tests - TODO confirm)

    Returns:
        tuple: the prepared new malfunctions, then the ids of the ended malfunctions
        of activated beacons not at port, of activated beacons at port, of beacons no
        longer required to emit, and of unsupervised beacons.
    """
    # Healthcheck of the last positions data before using it
    healthcheck = get_monitorfish_healthcheck()
    now = get_utcnow()
    assert_last_positions_flow_health(healthcheck=healthcheck, utcnow=now)

    # Extract
    last_positions = extract_last_positions.submit()
    vessels_that_should_emit = extract_vessels_that_should_emit.submit()
    known_malfunctions = extract_known_malfunctions.submit()
    satellite_operators_statuses = extract_satellite_operators_statuses_fn.submit()

    # Transform
    max_silence_at_sea = make_timedelta(hours=max_hours_without_emission_at_sea)
    max_silence_at_port = make_timedelta(hours=max_hours_without_emission_at_port)
    threshold_at_sea = now - max_silence_at_sea
    threshold_at_port = now - max_silence_at_port

    last_emissions = get_last_emissions_of_vessels_that_should_emit(
        vessels_that_should_emit, last_positions
    )
    new_malfunctions = prepare_new_beacon_malfunctions(
        get_new_malfunctions(
            last_emissions,
            known_malfunctions,
            satellite_operators_statuses,
            threshold_at_sea,
            threshold_at_port,
        )
    )

    # Load
    load_new_beacon_malfunctions(new_malfunctions)

    (
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_not_required_to_emit,
        ids_unsupervised_restarted_emitting,
    ) = get_ended_malfunction_ids(last_emissions, known_malfunctions, threshold_at_sea)

    # Malfunctions whose beacon restarted emitting are moved to ARCHIVED and an
    # END_OF_MALFUNCTION notification is requested for those successfully updated.
    # This applies to three groups:
    #   - activated beacons of vessels not at port, according to the latest
    #     vessel_status of the malfunction (AT_SEA, ACTIVITY_DETECTED... anything
    #     but AT_PORT). Updated August 2024: these were previously moved to
    #     END_OF_MALFUNCTION, and the notification was requested by the user after
    #     a manual check;
    #   - activated beacons of vessels at port, according to the latest
    #     vessel_status of the malfunction;
    #   - unsupervised beacons.
    for restarted_ids in (
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_unsupervised_restarted_emitting,
    ):
        archived_ids = filter_results(
            update_beacon_malfunction.map(
                restarted_ids,
                new_stage=unmapped(BeaconMalfunctionStage.ARCHIVED),
                end_of_malfunction_reason=unmapped(
                    EndOfMalfunctionReason.RESUMED_TRANSMISSION
                ),
            )
        )
        request_notification.map(
            archived_ids,
            unmapped(BeaconMalfunctionNotificationType.END_OF_MALFUNCTION),
        )

    # Malfunctions whose beacon has been deactivated or completely unequipped are
    # only archived, without any notification.
    update_beacon_malfunction.map(
        ids_not_required_to_emit,
        new_stage=unmapped(BeaconMalfunctionStage.ARCHIVED),
        end_of_malfunction_reason=unmapped(
            EndOfMalfunctionReason.BEACON_DEACTIVATED_OR_UNEQUIPPED
        ),
    )

    return (
        new_malfunctions,
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_not_required_to_emit,
        ids_unsupervised_restarted_emitting,
    )