Source code for pipeline.src.helpers.vessels

from logging import Logger

import numpy as np
import pandas as pd
from sqlalchemy import Table, select
from sqlalchemy.sql import Select

from src.processing import get_unused_col_name, join_on_multiple_keys


[docs] def make_add_vessels_columns_query( vessel_ids: list, vessels_table: Table, vessels_columns_to_add: list = None, districts_table: Table = None, districts_columns_to_add: list = None, ) -> Select: """ Creates a `sqlalchemy.select` statement representing a query to fetch the designated columns from the `vessels` and / or `districts` tables for the indicated `vessel_ids`. Args: vessel_ids (list): List of vessels `id` to fetch data for. vessels_table (Table): vessels table. vessels_columns_to_add (list, optional): List of columns to get from the `vessels` table. Defaults to None. districts_table (Table, optional): districts table. Must be supplied if `districts_columns_to_get` is given. Defaults to None. districts_columns_to_add (list, optional): List of columns to get from the `districts` table. Defaults to None. Returns: Select: select statement to execute to get the indicated data. """ from_table = vessels_table if districts_columns_to_add: assert isinstance(districts_table, Table) from_table = from_table.join( districts_table, from_table.c.district_code == districts_table.c.district_code, isouter=True, ) columns = [vessels_table.c.id.label("vessel_id")] if vessels_columns_to_add: columns += [vessels_table.c.get(col) for col in vessels_columns_to_add] if districts_columns_to_add: columns += [districts_table.c.get(col) for col in districts_columns_to_add] q = ( select(*columns) .select_from(from_table) .where(vessels_table.c.id.in_(vessel_ids)) ) return q
[docs] def make_find_vessels_query( vessels: pd.DataFrame, vessels_table: Table, ) -> Select: """ Creates a `sqlalchemy.select` object representing a query to find `vessels` in the `vessels` table that match any of the lines in the input `DataFrame` on any of `cfr`, `ircs` or `external_immatriculation`. Args: vessels (pd.DataFrame): `DataFrame`. Must have columns `cfr`, `ircs` and `external_immatriculation`. If any other columns are present they are ignored. vessels_table (Table): `sqlalchemy.Table` object representing the `vessels` table. Must have columns `cfr`, `ircs` and `external_immatriculation`. If any other columns are present they are ignored. Returns: Select: query object with columns `vessel_id`, `cfr`, `ircs` and `external_immatriculation`. """ assert "cfr" in vessels assert "ircs" in vessels assert "external_immatriculation" in vessels q = select( vessels_table.c.id.label("vessel_id"), vessels_table.c.cfr, vessels_table.c.ircs, vessels_table.c.external_immatriculation, ).where( vessels_table.c.cfr.in_(vessels.cfr.dropna().drop_duplicates().to_list()) | vessels_table.c.external_immatriculation.in_( vessels.external_immatriculation.dropna().drop_duplicates().to_list() ) | vessels_table.c.ircs.in_(vessels.ircs.dropna().drop_duplicates().to_list()) ) return q
[docs] def merge_vessel_id( vessels: pd.DataFrame, found_vessels: pd.DataFrame, logger: Logger ) -> pd.DataFrame: """ The two input DataFrames are assumed to be: - a list of vessels with `cfr`, `ircs` and `external_immatriculation` identifiers (plus potential other columns) without a `vessel_id` column - a list of vessels with `cfr`, `ircs` and `external_immatriculation` and `vessel_id` columns (and no other columns). Typically these are the vessels that are found in the `vessels` table that match one of the identifiers of the `vessels` DataFrame by the `make_find_vessels_query` query. The idea is to add the `vessel_id` from the second DataFrame as a new column in the first DataFrame, by matching the right lines in both DataFrame. This is done by perfoming a left join of the input DataFrames using join_on_multiple_keys on ["cfr", "ircs", "external_immatriculation"]. Additionnally, the returned `vessel_id` for each line in the first DataFrame is `None` if the following conditions are not met : - there is no ambiguity: only one vessel in the second DataFrame can be matched to a given line in the first DataFrame - there is no conflict: at most one vessel in the first DataFrame can be matched to a given line in the second DataFrame Lines in the second DataFrame that do not match a line in the first DataFrame are absent from the result. Lines in the first DataFrame that do not match a line in the second DataFrame are present in the result with a `vessel_id` of `None`. The result always has exactly the same lines as the first input DataFrame. Args: vessels (pd.DataFrame): Vessels to match to a found_vessel found_vessels (pd.DataFrame): found_vessels to match to a vessel logger (Logger): Logger instance Returns: pd.DataFrame: Same as vessels with an added `vessel_id` column. """ initial_length = len(vessels) vessels = vessels.copy(deep=True) # Number rows of input DataFrame input_id = get_unused_col_name("input_row_number", vessels) vessels[input_id] = range(len(vessels)) # Join vessels = join_on_multiple_keys( vessels, found_vessels, or_join_keys=["cfr", "ircs", "external_immatriculation"], how="left", coalesce_common_columns=False, ) vessels["is_ambiguous"] = vessels.duplicated(subset=input_id, keep=False) vessels["is_in_conflict"] = vessels.duplicated(subset="vessel_id", keep=False) if vessels.is_ambiguous.any(): ambiguous_vessels = vessels.loc[ vessels.is_ambiguous, [input_id, "cfr", "ircs", "external_immatriculation", "vessel_id"], ].sort_values(input_id) warning_message = ( "The following identifiers are ambiguous as they could correspond to " "more than one vessel:\n" + str(ambiguous_vessels.to_string(index=False)) ) logger.warning(warning_message) if vessels.is_in_conflict.any(): vessels_in_conflict = vessels.loc[ vessels.is_in_conflict, [input_id, "cfr", "ircs", "external_immatriculation", "vessel_id"], ].sort_values("vessel_id") warning_message = ( "The following identifiers conflict with one another - " "more than one match the same vessel:\n" + str(vessels_in_conflict.to_string(index=False)) ) logger.warning(warning_message) vessels = vessels.drop_duplicates(subset=input_id) assert len(vessels) == initial_length vessels["vessel_id"] = vessels.vessel_id.where( ~(vessels[["is_ambiguous", "is_in_conflict"]].any(axis=1)), np.nan ) return ( vessels.sort_values(input_id) .drop(columns=[input_id, "is_ambiguous", "is_in_conflict"]) .reset_index(drop=True) )