pipeline.src.flows.last_positions

Functions

validate_action(→ str)

Checks that the received parameter value is valid and returns it. Raises ValueError otherwise.

extract_ais_last_positions(→ pandas.DataFrame)

Extracts the last AIS position of each vessel.

extract_last_positions(→ pandas.DataFrame)

Extracts the last position of each vessel over the past N minutes, where N is the value of the minutes parameter.

extract_pending_alerts(→ pandas.DataFrame)

extract_reportings(→ pandas.DataFrame)

drop_duplicates(→ pandas.DataFrame)

Drop duplicate vessels in a pandas.DataFrame of positions, keeping only the most recent position of each vessel.

extract_previous_last_positions(→ pandas.DataFrame)

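Extracts the contents of the last_positions table (which was computed by the previous run of the last_positions flow), with the has_charter field updated by taking the current value in the vessels table.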

drop_unchanged_new_last_positions(→ pandas.DataFrame)

Filters out all positions of new_last_positions that are present in previous_last_positions.

split(→ Tuple[pandas.DataFrame, pandas.DataFrame, ...)

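Splits vessels into 3 categories: known vessels that haven’t moved, new vessels never seen before, and known vessels whose position must be updated.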

compute_emission_period(→ pandas.DataFrame)

Computes the emission period of the last_positions that require an update.

concatenate(→ pandas.DataFrame)

Concatenates the 3 sets of last_positions and reindexes the rows from 1 to n.

extract_risk_factors()

extract_beacon_malfunctions()

estimate_current_positions(→ pandas.DataFrame)

join(→ pandas.DataFrame)

Performs a left join on last_positions, risk_factors, pending_alerts, reportings and beacon_malfunctions using vessel_id, cfr, ircs and external_immatriculation as join keys.

load_last_positions(last_positions)

load_last_positions_ais(ais_last_positions)

last_positions_flow([...])

Module Contents

pipeline.src.flows.last_positions.validate_action(action: str) → str

Checks that the received parameter value is valid and returns it. Raises ValueError otherwise.

Parameters:

action (str) – input parameter for the flow

Returns:

input, if valid

Return type:

str

Raises:

ValueError – if input is not valid
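The validate-and-return pattern described above can be sketched as follows. This is only an illustration: the set of accepted values is an assumption ('update' is the documented default of last_positions_flow, 'replace' is a hypothetical second value), and validate_action_sketch is not the function from the module.

```python
# Assumed set of accepted values; only 'update' is confirmed by the flow's default.
VALID_ACTIONS = {"update", "replace"}


def validate_action_sketch(action: str) -> str:
    """Return `action` unchanged if it is accepted, raise ValueError otherwise."""
    if action not in VALID_ACTIONS:
        raise ValueError(
            f"action must be one of {sorted(VALID_ACTIONS)}, got {action!r}"
        )
    return action
```

Returning the input makes it easy to use the check as the first step of a flow and pass its result downstream.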

pipeline.src.flows.last_positions.extract_ais_last_positions() → pandas.DataFrame

Extracts the last AIS position of each vessel.

Returns:

DataFrame of vessels’ last AIS position.

Return type:

pd.DataFrame

pipeline.src.flows.last_positions.extract_last_positions(minutes: int) → pandas.DataFrame

Extracts the last position of each vessel over the past N minutes, where N is the value of the minutes parameter.

Parameters:

minutes (int) – number of minutes from current datetime to extract

Returns:

DataFrame of vessels’ last position.

Return type:

pd.DataFrame

pipeline.src.flows.last_positions.extract_pending_alerts() → pandas.DataFrame

pipeline.src.flows.last_positions.extract_reportings() → pandas.DataFrame

pipeline.src.flows.last_positions.drop_duplicates(positions: pandas.DataFrame) → pandas.DataFrame

Drop duplicate vessels in a pandas.DataFrame of positions, keeping only the most recent position of each vessel.

This is required although the query that computes last positions already contains a DISTINCT ON clause because for some vessels, we receive each position twice with partially different identifiers - for instance, the same CFR but different ircs or external immatriculation.

De-duplication is done using, by decreasing priority, vessel_id, CFR, ircs and external_immatriculation.

Parameters:

positions (pd.DataFrame) – positions of vessels. Must contain columns “vessel_id”, “cfr”, “external_immatriculation”, “ircs” and “last_position_datetime_utc”.

Returns:

DataFrame of vessels’ last position with duplicates removed.

Return type:

pd.DataFrame
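One possible way to de-duplicate on several identifiers in priority order is sketched below. It is an illustration under assumptions, not the module's implementation: rows are sorted from most recent to oldest, then for each identifier any repeated non-null value is dropped, so a missing identifier never makes two rows count as the same vessel. The function name and the sample data are hypothetical; the column names are the ones listed in the parameters above.

```python
import pandas as pd

# Identifier columns, by decreasing priority, as listed in the description.
ID_COLUMNS = ["vessel_id", "cfr", "ircs", "external_immatriculation"]


def drop_duplicates_sketch(positions: pd.DataFrame) -> pd.DataFrame:
    # Most recent position first, so that keep="first" keeps the latest row.
    result = positions.sort_values("last_position_datetime_utc", ascending=False)
    for column in ID_COLUMNS:
        has_id = result[column].notna()
        # duplicated() also flags repeated NaN values, hence the has_id mask.
        is_repeat = result[column].duplicated(keep="first") & has_id
        result = result[~is_repeat]
    return result.reset_index(drop=True)


# Hypothetical sample: the same CFR received twice with different ircs.
sample_positions = pd.DataFrame(
    {
        "vessel_id": [None, None, 7],
        "cfr": ["A", "A", "B"],
        "ircs": ["X", "Y", "Z"],
        "external_immatriculation": ["E1", "E1", "E2"],
        "last_position_datetime_utc": pd.to_datetime(
            ["2023-01-01 10:00", "2023-01-01 11:00", "2023-01-01 09:00"]
        ),
    }
)
deduplicated = drop_duplicates_sketch(sample_positions)
```

In this sample the two rows sharing CFR "A" collapse into the 11:00 row, while the row for CFR "B" is left untouched.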

pipeline.src.flows.last_positions.extract_previous_last_positions() → pandas.DataFrame

Extracts the contents of the last_positions table (which was computed by the previous run of the last_positions flow), with the has_charter field updated by taking the current value in the vessels table.

Returns:

DataFrame of vessels’ last position (as it was last computed by the last_positions flow).

Return type:

pd.DataFrame

pipeline.src.flows.last_positions.drop_unchanged_new_last_positions(new_last_positions: pandas.DataFrame, previous_last_positions: pandas.DataFrame) → pandas.DataFrame

Filters out all positions of new_last_positions that are present in previous_last_positions.

Parameters:
  • previous_last_positions (pd.DataFrame)

  • new_last_positions (pd.DataFrame)

Returns:

subset of new_last_positions

Return type:

pd.DataFrame
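A minimal sketch of this kind of filtering uses a left merge with an indicator column (an anti-join). How the real function decides that two positions are the same is not documented here, so the key columns below (cfr and last_position_datetime_utc) are an assumption, as is the function name.

```python
import pandas as pd


def drop_unchanged_sketch(
    new_last_positions: pd.DataFrame,
    previous_last_positions: pd.DataFrame,
    on=("cfr", "last_position_datetime_utc"),  # assumed identity of a position
) -> pd.DataFrame:
    keys = list(on)
    # Flag each new position as present in both frames or only in the new one.
    flagged = new_last_positions.merge(
        previous_last_positions[keys].drop_duplicates(),
        on=keys,
        how="left",
        indicator=True,
    )
    changed = flagged.loc[flagged["_merge"] == "left_only"]
    return changed.drop(columns="_merge").reset_index(drop=True)
```

Only the key columns of previous_last_positions are merged in, so the result keeps exactly the columns of new_last_positions.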

pipeline.src.flows.last_positions.split(previous_last_positions: pandas.DataFrame, new_last_positions: pandas.DataFrame) → Tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]

Splits vessels into 3 categories:

  • The ones that are in previous_last_positions only (known vessels that haven’t moved)

  • The ones that are in new_last_positions only (new vessels never seen before)

  • The ones in both datasets (known vessels that have moved and whose position must be updated)

Returns the last_positions data of these 3 sets of vessels separately in 3 DataFrames. For vessels whose position must be updated, the returned DataFrame contains the data of both the previous and the new last_position, in order to make it possible to compute some metrics (e.g. the emission period).

Parameters:
  • previous_last_positions (pd.DataFrame)

  • new_last_positions (pd.DataFrame)

Returns:

  • unchanged_previous_last_positions

  • new_vessels_last_positions

  • last_positions_to_update

Return type:

Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
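The three-way split can be illustrated with a simplified sketch that matches vessels on a single identifier. This is an assumption made for brevity: the real function may match on several identifiers (vessel_id, cfr, ircs, external_immatriculation). The function name and the _previous / _new column suffixes are hypothetical.

```python
from typing import Tuple

import pandas as pd


def split_sketch(
    previous_last_positions: pd.DataFrame,
    new_last_positions: pd.DataFrame,
    key: str = "cfr",  # assumed single identifier, for illustration only
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    previous_keys = set(previous_last_positions[key])
    new_keys = set(new_last_positions[key])

    # Known vessels with no new position.
    unchanged = previous_last_positions[~previous_last_positions[key].isin(new_keys)]
    # Vessels that appear for the first time.
    new_vessels = new_last_positions[~new_last_positions[key].isin(previous_keys)]
    # Vessels in both: keep previous and new data side by side.
    to_update = previous_last_positions.merge(
        new_last_positions, on=key, how="inner", suffixes=("_previous", "_new")
    )
    return unchanged, new_vessels, to_update
```

Keeping both versions of each column in the third DataFrame is what allows a later step to compare the previous and new timestamps.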

pipeline.src.flows.last_positions.compute_emission_period(last_positions_to_update: pandas.DataFrame) → pandas.DataFrame

Computes the emission period of the last_positions that require an update.

If an emission period is already present (which might happen if there is more than one position per vessel in the requested time period of the last_position query), this emission period is used. Otherwise, the emission period is taken to be equal to the time between the previous last_position_datetime_utc and the new last_position_datetime_utc.

Parameters:

last_positions_to_update (pd.DataFrame) – last_positions data for vessels that have moved

Returns:

updated last_positions with computed emission period field

Return type:

pd.DataFrame
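The fallback rule described above can be sketched with fillna. The column names (emission_period, last_position_datetime_utc_previous, last_position_datetime_utc_new) are assumptions made for this illustration, and the function is not the module's implementation.

```python
import pandas as pd


def compute_emission_period_sketch(last_positions_to_update: pd.DataFrame) -> pd.DataFrame:
    result = last_positions_to_update.copy()
    # Time elapsed between the previous and the new last position.
    elapsed = (
        result["last_position_datetime_utc_new"]
        - result["last_position_datetime_utc_previous"]
    )
    # Keep an existing emission period, fall back to the elapsed time otherwise.
    result["emission_period"] = result["emission_period"].fillna(elapsed)
    return result


# Hypothetical sample: the first vessel already has an emission period.
sample_to_update = pd.DataFrame(
    {
        "cfr": ["A", "B"],
        "emission_period": pd.Series(
            [pd.Timedelta(minutes=10), pd.NaT], dtype="timedelta64[ns]"
        ),
        "last_position_datetime_utc_previous": pd.to_datetime(
            ["2023-01-01 09:00", "2023-01-01 09:30"]
        ),
        "last_position_datetime_utc_new": pd.to_datetime(
            ["2023-01-01 10:00", "2023-01-01 10:00"]
        ),
    }
)
with_emission_period = compute_emission_period_sketch(sample_to_update)
```

In the sample, vessel A keeps its 10-minute period even though 60 minutes elapsed between its two positions, and vessel B gets the 30 minutes between its previous and new positions.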

pipeline.src.flows.last_positions.concatenate(unchanged_previous_last_positions: pandas.DataFrame, new_vessels_last_positions: pandas.DataFrame, updated_last_positions: pandas.DataFrame) → pandas.DataFrame

Concatenates the 3 sets of last_positions and reindexes the rows from 1 to n.

Parameters:
  • unchanged_previous_last_positions (pd.DataFrame)

  • new_vessels_last_positions (pd.DataFrame)

  • updated_last_positions (pd.DataFrame)

Returns:

concatenation of the 3 input sets of last_positions

Return type:

pd.DataFrame
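A concatenation with a 1-based row index can be sketched with pandas.concat. This assumes that "reindexes the rows from 1 to n" refers to the DataFrame index; the function name is hypothetical.

```python
import pandas as pd


def concatenate_sketch(
    unchanged_previous_last_positions: pd.DataFrame,
    new_vessels_last_positions: pd.DataFrame,
    updated_last_positions: pd.DataFrame,
) -> pd.DataFrame:
    result = pd.concat(
        [
            unchanged_previous_last_positions,
            new_vessels_last_positions,
            updated_last_positions,
        ],
        ignore_index=True,  # discard the three original indexes
    )
    # Shift the default 0..n-1 index to 1..n.
    result.index = result.index + 1
    return result
```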

pipeline.src.flows.last_positions.extract_risk_factors()

pipeline.src.flows.last_positions.extract_beacon_malfunctions()

pipeline.src.flows.last_positions.estimate_current_positions(last_positions: pandas.DataFrame, max_hours_since_last_position: float) → pandas.DataFrame

Parameters:

last_positions – vessels’ last position with route and speed data.

Returns:

vessels’ last position with added estimated_current_latitude and estimated_current_longitude fields

Return type:

pd.DataFrame

pipeline.src.flows.last_positions.join(last_positions: pandas.DataFrame, risk_factors: pandas.DataFrame, pending_alerts: pandas.DataFrame, reportings: pandas.DataFrame, beacon_malfunctions: pandas.DataFrame, ais_last_positions: pandas.DataFrame) → pandas.DataFrame

Performs a left join on last_positions, risk_factors, pending_alerts, reportings and beacon_malfunctions using vessel_id, cfr, ircs and external_immatriculation as join keys.

Also updates last_position_datetime_utc, latitude and longitude with the most recent position between last_positions (VMS) and ais_last_positions (AIS), matched on cfr. Sets position_type to ‘AIS’ or ‘VMS’ accordingly.
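The VMS / AIS selection step can be sketched as below. It is an illustration only: it assumes both DataFrames carry the columns cfr, last_position_datetime_utc, latitude and longitude, that ais_last_positions has at most one row per cfr, and it leaves out the left join on the other tables. The function name and the _ais suffix are hypothetical.

```python
import pandas as pd

POSITION_COLUMNS = ["last_position_datetime_utc", "latitude", "longitude"]


def pick_most_recent_position_sketch(
    last_positions: pd.DataFrame, ais_last_positions: pd.DataFrame
) -> pd.DataFrame:
    # Ignore AIS rows without a cfr so that missing values never match each other.
    ais = ais_last_positions.dropna(subset=["cfr"])[["cfr"] + POSITION_COLUMNS]
    merged = last_positions.merge(ais, on="cfr", how="left", suffixes=("", "_ais"))

    # Comparisons with a missing AIS datetime are False, so VMS is kept by default.
    use_ais = (
        merged["last_position_datetime_utc_ais"] > merged["last_position_datetime_utc"]
    )
    for column in POSITION_COLUMNS:
        # Take the AIS value where AIS is more recent, the VMS value elsewhere.
        merged[column] = merged[column + "_ais"].where(use_ais, merged[column])
    merged["position_type"] = use_ais.map({True: "AIS", False: "VMS"})
    return merged.drop(columns=[column + "_ais" for column in POSITION_COLUMNS])
```

Vessels with no AIS position, or whose AIS position is older than the VMS one, keep their VMS data and are labelled 'VMS'.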

pipeline.src.flows.last_positions.load_last_positions(last_positions)

pipeline.src.flows.last_positions.load_last_positions_ais(ais_last_positions)

pipeline.src.flows.last_positions.last_positions_flow(current_position_estimation_max_hours: int = CURRENT_POSITION_ESTIMATION_MAX_HOURS, minutes: int = 5, action: str = 'update')