pipeline.src.flows.last_positions
Functions
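- validate_action(action) – Checks that the received parameter value is valid and returns it. Raises ValueError otherwise.
- extract_ais_last_positions() – Extracts the last AIS position of each vessel.
- extract_last_positions(minutes) – Extracts the last position of each vessel over the past N minutes, where N is the value of the minutes parameter.
- drop_duplicates(positions) – Drops duplicate vessels in a pandas.DataFrame of positions, keeping only the most recent position of each vessel.
- extract_previous_last_positions() – Extracts the contents of the last_positions table (which was computed by the previous run of the last_positions flow).
- drop_unchanged_new_last_positions(new_last_positions, previous_last_positions) – Filters out all positions of new_last_positions that are present in previous_last_positions.
- split(previous_last_positions, new_last_positions) – Splits vessels into 3 categories.
- compute_emission_period(last_positions_to_update) – Computes the emission period of the last_positions that require an update.
- concatenate(unchanged_previous_last_positions, new_vessels_last_positions, updated_last_positions) – Concatenates the 3 sets of last_positions and reindexes the rows from 1 to n.
- estimate_current_positions(last_positions, max_hours_since_last_position) – Adds estimated current latitude and longitude fields to vessels’ last positions.
- join(last_positions, risk_factors, pending_alerts, reportings, beacon_malfunctions, ais_last_positions) – Performs a left join on last_positions, risk_factors, pending_alerts, reportings and beacon_malfunctions.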
Module Contents
- pipeline.src.flows.last_positions.validate_action(action: str) → str
Checks that the received parameter value is valid and returns it. Raises ValueError otherwise.
- Parameters:
action (str) – input parameter for the flow
- Returns:
input, if valid
- Return type:
str
- Raises:
ValueError – if input is not valid
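A minimal sketch of such a check, for illustration only. The accepted values are not listed on this page, so ALLOWED_ACTIONS and its two values are hypothetical placeholders.

```python
# Hypothetical accepted values: the flow's real accepted actions are not documented here.
ALLOWED_ACTIONS = {"update", "replace"}


def validate_action(action: str) -> str:
    """Return action unchanged if it is accepted, raise ValueError otherwise."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError(
            f"action must be one of {sorted(ALLOWED_ACTIONS)}, got {action!r}"
        )
    return action
```

Returning the input itself rather than a boolean lets the validated value be passed straight on to the next step of the flow.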
- pipeline.src.flows.last_positions.extract_ais_last_positions() → pandas.DataFrame
Extracts the last AIS position of each vessel.
- Returns:
DataFrame of vessels’ last AIS position.
- Return type:
pd.DataFrame
- pipeline.src.flows.last_positions.extract_last_positions(minutes: int) → pandas.DataFrame
Extracts the last position of each vessel over the past N minutes, where N is the value of the minutes parameter.
- Parameters:
minutes (int) – number of minutes before the current datetime over which positions are extracted
- Returns:
DataFrame of vessels’ last position.
- Return type:
pd.DataFrame
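The flow performs this extraction with a database query. As an in-memory illustration of the same selection, here is a pandas sketch; it assumes a positions DataFrame with hypothetical columns vessel_id and datetime_utc, and a hypothetical explicit now argument in place of the current datetime.

```python
from datetime import datetime, timedelta

import pandas as pd


def last_positions_in_window(
    positions: pd.DataFrame, minutes: int, now: datetime
) -> pd.DataFrame:
    """Keep, for each vessel, its most recent position of the past `minutes` minutes.

    Assumes hypothetical columns "vessel_id" and "datetime_utc" (naive UTC datetimes)
    and a hypothetical `now` argument.
    """
    start = now - timedelta(minutes=minutes)
    recent = positions[positions["datetime_utc"] >= start]
    # Sort newest first, then keep the first row per vessel, which mirrors
    # SQL "DISTINCT ON (vessel_id) ... ORDER BY vessel_id, datetime_utc DESC".
    return (
        recent.sort_values("datetime_utc", ascending=False)
        .drop_duplicates(subset="vessel_id", keep="first")
        .reset_index(drop=True)
    )
```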
- pipeline.src.flows.last_positions.drop_duplicates(positions: pandas.DataFrame) → pandas.DataFrame
Drops duplicate vessels in a pandas.DataFrame of positions, keeping only the most recent position of each vessel.
This is required even though the query that computes last positions already contains a DISTINCT ON clause, because for some vessels we receive each position twice with partially different identifiers, for instance the same CFR but a different IRCS or external immatriculation.
Deduplication is done using, by decreasing priority, vessel_id, cfr, ircs and external_immatriculation.
- Parameters:
positions (pd.DataFrame) – positions of vessels. Must contain columns “vessel_id”, “cfr”, “external_immatriculation”, “ircs” and “last_position_datetime_utc”.
- Returns:
DataFrame of vessels’ last position with duplicates removed.
- Return type:
pd.DataFrame
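One way to implement this kind of prioritized de-duplication in pandas, sketched under the assumption that "by decreasing priority" means de-duplicating on each identifier in turn, newest position first. This is an illustration, not necessarily how the flow does it.

```python
import pandas as pd

# Identifier columns, by decreasing priority, as listed in the docstring.
ID_COLUMNS = ["vessel_id", "cfr", "ircs", "external_immatriculation"]


def drop_duplicates(positions: pd.DataFrame) -> pd.DataFrame:
    """Keep only the most recent position of each vessel."""
    # Newest first, so that keep="first" below keeps the most recent position.
    result = positions.sort_values("last_position_datetime_utc", ascending=False)
    for column in ID_COLUMNS:
        # Series.duplicated treats all nulls as equal, so a row only counts as a
        # duplicate if it actually carries a value for this identifier.
        is_duplicate = result[column].duplicated(keep="first") & result[column].notna()
        result = result[~is_duplicate]
    return result.reset_index(drop=True)
```

The notna() mask matters: without it, every position lacking, say, a vessel_id would be collapsed into a single row.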
- pipeline.src.flows.last_positions.extract_previous_last_positions() → pandas.DataFrame
Extracts the contents of the last_positions table (which was computed by the previous run of the last_positions flow), with the has_charter field updated by taking the current value in the vessels table.
- Returns:
DataFrame of vessels’ last position (as it was last computed by the last_positions flow).
- Return type:
pd.DataFrame
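The has_charter refresh described above can be sketched in pandas as follows. The join key vessel_id and the choice to keep the previous value for vessels absent from the vessels table are assumptions made for this illustration; refresh_has_charter is a hypothetical helper name.

```python
import pandas as pd


def refresh_has_charter(
    previous_last_positions: pd.DataFrame, vessels: pd.DataFrame
) -> pd.DataFrame:
    """Overwrite has_charter with the current value from the vessels table.

    Assumes a hypothetical, unique join key "vessel_id" in both DataFrames.
    Vessels missing from `vessels` keep their previous has_charter value.
    """
    current = vessels.set_index("vessel_id")["has_charter"]
    result = previous_last_positions.copy()
    # Only update rows whose vessel is known in the vessels table.
    known = result["vessel_id"].isin(current.index)
    result.loc[known, "has_charter"] = result.loc[known, "vessel_id"].map(current)
    return result
```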
- pipeline.src.flows.last_positions.drop_unchanged_new_last_positions(new_last_positions: pandas.DataFrame, previous_last_positions: pandas.DataFrame) → pandas.DataFrame
Filters out all positions of new_last_positions that are present in previous_last_positions.
- Parameters:
new_last_positions (pd.DataFrame)
previous_last_positions (pd.DataFrame)
- Returns:
subset of new_last_positions that are not present in previous_last_positions
- Return type:
pd.DataFrame
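This filter is an anti-join. A pandas sketch using merge with indicator=True, assuming (hypothetically) that a position is identified by the pair vessel_id and last_position_datetime_utc; the flow's exact matching criteria are not given on this page.

```python
import pandas as pd

# Hypothetical columns that identify a position.
POSITION_KEYS = ["vessel_id", "last_position_datetime_utc"]


def drop_unchanged_new_last_positions(
    new_last_positions: pd.DataFrame, previous_last_positions: pd.DataFrame
) -> pd.DataFrame:
    """Return the rows of new_last_positions absent from previous_last_positions."""
    merged = new_last_positions.merge(
        previous_last_positions[POSITION_KEYS].drop_duplicates(),
        on=POSITION_KEYS,
        how="left",
        indicator=True,
    )
    # "left_only" rows have no matching position in the previous run.
    return (
        merged[merged["_merge"] == "left_only"]
        .drop(columns="_merge")
        .reset_index(drop=True)
    )
```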
- pipeline.src.flows.last_positions.split(previous_last_positions: pandas.DataFrame, new_last_positions: pandas.DataFrame) → Tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]
Splits vessels into 3 categories:
1. The ones that are in previous_last_positions only (known vessels that haven’t moved)
2. The ones that are in new_last_positions only (new vessels never seen before)
3. The ones in both datasets (known vessels that have moved and whose position must be updated)
Returns the last_positions data of these 3 sets of vessels separately in 3 DataFrames. For vessels whose position must be updated, the returned DataFrame contains the data of both the previous and the new last position, so that metrics such as the emission period can be computed.
- Parameters:
previous_last_positions (pd.DataFrame)
new_last_positions (pd.DataFrame)
- Returns:
unchanged_previous_last_positions
new_vessels_last_positions
last_positions_to_update
- Return type:
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
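A pandas sketch of this three-way split, assuming for simplicity a single hypothetical vessel identifier column, vessel_id, and the hypothetical suffixes "_previous" and "_new" for the side-by-side columns of vessels to update.

```python
from typing import Tuple

import pandas as pd

# Hypothetical single vessel identifier; the flow itself may match vessels differently.
KEY = "vessel_id"


def split(
    previous_last_positions: pd.DataFrame, new_last_positions: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    previous_ids = set(previous_last_positions[KEY])
    new_ids = set(new_last_positions[KEY])

    # Known vessels with no new position: keep their previous last position.
    unchanged_previous_last_positions = previous_last_positions[
        ~previous_last_positions[KEY].isin(new_ids)
    ].reset_index(drop=True)

    # Vessels never seen before: take their new last position as is.
    new_vessels_last_positions = new_last_positions[
        ~new_last_positions[KEY].isin(previous_ids)
    ].reset_index(drop=True)

    # Vessels in both inputs: keep previous and new values side by side,
    # suffixed "_previous" and "_new", for metrics such as the emission period.
    last_positions_to_update = previous_last_positions.merge(
        new_last_positions, on=KEY, how="inner", suffixes=("_previous", "_new")
    )
    return (
        unchanged_previous_last_positions,
        new_vessels_last_positions,
        last_positions_to_update,
    )
```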
- pipeline.src.flows.last_positions.compute_emission_period(last_positions_to_update: pandas.DataFrame) → pandas.DataFrame
Computes the emission period of the last_positions that require an update.
If an emission period is already present (which might happen if there is more than one position per vessel in the time period requested by the last_positions query), this emission period is used. Otherwise, the emission period is taken to be equal to the time between the previous last_position_datetime_utc and the new last_position_datetime_utc.
- Parameters:
last_positions_to_update (pd.DataFrame) – last_positions data for vessels that have moved
- Returns:
updated last_positions with computed emission period field
- Return type:
pd.DataFrame
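The fallback rule above amounts to filling missing emission periods with the time elapsed between the two positions. A pandas sketch, assuming hypothetical column names: emission_period (a timedelta, null when the query could not compute it), last_position_datetime_utc_previous and last_position_datetime_utc_new.

```python
import pandas as pd


def compute_emission_period(last_positions_to_update: pd.DataFrame) -> pd.DataFrame:
    """Fill in emission_period for vessels whose last position has changed.

    Assumes hypothetical columns "emission_period" (timedelta, possibly NaT),
    "last_position_datetime_utc_previous" and "last_position_datetime_utc_new".
    """
    result = last_positions_to_update.copy()
    time_between_positions = (
        result["last_position_datetime_utc_new"]
        - result["last_position_datetime_utc_previous"]
    )
    # Keep an existing emission period, otherwise use the time elapsed
    # between the previous and the new last position.
    result["emission_period"] = result["emission_period"].fillna(time_between_positions)
    return result
```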
- pipeline.src.flows.last_positions.concatenate(unchanged_previous_last_positions: pandas.DataFrame, new_vessels_last_positions: pandas.DataFrame, updated_last_positions: pandas.DataFrame) → pandas.DataFrame
Concatenates the 3 sets of last_positions and reindexes the rows from 1 to n.
- Parameters:
unchanged_previous_last_positions (pd.DataFrame)
new_vessels_last_positions (pd.DataFrame)
updated_last_positions (pd.DataFrame)
- Returns:
concatenation of the 3 input sets of last_positions
- Return type:
pd.DataFrame
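A minimal pandas sketch of this step, assuming that the three inputs share the same columns and that "reindexes the rows from 1 to n" refers to the DataFrame index (rather than, say, an id column).

```python
import pandas as pd


def concatenate(
    unchanged_previous_last_positions: pd.DataFrame,
    new_vessels_last_positions: pd.DataFrame,
    updated_last_positions: pd.DataFrame,
) -> pd.DataFrame:
    result = pd.concat(
        [
            unchanged_previous_last_positions,
            new_vessels_last_positions,
            updated_last_positions,
        ],
        ignore_index=True,
    )
    # ignore_index numbers rows 0..n-1; shift the index to 1..n.
    result.index = result.index + 1
    return result
```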
- pipeline.src.flows.last_positions.estimate_current_positions(last_positions: pandas.DataFrame, max_hours_since_last_position: float) → pandas.DataFrame
Estimates the current position of each vessel from its last position, route and speed.
- Parameters:
last_positions – vessels’ last position with route and speed data.
- Returns:
vessels’ last position with added estimated_current_latitude and estimated_current_longitude fields
- Return type:
pd.DataFrame
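A common way to estimate a current position from a last position, route and speed is dead reckoning. The sketch below is an illustration under several assumptions, not the flow's implementation: hypothetical columns latitude, longitude (degrees), course (degrees clockwise from north) and speed (knots); a hypothetical explicit now argument; a flat-Earth approximation that is only reasonable over short distances; and the guess that max_hours_since_last_position is a cutoff beyond which no estimate is produced.

```python
import numpy as np
import pandas as pd


def estimate_current_positions(
    last_positions: pd.DataFrame,
    max_hours_since_last_position: float,
    now: pd.Timestamp,
) -> pd.DataFrame:
    result = last_positions.copy()
    hours = (now - result["last_position_datetime_utc"]).dt.total_seconds() / 3600
    distance_nm = result["speed"] * hours
    course = np.radians(result["course"])
    # 1 nautical mile is about 1 minute of latitude, i.e. 1/60 degree;
    # a degree of longitude shrinks by cos(latitude).
    delta_lat = distance_nm * np.cos(course) / 60
    delta_lon = distance_nm * np.sin(course) / (60 * np.cos(np.radians(result["latitude"])))
    # Assumption: positions older than the cutoff get no estimate (NaN).
    recent = hours <= max_hours_since_last_position
    result["estimated_current_latitude"] = (result["latitude"] + delta_lat).where(recent)
    result["estimated_current_longitude"] = (result["longitude"] + delta_lon).where(recent)
    return result
```

For example, a vessel last seen one hour ago at latitude 0 heading due north at 6 knots is estimated to be about 0.1 degree further north.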
- pipeline.src.flows.last_positions.join(last_positions: pandas.DataFrame, risk_factors: pandas.DataFrame, pending_alerts: pandas.DataFrame, reportings: pandas.DataFrame, beacon_malfunctions: pandas.DataFrame, ais_last_positions: pandas.DataFrame) → pandas.DataFrame
Performs a left join on last_positions, risk_factors, pending_alerts, reportings and beacon_malfunctions using vessel_id, cfr, ircs and external_immatriculation as join keys.
Also updates last_position_datetime_utc, latitude and longitude with the most recent position between last_positions (VMS) and ais_last_positions (AIS), matched on cfr. Sets position_type to ‘AIS’ or ‘VMS’ accordingly.
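The AIS/VMS selection described in the last paragraph can be sketched in pandas as below. This covers only that step, not the full join; it assumes both inputs have columns cfr, last_position_datetime_utc, latitude and longitude, and that AIS rows are unique per cfr. use_most_recent_position is a hypothetical helper name.

```python
import pandas as pd

POSITION_COLUMNS = ["last_position_datetime_utc", "latitude", "longitude"]


def use_most_recent_position(
    last_positions: pd.DataFrame, ais_last_positions: pd.DataFrame
) -> pd.DataFrame:
    """Replace VMS positions by AIS positions when the AIS one is more recent."""
    # pandas merge matches null keys to each other, so drop AIS rows without cfr.
    ais = (
        ais_last_positions.dropna(subset=["cfr"])[["cfr"] + POSITION_COLUMNS]
        .add_suffix("_ais")
        .rename(columns={"cfr_ais": "cfr"})
    )
    result = last_positions.merge(ais, on="cfr", how="left")
    # Comparisons with NaT are False, so vessels without AIS data stay VMS.
    ais_is_newer = (
        result["last_position_datetime_utc_ais"] > result["last_position_datetime_utc"]
    )
    for column in POSITION_COLUMNS:
        result.loc[ais_is_newer, column] = result.loc[ais_is_newer, f"{column}_ais"]
    result["position_type"] = ais_is_newer.map({True: "AIS", False: "VMS"})
    return result.drop(columns=[f"{column}_ais" for column in POSITION_COLUMNS])
```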