pipeline.src.flows.enrich_positions
Functions
extract_positions | Extracts all positions of a given Period.
filter_already_enriched_vessels | Filters the input positions DataFrame by removing positions of vessels that have all their positions already enriched.
enrich_positions_by_vessel | Applies enrich_positions to each vessel's positions.
load_fishing_activity | Updates positions table with the contents of the input DataFrame.
extract_enrich_load | Extract positions for the given Period, enrich and update the positions table.
Module Contents
- pipeline.src.flows.enrich_positions.extract_positions(period: src.helpers.dates.Period) → pandas.DataFrame
Extracts all positions of a given Period.
- Parameters:
period (Period) – Period of extraction
- Returns:
DataFrame of positions.
- Return type:
pd.DataFrame
- pipeline.src.flows.enrich_positions.filter_already_enriched_vessels(positions: pandas.DataFrame) → pandas.DataFrame
Filters the input positions DataFrame by removing the positions of vessels whose positions are all already enriched. A vessel is considered fully enriched when its time_emitting_at_sea column contains no null values.
- Parameters:
positions (pd.DataFrame) – vessels' positions. Must have columns:
'cfr'
'external_immatriculation'
'ircs'
'time_emitting_at_sea'
any other column required for the rest of the flow (latitude, longitude, datetime…)
- Returns:
same as input with some rows removed.
- Return type:
pd.DataFrame
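The filtering rule described above can be sketched with pandas as follows. This is a minimal illustration under stated assumptions, not the module's actual implementation: the function name `filter_already_enriched_vessels_sketch` is hypothetical, and treating missing identifiers as empty strings when grouping is a choice made here for simplicity.

```python
import pandas as pd

# Columns that together identify a vessel (taken from the docstring above).
VESSEL_ID_COLUMNS = ["cfr", "external_immatriculation", "ircs"]


def filter_already_enriched_vessels_sketch(positions: pd.DataFrame) -> pd.DataFrame:
    """Keep only the positions of vessels that have at least one null
    time_emitting_at_sea value (hypothetical re-implementation)."""
    # Rows that have not been enriched yet.
    not_enriched = positions["time_emitting_at_sea"].isna()
    # Assumption: missing identifiers are replaced by "" so that rows with
    # partly missing identifiers are still grouped instead of being dropped.
    keys = [positions[col].fillna("") for col in VESSEL_ID_COLUMNS]
    # True on every row of a vessel that has at least one non-enriched row.
    vessel_has_gap = not_enriched.groupby(keys).transform("any")
    return positions[vessel_has_gap]
```

A vessel with a single non-enriched position keeps all of its rows, including those already enriched, which matches the docstring's note that only vessels with all their positions enriched are removed.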
- pipeline.src.flows.enrich_positions.enrich_positions_by_vessel(positions: pandas.DataFrame, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int) → pandas.DataFrame
Applies enrich_positions to each vessel’s positions.
- Parameters:
positions (pd.DataFrame) – input positions. Must have columns:
'cfr'
'external_immatriculation'
'ircs'
'latitude'
'longitude'
'datetime_utc'
'is_at_port'
'time_emitting_at_sea'
- Returns:
same as input, with the following columns added:
'meters_from_previous_position'
'time_since_previous_position'
'average_speed'
'is_fishing'
and with the time_emitting_at_sea values recomputed.
- Return type:
pd.DataFrame
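To illustrate the per-vessel logic, the sketch below derives three of the added columns by comparing each position with the same vessel's previous one. It is an assumption-laden illustration rather than the actual enrich_positions code: the helper names, the haversine distance formula, and expressing average_speed in knots are all choices made here. The is_fishing and time_emitting_at_sea computations are left out because their rules are not described on this page.

```python
import numpy as np
import pandas as pd

VESSEL_ID_COLUMNS = ["cfr", "external_immatriculation", "ircs"]
EARTH_RADIUS_METERS = 6_371_000.0
METERS_PER_NAUTICAL_MILE = 1852.0


def haversine_meters(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between points given in degrees."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * EARTH_RADIUS_METERS * np.arcsin(np.sqrt(a))


def add_previous_position_columns(positions: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: add distance, elapsed time and speed per vessel."""
    positions = positions.sort_values("datetime_utc").copy()
    # Assumption: missing identifiers are grouped as empty strings.
    keys = [positions[col].fillna("") for col in VESSEL_ID_COLUMNS]
    grouped = positions.groupby(keys)
    # shift() looks one row back within each vessel, never across vessels.
    previous_lat = grouped["latitude"].shift()
    previous_lon = grouped["longitude"].shift()
    previous_time = grouped["datetime_utc"].shift()

    positions["meters_from_previous_position"] = haversine_meters(
        previous_lat, previous_lon, positions["latitude"], positions["longitude"]
    )
    positions["time_since_previous_position"] = (
        positions["datetime_utc"] - previous_time
    )
    hours = positions["time_since_previous_position"].dt.total_seconds() / 3600
    # Assumption: speed expressed in knots (nautical miles per hour).
    positions["average_speed"] = (
        positions["meters_from_previous_position"] / METERS_PER_NAUTICAL_MILE / hours
    )
    return positions
```

The first position of each vessel has no predecessor, so its three new columns are left null.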
- pipeline.src.flows.enrich_positions.load_fishing_activity(positions: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)
Updates the positions table with the contents of the input DataFrame. The input DataFrame must have columns:
id
is_at_port
meters_from_previous_position
time_since_previous_position
average_speed
is_fishing
time_emitting_at_sea
- Parameters:
positions (pd.DataFrame) – Enriched positions data
period (Period) – the Period covered by the input DataFrame. It is used to add a WHERE clause on the positions hypertable, which limits the time range queried when looking up the ids of the rows to update.
logger (Logger) – Logger
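The role of the period parameter can be illustrated with a small, self-contained sketch. It uses an in-memory SQLite database purely as a runnable stand-in for the real positions hypertable. The function name `update_positions_in_period`, the `date_time` column name, and the restriction to two updated columns are assumptions made for the example, not details taken from the module.

```python
import sqlite3
from datetime import datetime
from typing import Iterable, Tuple


def update_positions_in_period(
    conn: sqlite3.Connection,
    rows: Iterable[Tuple[int, bool, float]],
    period_start: datetime,
    period_end: datetime,
) -> None:
    """Update rows by id, bounding the lookup to the period's time range.

    `rows` holds (id, is_fishing, average_speed) tuples. On a time-partitioned
    table, the extra time bound lets the database skip partitions outside the
    period instead of searching every partition for each id.
    """
    start, end = period_start.isoformat(), period_end.isoformat()
    conn.executemany(
        """
        UPDATE positions
        SET is_fishing = ?, average_speed = ?
        WHERE id = ?
          AND date_time BETWEEN ? AND ?
        """,
        [(is_fishing, speed, id_, start, end) for id_, is_fishing, speed in rows],
    )
    conn.commit()
```

One consequence of this design is that a row whose timestamp falls outside the given period is never updated, even if its id is in the input, so the period has to cover the whole input DataFrame.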
- pipeline.src.flows.enrich_positions.extract_enrich_load(period: src.helpers.dates.Period, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int, recompute_all: bool = False)
Extract positions for the given Period, enrich and update the positions table.
All three steps run in a single Task so that no task has to return a result. Prefect keeps every task result in memory until the flow run finishes, which would cancel out the memory savings of the chunked processing logic.
- pipeline.src.flows.enrich_positions.enrich_positions_flow(start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, chunk_overlap_minutes: int, minimum_consecutive_positions: int, minimum_minutes_of_emission_at_sea: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, recompute_all: bool)
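The flow's start_hours_ago, end_hours_ago, minutes_per_chunk and chunk_overlap_minutes parameters suggest that the overall time range is cut into overlapping chunks, each processed on its own. The sketch below shows one way such chunks could be built. The `Period` tuple here is a hypothetical stand-in for src.helpers.dates.Period, and `make_overlapping_periods` is an illustrative name, not a function of this module.

```python
from datetime import datetime, timedelta, timezone
from typing import List, NamedTuple


class Period(NamedTuple):
    """Hypothetical stand-in for src.helpers.dates.Period."""

    start: datetime
    end: datetime


def make_overlapping_periods(
    now: datetime,
    start_hours_ago: int,
    end_hours_ago: int,
    minutes_per_chunk: int,
    chunk_overlap_minutes: int,
) -> List[Period]:
    """Split [now - start_hours_ago, now - end_hours_ago] into overlapping chunks."""
    if not 0 <= chunk_overlap_minutes < minutes_per_chunk:
        raise ValueError("overlap must be non-negative and shorter than a chunk")
    start = now - timedelta(hours=start_hours_ago)
    end = now - timedelta(hours=end_hours_ago)
    chunk = timedelta(minutes=minutes_per_chunk)
    # Each chunk starts `overlap` minutes before the previous one ends.
    step = timedelta(minutes=minutes_per_chunk - chunk_overlap_minutes)

    periods = []
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + chunk, end)
        periods.append(Period(chunk_start, chunk_end))
        if chunk_end == end:
            break
        chunk_start += step
    return periods


# Example: a two-hour range in 60-minute chunks overlapping by 30 minutes.
example_periods = make_overlapping_periods(
    now=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    start_hours_ago=2,
    end_hours_ago=0,
    minutes_per_chunk=60,
    chunk_overlap_minutes=30,
)
```

Each resulting period could then be passed to extract_enrich_load in turn, so that only one chunk of positions is held in memory at a time.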