pipeline.src.flows.enrich_positions

Functions

extract_positions(→ pandas.DataFrame)

Extracts all positions of a given Period.

filter_already_enriched_vessels(→ pandas.DataFrame)

Filters the input positions DataFrame by removing positions of vessels that have all their positions already enriched.

enrich_positions_by_vessel(→ pandas.DataFrame)

Applies enrich_positions to each vessel's positions.

load_fishing_activity(positions, period, logger)

Updates positions table with the contents of the input DataFrame.

extract_enrich_load(period, ...[, recompute_all])

Extract positions for the given Period, enrich and update the positions table.

enrich_positions_flow(start_hours_ago, end_hours_ago, ...)

Module Contents

pipeline.src.flows.enrich_positions.extract_positions(period: src.helpers.dates.Period) → pandas.DataFrame

Extracts all positions of a given Period.

Parameters:

period (Period) – Period of extraction

Returns:

DataFrame of positions.

Return type:

pd.DataFrame

pipeline.src.flows.enrich_positions.filter_already_enriched_vessels(positions: pandas.DataFrame) → pandas.DataFrame

Filters the input positions DataFrame by removing the positions of vessels whose positions have all been enriched already. A vessel still needs enrichment if its time_emitting_at_sea column contains any null values.

Parameters:

positions (pd.DataFrame) –

Vessels' positions. Must have columns:

  • 'cfr'

  • 'external_immatriculation'

  • 'ircs'

  • 'time_emitting_at_sea'

  • any other column required for the rest of the flow (latitude, longitude, datetime…)

Returns:

same as input with some rows removed.

Return type:

pd.DataFrame
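
The filtering rule described above can be sketched with a grouped null check. This is a minimal illustration and not the module's actual implementation: the function name filter_already_enriched_vessels_sketch is hypothetical, and it assumes that a vessel is identified by the combination of the 'cfr', 'external_immatriculation' and 'ircs' columns.

```python
import pandas as pd

# Assumption: these three columns together identify a vessel.
VESSEL_ID_COLUMNS = ["cfr", "external_immatriculation", "ircs"]


def filter_already_enriched_vessels_sketch(positions: pd.DataFrame) -> pd.DataFrame:
    """Keep only the positions of vessels that have at least one position
    with a null time_emitting_at_sea (hypothetical sketch)."""
    # True on every row that has not been enriched yet.
    row_needs_enrichment = positions["time_emitting_at_sea"].isna()

    # Broadcast "any row of this vessel needs enrichment" to all of the
    # vessel's rows. dropna=False keeps vessels with missing identifiers.
    vessel_needs_enrichment = row_needs_enrichment.groupby(
        [positions[column] for column in VESSEL_ID_COLUMNS], dropna=False
    ).transform("any")

    return positions[vessel_needs_enrichment]
```

In this sketch a vessel with a single non-enriched position keeps all of its positions, including the ones that were already enriched, which matches the "vessels that have all their positions already enriched" wording above.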

pipeline.src.flows.enrich_positions.enrich_positions_by_vessel(positions: pandas.DataFrame, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int) → pandas.DataFrame

Applies enrich_positions to each vessel’s positions.

Parameters:

positions (pd.DataFrame) –

Input positions. Must have columns:

  • 'cfr'

  • 'external_immatriculation'

  • 'ircs'

  • 'latitude'

  • 'longitude'

  • 'datetime_utc'

  • 'is_at_port'

  • 'time_emitting_at_sea'

Returns:

Same as input, with the following columns added:

  • 'meters_from_previous_position'

  • 'time_since_previous_position'

  • 'average_speed'

  • 'is_fishing'

and with the time_emitting_at_sea values recomputed / updated.

Return type:

pd.DataFrame
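
The per-vessel pattern can be sketched as a group-by followed by a per-group function. The sketch below is an illustration under assumptions, not the module's code: enrich_one_vessel is a hypothetical stand-in for enrich_positions that only derives the distance, time and speed columns (it does not compute is_fishing or update time_emitting_at_sea), the distance uses the haversine formula with a spherical Earth, and the speed unit (knots) is an assumption.

```python
import numpy as np
import pandas as pd

# Assumption: these three columns together identify a vessel.
VESSEL_ID_COLUMNS = ["cfr", "external_immatriculation", "ircs"]
EARTH_RADIUS_METERS = 6_371_000
METERS_PER_NAUTICAL_MILE = 1852


def enrich_one_vessel(vessel_positions: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for enrich_positions, for one vessel only."""
    df = vessel_positions.sort_values("datetime_utc").copy()

    # Haversine distance between each position and the previous one.
    lat = np.radians(df["latitude"])
    lon = np.radians(df["longitude"])
    dlat = lat - lat.shift()
    dlon = lon - lon.shift()
    a = np.sin(dlat / 2) ** 2 + np.cos(lat) * np.cos(lat.shift()) * np.sin(dlon / 2) ** 2
    df["meters_from_previous_position"] = 2 * EARTH_RADIUS_METERS * np.arcsin(np.sqrt(a))

    # Elapsed time since the previous position (NaT on the first row).
    df["time_since_previous_position"] = df["datetime_utc"].diff()

    # Average speed in knots (assumed unit) over the last segment.
    hours = df["time_since_previous_position"].dt.total_seconds() / 3600
    df["average_speed"] = df["meters_from_previous_position"] / METERS_PER_NAUTICAL_MILE / hours
    return df


def enrich_positions_by_vessel_sketch(positions: pd.DataFrame) -> pd.DataFrame:
    """Apply enrich_one_vessel to each vessel's positions separately.

    Assumes a non-empty input: pd.concat raises on an empty list.
    """
    groups = positions.groupby(VESSEL_ID_COLUMNS, dropna=False, sort=False)
    return pd.concat([enrich_one_vessel(group) for _, group in groups])
```

Grouping before computing the differences ensures that the first position of one vessel is never compared with the last position of another.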

pipeline.src.flows.enrich_positions.load_fishing_activity(positions: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)

Updates positions table with the contents of the input DataFrame. The input DataFrame must have columns:

  • id

  • is_at_port

  • meters_from_previous_position

  • time_since_previous_position

  • average_speed

  • is_fishing

  • time_emitting_at_sea

Parameters:
  • positions (pd.DataFrame) – Enriched positions data

  • period (Period) – the Period covered by the input DataFrame. It is used to add a WHERE clause on the positions hypertable that limits the time range queried when looking up the ids of the rows to update.

  • logger (Logger) – Logger
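
The role of the period parameter can be illustrated with an update that matches rows by id and also restricts the time range. This is a sketch under assumptions, not the module's code: it uses an in-memory style SQLite connection instead of the real database, the date_time column name and the function name load_fishing_activity_sketch are hypothetical, and only two of the enriched columns are updated to keep it short.

```python
import sqlite3
from datetime import datetime

# Assumption: the positions table has a timestamp column named date_time.
UPDATE_QUERY = """
UPDATE positions
SET is_fishing = :is_fishing,
    average_speed = :average_speed
WHERE id = :id
  AND date_time >= :start
  AND date_time <= :end
"""


def load_fishing_activity_sketch(
    connection: sqlite3.Connection,
    rows: list[dict],
    start: datetime,
    end: datetime,
) -> None:
    """Update enriched columns for the given ids, within [start, end] only."""
    # Every row carries the same time bounds, so the database never has to
    # look outside the period when searching for the ids to update.
    params = [
        {**row, "start": start.isoformat(), "end": end.isoformat()}
        for row in rows
    ]
    # The connection context manager commits on success, rolls back on error.
    with connection:
        connection.executemany(UPDATE_QUERY, params)
```

On a time-partitioned table such as a hypertable, a time-range predicate of this kind presumably lets the database skip the partitions that fall outside the Period; in this SQLite sketch it only acts as an extra filter.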

pipeline.src.flows.enrich_positions.extract_enrich_load(period: src.helpers.dates.Period, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int, recompute_all: bool = False)

Extract positions for the given Period, enrich and update the positions table.

All three steps run in a single Task so that no task has to return a value. Prefect keeps all task results in memory until the flow run finishes, which must be avoided here: otherwise the chunked processing logic would bring no benefit in terms of memory consumption.

pipeline.src.flows.enrich_positions.enrich_positions_flow(start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, chunk_overlap_minutes: int, minimum_consecutive_positions: int, minimum_minutes_of_emission_at_sea: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, recompute_all: bool)
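
The flow's time parameters suggest that the overall period is split into overlapping chunks, each processed on its own. The sketch below shows one way such chunks could be derived; it is an assumption about the semantics of start_hours_ago, end_hours_ago, minutes_per_chunk and chunk_overlap_minutes, and the helper names period_from_hours_ago and make_chunks are hypothetical.

```python
from datetime import datetime, timedelta


def period_from_hours_ago(
    now: datetime, start_hours_ago: int, end_hours_ago: int
) -> tuple[datetime, datetime]:
    """Translate 'hours ago' bounds into absolute start and end datetimes."""
    return now - timedelta(hours=start_hours_ago), now - timedelta(hours=end_hours_ago)


def make_chunks(
    start: datetime,
    end: datetime,
    minutes_per_chunk: int,
    chunk_overlap_minutes: int,
) -> list[tuple[datetime, datetime]]:
    """Split [start, end] into chunks of minutes_per_chunk minutes, each
    sharing chunk_overlap_minutes minutes with the next one."""
    if chunk_overlap_minutes >= minutes_per_chunk:
        raise ValueError("chunk_overlap_minutes must be smaller than minutes_per_chunk")

    length = timedelta(minutes=minutes_per_chunk)
    step = timedelta(minutes=minutes_per_chunk - chunk_overlap_minutes)

    chunks = []
    chunk_start = start
    while chunk_start < end:
        # The last chunk is truncated so that it never extends past end.
        chunk_end = min(chunk_start + length, end)
        chunks.append((chunk_start, chunk_end))
        if chunk_end == end:
            break
        chunk_start += step
    return chunks
```

Each chunk can then be handed to a function that extracts, enriches and loads without returning anything, so that only one chunk of positions is held in memory at a time. The overlap presumably gives positions near a chunk boundary access to the preceding positions they are compared with.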