Source code for pipeline.src.shared_tasks.positions

import pandas as pd
from prefect import task

from config import ANCHORAGES_H3_CELL_RESOLUTION
from src.generic_tasks import extract
from src.helpers.bathymetry import get_depth
from src.helpers.spatial import get_h3_indices
from src.processing import get_first_non_null_column_name


@task
[docs] def tag_positions_at_port(positions: pd.DataFrame) -> pd.DataFrame: """ Adds an `is_at_port` boolean field to a DataFrame containing positions. Mandatory fields `latitude` and `longitude` are converted to h3 indices and compared to a referential of h3 indices of ports and anchor areas. If the h3 index of the latitude-longitude position matches that of a port or anchor area, then `is_at_port` will be `True`. Additionnally, if a `is_on_land` boolean column is present in the input DataFrame, positions with `is_on_land` = True will have `is_at_port` = True regardless of the latitude-longitude position. The `is_on_land` column is dropped after being taken into account for the computation of `is_at_port`. Args: positions: DataFrame with at least 'latitude' and 'longitude' columns and an optionnal `is_on_land` column. Returns: pd.DataFrame: same as input with an added 'is_at_port' boolean field. """ positions = positions.copy(deep=True) if len(positions) == 0: positions["is_at_port"] = False else: positions["h3"] = get_h3_indices( positions, lat="latitude", lon="longitude", resolution=ANCHORAGES_H3_CELL_RESOLUTION, ) h3_indices_at_port = set( extract( db_name="monitorfish_remote", query_filepath="monitorfish/h3_is_anchorage.sql", params={"h3_cells": tuple(positions.h3)}, )["h3"] ) positions["is_at_port"] = positions.h3.isin(h3_indices_at_port) positions = positions.drop(columns=["h3"]) if "is_on_land" in positions: positions["is_at_port"] = positions[["is_at_port", "is_on_land"]].any( axis=1 ) positions = positions.drop(columns=["is_on_land"]) return positions
@task
[docs] def add_vessel_identifier(positions: pd.DataFrame) -> pd.DataFrame: vessel_identifier_labels = { "cfr": "INTERNAL_REFERENCE_NUMBER", "ircs": "IRCS", "external_immatriculation": "EXTERNAL_REFERENCE_NUMBER", } positions = positions.copy(deep=True) positions["vessel_identifier"] = get_first_non_null_column_name( positions[["cfr", "ircs", "external_immatriculation"]], vessel_identifier_labels, ) return positions
@task
[docs] def add_depth(positions: pd.DataFrame) -> pd.DataFrame: positions = positions.copy(deep=True) positions["depth"] = positions.apply( lambda row: get_depth(lon=row["longitude"], lat=row["latitude"]), axis=1, result_type="reduce", ) return positions