pipeline.src.flows.last_positions
=================================

.. py:module:: pipeline.src.flows.last_positions


Functions
---------

.. autoapisummary::

   pipeline.src.flows.last_positions.validate_action
   pipeline.src.flows.last_positions.extract_ais_last_positions
   pipeline.src.flows.last_positions.extract_last_positions
   pipeline.src.flows.last_positions.extract_pending_alerts
   pipeline.src.flows.last_positions.extract_reportings
   pipeline.src.flows.last_positions.drop_duplicates
   pipeline.src.flows.last_positions.extract_previous_last_positions
   pipeline.src.flows.last_positions.drop_unchanged_new_last_positions
   pipeline.src.flows.last_positions.split
   pipeline.src.flows.last_positions.compute_emission_period
   pipeline.src.flows.last_positions.concatenate
   pipeline.src.flows.last_positions.extract_risk_factors
   pipeline.src.flows.last_positions.extract_beacon_malfunctions
   pipeline.src.flows.last_positions.estimate_current_positions
   pipeline.src.flows.last_positions.join
   pipeline.src.flows.last_positions.load_last_positions
   pipeline.src.flows.last_positions.load_last_positions_ais
   pipeline.src.flows.last_positions.last_positions_flow


Module Contents
---------------

.. py:function:: validate_action(action: str) -> str

   Checks that the received parameter value is valid and returns it. Raises
   ValueError otherwise.

   :param action: input parameter for the flow
   :type action: str

   :returns: input, if valid
   :rtype: str

   :raises ValueError: if input is not valid


.. py:function:: extract_ais_last_positions() -> pandas.DataFrame

   Extracts the last AIS position of each vessel.

   :returns: DataFrame of vessels' last AIS position.
   :rtype: pd.DataFrame


.. py:function:: extract_last_positions(minutes: int) -> pandas.DataFrame

   Extracts the last position of each vessel over the past `minutes` minutes.

   :param minutes: number of minutes from current datetime to extract
   :type minutes: int

   :returns: DataFrame of vessels' last position.
   :rtype: pd.DataFrame


.. py:function:: extract_pending_alerts() -> pandas.DataFrame

.. py:function:: extract_reportings() -> pandas.DataFrame

.. py:function:: drop_duplicates(positions: pandas.DataFrame) -> pandas.DataFrame

   Drops duplicate vessels in a `pandas.DataFrame` of positions, keeping only the
   most recent position of each vessel.

   This is required even though the query that computes last positions already
   contains a DISTINCT ON clause, because for some vessels we receive each
   position twice with partially different identifiers - for instance, the same
   CFR but different ircs or external immatriculation.

   De-duplication is done using, by decreasing priority, vessel_id, CFR, ircs and
   external_immatriculation.

   :param positions: positions of vessels. Must contain columns "vessel_id",
                     "cfr", "external_immatriculation", "ircs" and
                     "last_position_datetime_utc".
   :type positions: pd.DataFrame

   :returns: DataFrame of vessels' last position with duplicates removed.
   :rtype: pd.DataFrame


.. py:function:: extract_previous_last_positions() -> pandas.DataFrame

   Extracts the contents of the `last_positions` table (which was computed by the
   previous run of the `last_positions` flow), with the `has_charter` field
   updated by taking the current value in the `vessels` table.

   :returns: DataFrame of vessels' last position (as it was last computed by the
             last_positions flow).
   :rtype: pd.DataFrame


.. py:function:: drop_unchanged_new_last_positions(new_last_positions: pandas.DataFrame, previous_last_positions: pandas.DataFrame) -> pandas.DataFrame

   Filters out all positions of new_last_positions that are already present in
   previous_last_positions.

   :param previous_last_positions:
   :type previous_last_positions: pd.DataFrame
   :param new_last_positions:
   :type new_last_positions: pd.DataFrame

   :returns: subset of new_last_positions
   :rtype: pd.DataFrame


.. py:function:: split(previous_last_positions: pandas.DataFrame, new_last_positions: pandas.DataFrame) -> Tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]

   Splits vessels into 3 categories:

   - The ones that are in previous_last_positions only (known vessels that
     haven't moved)
   - The ones that are in new_last_positions only (new vessels never seen before)
   - The ones in both datasets (known vessels that have moved and whose position
     must be updated)

   Returns the last_positions data of these 3 sets of vessels separately in 3
   DataFrames. For vessels whose position must be updated, the returned DataFrame
   contains the data of both the previous and the new last_position, in order to
   make it possible to compute some metrics (e.g. the emission period).

   :param previous_last_positions:
   :type previous_last_positions: pd.DataFrame
   :param new_last_positions:
   :type new_last_positions: pd.DataFrame

   :returns: - unchanged_previous_last_positions
             - new_vessels_last_positions
             - last_positions_to_update
   :rtype: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]


.. py:function:: compute_emission_period(last_positions_to_update: pandas.DataFrame) -> pandas.DataFrame

   Computes the emission period of the last_positions that require an update.

   If an emission period is already present (which might happen if there is more
   than one position per vessel in the requested time period of the last_position
   query), this emission period is used.

   Otherwise, the emission period is taken to be equal to the time between the
   previous last_position_datetime_utc and the new last_position_datetime_utc.

   :param last_positions_to_update: last_positions data for vessels that have
                                    moved
   :type last_positions_to_update: pd.DataFrame

   :returns: updated last_positions with computed emission period field
   :rtype: pd.DataFrame


.. py:function:: concatenate(unchanged_previous_last_positions: pandas.DataFrame, new_vessels_last_positions: pandas.DataFrame, updated_last_positions: pandas.DataFrame) -> pandas.DataFrame

   Concatenates the 3 sets of last_positions and reindexes the rows from 1 to n.

   :param unchanged_previous_last_positions:
   :type unchanged_previous_last_positions: pd.DataFrame
   :param new_vessels_last_positions:
   :type new_vessels_last_positions: pd.DataFrame
   :param updated_last_positions:
   :type updated_last_positions: pd.DataFrame

   :returns: concatenation of the 3 input sets of last_positions
   :rtype: pd.DataFrame


.. py:function:: extract_risk_factors()

.. py:function:: extract_beacon_malfunctions()

.. py:function:: estimate_current_positions(last_positions: pandas.DataFrame, max_hours_since_last_position: float) -> pandas.DataFrame

   :param last_positions: vessels' last position with route and speed data.
   :type last_positions: pd.DataFrame
   :param max_hours_since_last_position: maximum time in hours since the last
                                         position above which the current
                                         position will not be extrapolated.
   :type max_hours_since_last_position: float

   :returns: vessels' last position with added estimated_current_latitude and
             estimated_current_longitude fields
   :rtype: pd.DataFrame


.. py:function:: join(last_positions: pandas.DataFrame, risk_factors: pandas.DataFrame, pending_alerts: pandas.DataFrame, reportings: pandas.DataFrame, beacon_malfunctions: pandas.DataFrame, ais_last_positions: pandas.DataFrame) -> pandas.DataFrame

   Performs a left join on last_positions, risk_factors, pending_alerts,
   reportings and beacon_malfunctions using vessel_id, cfr, ircs and
   external_immatriculation as join keys.

   Also updates last_position_datetime_utc, latitude and longitude with the most
   recent position between last_positions (VMS) and ais_last_positions (AIS),
   matched on cfr. Sets position_type to 'AIS' or 'VMS' accordingly.


.. py:function:: load_last_positions(last_positions)

.. py:function:: load_last_positions_ais(ais_last_positions)

.. py:function:: last_positions_flow(current_position_estimation_max_hours: int = CURRENT_POSITION_ESTIMATION_MAX_HOURS, minutes: int = 5, action: str = 'update')
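
The de-duplication described for `drop_duplicates` can be illustrated with a minimal sketch. It is not the module's implementation: the function name `drop_duplicates_sketch` and the greedy, one-identifier-at-a-time strategy are assumptions made for illustration. Only the column names and the identifier priority come from the documentation above.

```python
import pandas as pd

# Identifier columns, by decreasing priority (as listed for drop_duplicates).
ID_COLUMNS = ["vessel_id", "cfr", "ircs", "external_immatriculation"]


def drop_duplicates_sketch(positions: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical sketch: keep the most recent row per identifier value."""
    # Most recent position first, so that keep="first" keeps the latest one.
    result = positions.sort_values("last_position_datetime_utc", ascending=False)
    for column in ID_COLUMNS:
        has_id = result[column].notna()
        # Rows without this identifier cannot be matched on it, so keep them.
        is_repeat = has_id & result.duplicated(subset=[column], keep="first")
        result = result[~is_repeat]
    return result.reset_index(drop=True)


# Two vessels, each received twice with a different ircs.
positions = pd.DataFrame(
    {
        "vessel_id": [1, 1, None, None],
        "cfr": ["A", "A", "B", "B"],
        "ircs": ["X", "Y", "Z", "W"],
        "external_immatriculation": ["E1", "E2", "E3", "E4"],
        "last_position_datetime_utc": pd.to_datetime(
            [
                "2024-01-01 10:00",
                "2024-01-01 10:05",
                "2024-01-01 10:10",
                "2024-01-01 10:15",
            ]
        ),
    }
)
deduplicated = drop_duplicates_sketch(positions)
```

In this sample, the first vessel is matched on vessel_id and the second, which has no vessel_id, is matched on cfr. Each keeps only its most recent row.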
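
The three categories returned by `split` can be sketched as follows. This is a simplified illustration, not the module's code: it assumes a single identifier column (`cfr`) is enough to match vessels, and the name `split_sketch` and the `_previous` / `_new` column suffixes are hypothetical.

```python
from typing import Tuple

import pandas as pd


def split_sketch(
    previous_last_positions: pd.DataFrame,
    new_last_positions: pd.DataFrame,
    key: str = "cfr",  # assumption: one identifier column is enough to match
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Hypothetical sketch of a 3-way split on a single key."""
    in_new = previous_last_positions[key].isin(new_last_positions[key])
    in_previous = new_last_positions[key].isin(previous_last_positions[key])

    # Known vessels with no new position.
    unchanged = previous_last_positions[~in_new]
    # Vessels never seen before.
    new_vessels = new_last_positions[~in_previous]
    # Vessels in both inputs: keep previous and new data side by side.
    to_update = pd.merge(
        previous_last_positions[in_new],
        new_last_positions[in_previous],
        on=key,
        suffixes=("_previous", "_new"),
    )
    return unchanged, new_vessels, to_update


previous = pd.DataFrame({"cfr": ["A", "B"], "latitude": [1.0, 2.0]})
new = pd.DataFrame({"cfr": ["B", "C"], "latitude": [2.5, 3.0]})
unchanged, new_vessels, to_update = split_sketch(previous, new)
```

Keeping both the previous and the new values in `to_update` is what makes a later computation such as the emission period possible.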
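
The rule described for `compute_emission_period` (use the existing emission period if there is one, otherwise the time elapsed between the previous and the new position) can be sketched as below. The column names `emission_period`, `last_position_datetime_utc_previous` and `last_position_datetime_utc_new`, as well as the function name, are assumptions for illustration. The actual column layout is not specified in this page.

```python
import pandas as pd


def compute_emission_period_sketch(
    last_positions_to_update: pd.DataFrame,
) -> pd.DataFrame:
    """Hypothetical sketch: fill missing emission periods with elapsed time."""
    # Time between the previous and the new last position (assumed columns).
    elapsed = (
        last_positions_to_update["last_position_datetime_utc_new"]
        - last_positions_to_update["last_position_datetime_utc_previous"]
    )
    result = last_positions_to_update.copy()
    # An emission period that is already present takes precedence.
    result["emission_period"] = result["emission_period"].fillna(elapsed)
    return result


to_update = pd.DataFrame(
    {
        "cfr": ["A", "B"],
        "last_position_datetime_utc_previous": pd.to_datetime(
            ["2024-01-01 10:00", "2024-01-01 10:00"]
        ),
        "last_position_datetime_utc_new": pd.to_datetime(
            ["2024-01-01 11:00", "2024-01-01 10:30"]
        ),
        # Vessel A already has an emission period, vessel B does not.
        "emission_period": pd.Series(
            [pd.Timedelta(minutes=10), pd.NaT], dtype="timedelta64[ns]"
        ),
    }
)
updated = compute_emission_period_sketch(to_update)
```

Here vessel A keeps its existing 10-minute period, while vessel B gets the 30 minutes elapsed between its two positions.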