Source code for pipeline.src.flows.update_beacon_malfunctions

from datetime import datetime
from typing import Callable, Optional, Tuple

import numpy as np
import pandas as pd
import requests
from prefect import flow, get_run_logger, task, unmapped

from config import (
    BACKEND_API_KEY,
    BEACON_MALFUNCTIONS_ENDPOINT,
    BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_PORT,
    BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_SEA,
)
from src.entities.beacon_malfunctions import (
    BeaconMalfunctionNotificationType,
    BeaconMalfunctionStage,
    BeaconMalfunctionVesselStatus,
    BeaconStatus,
    EndOfMalfunctionReason,
)
from src.generic_tasks import extract, load
from src.processing import join_on_multiple_keys
from src.shared_tasks.control_flow import filter_results
from src.shared_tasks.dates import get_utcnow, make_timedelta
from src.shared_tasks.healthcheck import (
    assert_last_positions_flow_health,
    get_monitorfish_healthcheck,
)


@task
def extract_last_positions() -> pd.DataFrame:
    """
    Extract the last emission date of each vessel from the `last_positions` table.

    Returns:
        pd.DataFrame: result of the
        `monitorfish/last_positions_for_beacon_malfunctions.sql` query run against
        the `monitorfish_remote` database.
    """
    query_filepath = "monitorfish/last_positions_for_beacon_malfunctions.sql"
    return extract("monitorfish_remote", query_filepath)
@task
def extract_known_malfunctions() -> pd.DataFrame:
    """
    Extract the ongoing malfunctions from the `beacon_malfunctions` table.

    Returns:
        pd.DataFrame: result of the `monitorfish/known_beacon_malfunctions.sql`
        query run against the `monitorfish_remote` database.
    """
    query_filepath = "monitorfish/known_beacon_malfunctions.sql"
    return extract("monitorfish_remote", query_filepath)
@task
def extract_vessels_that_should_emit() -> pd.DataFrame:
    """
    Extract the vessels of the `vessels` table that are expected to emit.

    These are the vessels that have an associated beacon with a status of
    `ACTIVATED` or `UNSUPERVISED` and whose flag_state must be monitored.

    Returns:
        pd.DataFrame: result of the `monitorfish/vessels_that_should_emit.sql`
        query run against the `monitorfish_remote` database.
    """
    query_filepath = "monitorfish/vessels_that_should_emit.sql"
    return extract("monitorfish_remote", query_filepath)
@task
def extract_satellite_operators_statuses() -> pd.DataFrame:
    """
    Extract satellite operators statuses from the `satellite_operators_statuses`
    view.

    The statuses are meant to be used to decide for which beacons malfunctions may
    be created: when a satellite operator is down, we do not want to generate
    malfunctions for all the beacons of this operator, we want to wait until data
    flows are up again.

    Returns:
        pd.DataFrame: result of the `monitorfish/satellite_operators_statuses.sql`
        query run against the `monitorfish_remote` database.
    """
    query_filepath = "monitorfish/satellite_operators_statuses.sql"
    return extract("monitorfish_remote", query_filepath)
@task
def get_last_emissions_of_vessels_that_should_emit(
    vessels_that_should_emit: pd.DataFrame, last_positions: pd.DataFrame
) -> pd.DataFrame:
    """
    Left-join `vessels_that_should_emit` with `last_positions` on `cfr`, `ircs`
    and `external_immatriculation`, following the `join_on_multiple_keys` logic,
    and keep one row per `beacon_number`: the one with the most recent
    `last_position_datetime_utc`.

    A last position that is not strictly posterior to the beacon's
    `logging_datetime_utc` is blanked out: the result therefore only holds each
    vessel's last emission **with its current beacon**. This is done to avoid
    generating beacon malfunctions on a given vessel from emission data of its
    previous beacon.

    Args:
        vessels_that_should_emit (pd.DataFrame): DataFrame of vessels that should
          emit
        last_positions (pd.DataFrame): DataFrame of last positions

    Returns:
        pd.DataFrame: last emissions of the input vessels with their current beacon
    """
    vessels_with_positions = join_on_multiple_keys(
        vessels_that_should_emit,
        last_positions,
        or_join_keys=["cfr", "ircs", "external_immatriculation"],
        how="left",
        coalesce_common_columns=False,
    )

    # Most recent position first, so that the first row of each beacon is its
    # latest emission.
    most_recent_first = vessels_with_positions.sort_values(
        "last_position_datetime_utc", ascending=False
    )
    last_emissions = (
        most_recent_first.groupby("beacon_number").head(1).reset_index(drop=True)
    )

    # Positions emitted before (or exactly when) the current beacon was logged
    # are replaced by a missing value.
    emitted_with_current_beacon = (
        last_emissions.last_position_datetime_utc
        > last_emissions.logging_datetime_utc
    )
    last_emissions[
        "last_position_datetime_utc"
    ] = last_emissions.last_position_datetime_utc.where(emitted_with_current_beacon)

    return last_emissions
@task
def get_new_malfunctions(
    last_emissions: pd.DataFrame,
    known_malfunctions: pd.DataFrame,
    satellite_operators_statuses: pd.DataFrame,
    malfunction_datetime_utc_threshold_at_sea: datetime,
    malfunction_datetime_utc_threshold_at_port: datetime,
) -> pd.DataFrame:
    """
    Select the last emissions that must give rise to a new beacon malfunction.

    Beacons already listed in `known_malfunctions` and beacons whose satellite
    operator is not flagged as up are ignored. Among the remaining rows, a
    malfunction is created when one of the following holds:

    - `is_manual` is true
    - the vessel is at port and its last position is older than
      `malfunction_datetime_utc_threshold_at_port`
    - the vessel is not at port and its last position is older than
      `malfunction_datetime_utc_threshold_at_sea`

    Args:
        last_emissions (pd.DataFrame): last emission of each vessel that should
          emit
        known_malfunctions (pd.DataFrame): ongoing malfunctions
        satellite_operators_statuses (pd.DataFrame): statuses of satellite
          operators, with `operator_is_up` and `satellite_operator_id` columns
        malfunction_datetime_utc_threshold_at_sea (datetime): datetime before
          which the last emission of a vessel not at port makes a malfunction
        malfunction_datetime_utc_threshold_at_port (datetime): datetime before
          which the last emission of a vessel at port makes a malfunction

    Returns:
        pd.DataFrame: selected rows, with `last_position_datetime_utc` renamed to
        `malfunction_start_date_utc` and without the `is_manual` and
        `satellite_operator_id` columns
    """
    # NOTE: flags are compared explicitly to True / False rather than used
    # directly as masks - presumably so that null flags never select a row.
    operator_is_up = satellite_operators_statuses.operator_is_up == True  # noqa: E712
    operators_up = set(
        satellite_operators_statuses.loc[operator_is_up, "satellite_operator_id"].values
    )

    # Remove emissions of beacons already in malfunction
    already_in_malfunction = last_emissions.beacon_number.isin(
        known_malfunctions.beacon_number
    )
    last_emissions = last_emissions.loc[~already_in_malfunction].copy(deep=True)

    # Remove emissions of beacons whose satellite operator is down
    has_operator_up = last_emissions.satellite_operator_id.isin(operators_up)
    last_emissions = last_emissions.loc[has_operator_up].copy(deep=True)

    manual_position = last_emissions.is_manual == True  # noqa: E712
    silent_at_port = (last_emissions.is_at_port == True) & (  # noqa: E712
        last_emissions.last_position_datetime_utc
        < malfunction_datetime_utc_threshold_at_port
    )
    silent_at_sea = (last_emissions.is_at_port == False) & (  # noqa: E712
        last_emissions.last_position_datetime_utc
        < malfunction_datetime_utc_threshold_at_sea
    )

    selected = last_emissions.loc[manual_position | silent_at_port | silent_at_sea]
    renamed = selected.rename(
        columns={"last_position_datetime_utc": "malfunction_start_date_utc"}
    ).reset_index(drop=True)

    return renamed.drop(columns=["is_manual", "satellite_operator_id"])
@task
def get_ended_malfunction_ids(
    last_emissions_of_vessels_that_should_emit: pd.DataFrame,
    known_malfunctions: pd.DataFrame,
    malfunction_datetime_utc_threshold_at_sea: datetime,
) -> Tuple[list, list, list, list]:
    """
    Sort the ids of known malfunctions that have ended into four groups.

    A known malfunction whose beacon is absent from
    `last_emissions_of_vessels_that_should_emit` is considered not required to
    emit anymore. A known malfunction whose beacon has a non manual last
    position at or after `malfunction_datetime_utc_threshold_at_sea` is
    considered to have restarted emitting, and is further split according to its
    `beacon_status` and `vessel_status`.

    Args:
        last_emissions_of_vessels_that_should_emit (pd.DataFrame): last emission
          of each vessel that should emit
        known_malfunctions (pd.DataFrame): ongoing malfunctions
        malfunction_datetime_utc_threshold_at_sea (datetime): datetime from which
          a last emission is recent enough to end a malfunction

    Returns:
        Tuple[list, list, list, list]: in this order, the ids of malfunctions

        - that restarted emitting, with an `ACTIVATED` beacon and a vessel status
          other than `AT_PORT`
        - that restarted emitting, with an `ACTIVATED` beacon and a vessel status
          of `AT_PORT`
        - whose beacon is not required to emit anymore
        - that restarted emitting, with a beacon status other than `ACTIVATED`
    """
    still_required_to_emit = known_malfunctions.beacon_number.isin(
        last_emissions_of_vessels_that_should_emit.beacon_number
    )
    ids_not_required_to_emit = set(
        known_malfunctions.loc[~still_required_to_emit, "id"]
    )

    malfunctions_emissions = pd.merge(
        known_malfunctions,
        last_emissions_of_vessels_that_should_emit,
        on="beacon_number",
        how="inner",
    )
    automatic_position = malfunctions_emissions.is_manual == False  # noqa: E712
    recent_position = (
        malfunctions_emissions.last_position_datetime_utc
        >= malfunction_datetime_utc_threshold_at_sea
    )
    restarted = malfunctions_emissions.loc[
        automatic_position & recent_position
    ].reset_index(drop=True)

    activated = restarted.beacon_status == BeaconStatus.ACTIVATED.value
    not_activated = restarted.beacon_status != BeaconStatus.ACTIVATED.value
    at_port = restarted.vessel_status == BeaconMalfunctionVesselStatus.AT_PORT.value
    not_at_port = (
        restarted.vessel_status != BeaconMalfunctionVesselStatus.AT_PORT.value
    )

    ids_unsupervised_restarted_emitting = set(restarted.loc[not_activated, "id"])
    ids_at_port_restarted_emitting = set(restarted.loc[activated & at_port, "id"])
    ids_not_at_port_restarted_emitting = set(
        restarted.loc[activated & not_at_port, "id"]
    )

    return (
        list(ids_not_at_port_restarted_emitting),
        list(ids_at_port_restarted_emitting),
        list(ids_not_required_to_emit),
        list(ids_unsupervised_restarted_emitting),
    )
@task
def prepare_new_beacon_malfunctions(new_malfunctions: pd.DataFrame) -> pd.DataFrame:
    """
    Shape new malfunctions into the columns loaded into `beacon_malfunctions`.

    The following columns are added:

    - `vessel_status`: `AT_PORT` when `is_at_port` is true, `AT_SEA` otherwise
    - `stage`: `INITIAL_ENCOUNTER`
    - `malfunction_end_date_utc`: `NaT`
    - `vessel_status_last_modification_date_utc`: current naive UTC datetime
    - `notification_requested`: initial notification type matching the
      (`vessel_status`, `beacon_status`) pair of the row. A pair without a
      matching notification type results in a missing value.

    `cfr`, `external_immatriculation` and `beacon_status` are then renamed to
    `internal_reference_number`, `external_reference_number` and
    `beacon_status_at_malfunction_creation`.

    The input DataFrame is not modified.

    Args:
        new_malfunctions (pd.DataFrame): new malfunctions to prepare

    Returns:
        pd.DataFrame: prepared malfunctions, restricted to the output columns, in
        a fixed order
    """
    # Work on a copy: columns are added below and the caller's DataFrame must
    # not be altered as a side effect.
    new_malfunctions = new_malfunctions.copy(deep=True)

    # `is_at_port` cast to 0 / 1 is used as an index into the list of statuses.
    new_malfunctions["vessel_status"] = np.choose(
        new_malfunctions.is_at_port.astype(int),
        [
            BeaconMalfunctionVesselStatus.AT_SEA.value,
            BeaconMalfunctionVesselStatus.AT_PORT.value,
        ],
    )
    new_malfunctions["stage"] = BeaconMalfunctionStage.INITIAL_ENCOUNTER.value
    new_malfunctions["malfunction_end_date_utc"] = pd.NaT
    new_malfunctions["vessel_status_last_modification_date_utc"] = datetime.utcnow()

    notification_to_send = {
        (BeaconMalfunctionVesselStatus.AT_SEA.value, BeaconStatus.ACTIVATED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_SEA_INITIAL_NOTIFICATION.value
        ),
        (BeaconMalfunctionVesselStatus.AT_PORT.value, BeaconStatus.ACTIVATED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_PORT_INITIAL_NOTIFICATION.value
        ),
        (BeaconMalfunctionVesselStatus.AT_SEA.value, BeaconStatus.UNSUPERVISED.value): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_SEA_INITIAL_NOTIFICATION_UNSUPERVISED_BEACON.value
        ),
        (
            BeaconMalfunctionVesselStatus.AT_PORT.value,
            BeaconStatus.UNSUPERVISED.value,
        ): (
            BeaconMalfunctionNotificationType.MALFUNCTION_AT_PORT_INITIAL_NOTIFICATION_UNSUPERVISED_BEACON.value
        ),
    }

    # Build one (vessel_status, beacon_status) tuple per row and look it up in
    # the mapping above.
    new_malfunctions["notification_requested"] = (
        new_malfunctions[["vessel_status", "beacon_status"]]
        .apply(tuple, axis=1)
        .map(notification_to_send)
    )

    new_malfunctions = new_malfunctions.rename(
        columns={
            "cfr": "internal_reference_number",
            "external_immatriculation": "external_reference_number",
            "beacon_status": "beacon_status_at_malfunction_creation",
        }
    )

    ordered_columns = [
        "internal_reference_number",
        "external_reference_number",
        "ircs",
        "vessel_name",
        "flag_state",
        "vessel_identifier",
        "vessel_status",
        "stage",
        "malfunction_start_date_utc",
        "malfunction_end_date_utc",
        "vessel_status_last_modification_date_utc",
        "vessel_id",
        "notification_requested",
        "latitude",
        "longitude",
        "beacon_number",
        "beacon_status_at_malfunction_creation",
    ]

    return new_malfunctions.loc[:, ordered_columns]
@task
def load_new_beacon_malfunctions(new_beacon_malfunctions: pd.DataFrame):
    """
    Append new beacon malfunctions to the `public.beacon_malfunctions` table of
    the `monitorfish_remote` database.

    Args:
        new_beacon_malfunctions (pd.DataFrame): beacon malfunctions to load
    """
    logger = get_run_logger()
    load(
        new_beacon_malfunctions,
        table_name="beacon_malfunctions",
        schema="public",
        db_name="monitorfish_remote",
        logger=logger,
        how="append",
    )
@task
def update_beacon_malfunction(
    beacon_malfunction_id: int,
    *,
    new_stage: Optional[BeaconMalfunctionStage] = None,
    new_vessel_status: Optional[BeaconMalfunctionVesselStatus] = None,
    end_of_malfunction_reason: Optional[EndOfMalfunctionReason] = None,
):
    """Update a `beacon_malfunction` stage or vessel status.

    The update is sent as a PUT request to
    `BEACON_MALFUNCTIONS_ENDPOINT + str(beacon_malfunction_id)`.

    - Exactly one of `new_stage` or `new_vessel_status` must be provided
    - `end_of_malfunction_reason` must be provided if `new_stage` is provided and
      is equal to `ARCHIVED`
    - `end_of_malfunction_reason` cannot be provided when `new_stage` is provided
      and is different from `ARCHIVED`
    - `end_of_malfunction_reason` cannot be provided when `new_vessel_status` is
      provided

    Args:
        beacon_malfunction_id (int): id of the beacon_malfunction to update
        new_stage (BeaconMalfunctionStage, optional): stage to move the beacon
          malfunction to. Defaults to None.
        new_vessel_status (BeaconMalfunctionVesselStatus, optional): vessel_status
          to move the beacon malfunction to. Defaults to None.
        end_of_malfunction_reason (EndOfMalfunctionReason, optional): reason that
          led to the archiving of the malfunction. Defaults to None.

    Returns:
        int: the input `beacon_malfunction_id`, once the update has succeeded

    Raises:
        ValueError: in the following cases :

          - both `new_stage` and `new_vessel_status` are provided
          - both `new_stage` and `new_vessel_status` are null
          - `new_stage` is `ARCHIVED` and no `end_of_malfunction_reason` is
            provided
          - an `end_of_malfunction_reason` is provided along with a
            `new_vessel_status`
          - an `end_of_malfunction_reason` is provided along with a `new_stage`
            other than `ARCHIVED`
        requests.HTTPError: if the backend answers with an error status code
    """
    # Arguments are validated with explicit checks rather than `assert`
    # statements, which are removed when Python runs with the -O option.
    updates_stage = new_vessel_status is None and isinstance(
        new_stage, BeaconMalfunctionStage
    )
    updates_vessel_status = new_stage is None and isinstance(
        new_vessel_status, BeaconMalfunctionVesselStatus
    )
    if not (updates_stage or updates_vessel_status):
        raise ValueError(
            "Exactly one of new_stage or new_vessel_status must be provided"
        )

    url = BEACON_MALFUNCTIONS_ENDPOINT + str(beacon_malfunction_id)
    payload = {}

    if updates_stage:
        payload["stage"] = new_stage.value

        if new_stage is BeaconMalfunctionStage.ARCHIVED:
            if not isinstance(end_of_malfunction_reason, EndOfMalfunctionReason):
                raise ValueError(
                    (
                        "Cannot end a malfunction without "
                        "giving an end_of_malfunction_reason"
                    )
                )
            payload["endOfBeaconMalfunctionReason"] = end_of_malfunction_reason.value

        elif end_of_malfunction_reason is not None:
            raise ValueError(
                (
                    "Cannot give a `EndOfBeaconMalfunctionReason` for a new_stage "
                    "other than `ARCHIVED`."
                )
            )

    else:
        if end_of_malfunction_reason is not None:
            raise ValueError(
                (
                    "Unexpected argument end_of_malfunction_reason "
                    "when updating vessel_status"
                )
            )
        payload["vesselStatus"] = new_vessel_status.value

    # At this point the payload always holds either a stage or a vessel status.
    headers = {
        "Accept": "application/json, text/plain",
        "Content-Type": "application/json;charset=UTF-8",
        "X-API-KEY": BACKEND_API_KEY,
    }
    r = requests.put(url=url, json=payload, headers=headers)
    r.raise_for_status()

    return beacon_malfunction_id
@task
def request_notification(
    beacon_malfunction_id: int,
    requested_notification: BeaconMalfunctionNotificationType,
):
    """
    Request a notification for a beacon malfunction.

    The request is sent as a PUT request to
    `BEACON_MALFUNCTIONS_ENDPOINT + "<beacon_malfunction_id>/<notification type>"`.

    Args:
        beacon_malfunction_id (int): id of the beacon malfunction to notify
        requested_notification (BeaconMalfunctionNotificationType): type of
          notification to request

    Raises:
        ValueError: if `requested_notification` is not a
          `BeaconMalfunctionNotificationType`
        requests.HTTPError: if the backend answers with an error status code
    """
    # Explicit check rather than an `assert` statement, which is removed when
    # Python runs with the -O option.
    if not isinstance(requested_notification, BeaconMalfunctionNotificationType):
        raise ValueError(
            (
                "Expected BeaconMalfunctionNotificationType, "
                f"got {requested_notification} instead."
            )
        )

    url = (
        BEACON_MALFUNCTIONS_ENDPOINT
        + f"{str(beacon_malfunction_id)}/{requested_notification.value}"
    )
    headers = {"X-API-KEY": BACKEND_API_KEY}

    r = requests.put(url=url, headers=headers)
    r.raise_for_status()
@flow(name="Monitorfish - Beacons malfunctions")
def update_beacon_malfunctions_flow(
    max_hours_without_emission_at_sea: int = BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_SEA,
    max_hours_without_emission_at_port: int = BEACONS_MAX_HOURS_WITHOUT_EMISSION_AT_PORT,
    extract_satellite_operators_statuses_fn: Callable = extract_satellite_operators_statuses,
):
    """
    Create new beacon malfunctions and archive the ones that have ended.

    Args:
        max_hours_without_emission_at_sea (int): number of hours without emission
          after which a vessel not at port is considered in malfunction
        max_hours_without_emission_at_port (int): number of hours without emission
          after which a vessel at port is considered in malfunction
        extract_satellite_operators_statuses_fn (Callable): task used to extract
          the satellite operators statuses

    Returns:
        tuple: the prepared new malfunctions, followed by the four lists of ended
        malfunction ids : not at port restarted emitting, at port restarted
        emitting, not required to emit, unsupervised restarted emitting
    """
    # Healthcheck
    healthcheck = get_monitorfish_healthcheck()
    now = get_utcnow()
    assert_last_positions_flow_health(healthcheck=healthcheck, utcnow=now)

    # Extract
    last_positions = extract_last_positions.submit()
    vessels_that_should_emit = extract_vessels_that_should_emit.submit()
    known_malfunctions = extract_known_malfunctions.submit()
    satellite_operators_statuses = extract_satellite_operators_statuses_fn.submit()

    # Transform
    malfunction_datetime_utc_threshold_at_sea = now - make_timedelta(
        hours=max_hours_without_emission_at_sea
    )
    malfunction_datetime_utc_threshold_at_port = now - make_timedelta(
        hours=max_hours_without_emission_at_port
    )

    last_emissions_of_vessels_that_should_emit = (
        get_last_emissions_of_vessels_that_should_emit(
            vessels_that_should_emit, last_positions
        )
    )

    new_malfunctions = prepare_new_beacon_malfunctions(
        get_new_malfunctions(
            last_emissions_of_vessels_that_should_emit,
            known_malfunctions,
            satellite_operators_statuses,
            malfunction_datetime_utc_threshold_at_sea,
            malfunction_datetime_utc_threshold_at_port,
        )
    )

    # Load
    load_new_beacon_malfunctions(new_malfunctions)

    (
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_not_required_to_emit,
        ids_unsupervised_restarted_emitting,
    ) = get_ended_malfunction_ids(
        last_emissions_of_vessels_that_should_emit,
        known_malfunctions,
        malfunction_datetime_utc_threshold_at_sea,
    )

    # The three groups of malfunctions that restarted emitting are handled the
    # same way: moved to ARCHIVED and automatically notified. In order:
    #
    # - malfunctions not "at port" (or supposed not to be, according to the
    #   latest vessel_status of the malfunction) - that is, in a status of
    #   AT_SEA, ACTIVITY_DETECTED... anything but AT_PORT.
    #   Updated August 2024 : previously, these were moved to END_OF_MALFUNCTION
    #   and the notification was requested by the user after a manual check.
    # - malfunctions "at port" (or supposed to be, according to the latest
    #   vessel_status of the malfunction)
    # - malfunctions of unsupervised beacons
    for restarted_emitting_ids in (
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_unsupervised_restarted_emitting,
    ):
        archived = update_beacon_malfunction.map(
            restarted_emitting_ids,
            new_stage=unmapped(BeaconMalfunctionStage.ARCHIVED),
            end_of_malfunction_reason=unmapped(
                EndOfMalfunctionReason.RESUMED_TRANSMISSION
            ),
        )
        # NOTE(review): presumably keeps only the ids whose update succeeded -
        # confirm against `filter_results`.
        archived_ids = filter_results(archived)
        request_notification.map(
            archived_ids,
            unmapped(BeaconMalfunctionNotificationType.END_OF_MALFUNCTION),
        )

    # Malfunctions for which the beacon has been deactivated or completely
    # unequipped are just archived.
    update_beacon_malfunction.map(
        ids_not_required_to_emit,
        new_stage=unmapped(BeaconMalfunctionStage.ARCHIVED),
        end_of_malfunction_reason=unmapped(
            EndOfMalfunctionReason.BEACON_DEACTIVATED_OR_UNEQUIPPED
        ),
    )

    return (
        new_malfunctions,
        ids_not_at_port_restarted_emitting,
        ids_at_port_restarted_emitting,
        ids_not_required_to_emit,
        ids_unsupervised_restarted_emitting,
    )