pipeline.src.flows.enrich_positions
===================================

.. py:module:: pipeline.src.flows.enrich_positions


Functions
---------

.. autoapisummary::

   pipeline.src.flows.enrich_positions.extract_positions
   pipeline.src.flows.enrich_positions.filter_already_enriched_vessels
   pipeline.src.flows.enrich_positions.enrich_positions_by_vessel
   pipeline.src.flows.enrich_positions.load_fishing_activity
   pipeline.src.flows.enrich_positions.extract_enrich_load
   pipeline.src.flows.enrich_positions.enrich_positions_flow


Module Contents
---------------

.. py:function:: extract_positions(period: src.helpers.dates.Period) -> pandas.DataFrame

   Extracts all positions of a given Period.

   :param period: Period of extraction
   :type period: Period

   :returns: DataFrame of positions.
   :rtype: pd.DataFrame


.. py:function:: filter_already_enriched_vessels(positions: pandas.DataFrame) -> pandas.DataFrame

   Filters the input positions `DataFrame` by removing the positions of vessels
   whose positions are all already enriched. A vessel counts as fully enriched
   when its `time_emitting_at_sea` column contains no null values.

   :param positions: vessels' positions. Must have columns:

      - 'cfr'
      - 'external_immatriculation'
      - 'ircs'
      - 'time_emitting_at_sea'
      - any other column required for the rest of the flow (latitude,
        longitude, datetime...)
   :type positions: pd.DataFrame

   :returns: same as input with some rows removed.
   :rtype: pd.DataFrame


.. py:function:: enrich_positions_by_vessel(positions: pandas.DataFrame, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int) -> pandas.DataFrame

   Applies `enrich_positions` to each vessel's positions.

   :param positions: input positions. Must have columns:

      - 'cfr'
      - 'external_immatriculation'
      - 'ircs'
      - 'latitude'
      - 'longitude'
      - 'datetime_utc'
      - 'is_at_port'
      - 'time_emitting_at_sea'
   :type positions: pd.DataFrame

   :returns: same as input, with the following columns added:

      - 'meters_from_previous_position'
      - 'time_since_previous_position'
      - 'average_speed'
      - 'is_fishing'

      and with the `time_emitting_at_sea` values recomputed / updated.
   :rtype: pd.DataFrame


.. py:function:: load_fishing_activity(positions: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)

   Updates the `positions` table with the contents of the input `DataFrame`.

   The input `DataFrame` must have columns:

   - id
   - is_at_port
   - meters_from_previous_position
   - time_since_previous_position
   - average_speed
   - is_fishing
   - time_emitting_at_sea

   :param positions: Enriched positions data
   :type positions: pd.DataFrame
   :param period: the `Period` covered by the input `DataFrame`. It is used to
                  add a `where` clause on the `positions` hypertable, limiting
                  the time range queried when looking up the `id` of the rows
                  to update.
   :type period: Period
   :param logger: `Logger`
   :type logger: Logger


.. py:function:: extract_enrich_load(period: src.helpers.dates.Period, minimum_consecutive_positions: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, minimum_minutes_of_emission_at_sea: int, recompute_all: bool = False)

   Extracts positions for the given `Period`, enriches them and updates the
   `positions` table.

   This is all done in one `Task` so that no task has to return anything.
   Prefect stores all task results in memory until the flow run is finished,
   which must be avoided here in order to benefit from the lower memory
   consumption of the chunked processing logic.


.. py:function:: enrich_positions_flow(start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, chunk_overlap_minutes: int, minimum_consecutive_positions: int, minimum_minutes_of_emission_at_sea: int, min_fishing_speed_threshold: float, max_fishing_speed_threshold: float, recompute_all: bool)
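
The chunking parameters of ``enrich_positions_flow`` (``start_hours_ago``, ``end_hours_ago``, ``minutes_per_chunk`` and ``chunk_overlap_minutes``) suggest that the time range is split into overlapping chunks, each of which could then be passed to ``extract_enrich_load`` as a ``Period``. This page does not show how the flow actually builds its chunks, so the following is only a minimal sketch under that assumption. The helper ``make_overlapping_chunks`` and its validation rules are hypothetical and use only the standard library.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def make_overlapping_chunks(
    now: datetime,
    start_hours_ago: int,
    end_hours_ago: int,
    minutes_per_chunk: int,
    chunk_overlap_minutes: int,
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper (not part of the documented module).

    Splits [now - start_hours_ago, now - end_hours_ago] into (start, end)
    chunks of at most `minutes_per_chunk` minutes, where each chunk starts
    `chunk_overlap_minutes` before the previous chunk ends.
    """
    if start_hours_ago <= end_hours_ago:
        raise ValueError("start_hours_ago must be greater than end_hours_ago")
    if not 0 <= chunk_overlap_minutes < minutes_per_chunk:
        raise ValueError("overlap must be non-negative and shorter than a chunk")

    range_start = now - timedelta(hours=start_hours_ago)
    range_end = now - timedelta(hours=end_hours_ago)
    chunk_length = timedelta(minutes=minutes_per_chunk)
    # Each new chunk starts one "step" after the previous one.
    step = chunk_length - timedelta(minutes=chunk_overlap_minutes)

    chunks = []
    chunk_start = range_start
    while True:
        # The last chunk is truncated so that it never exceeds the range.
        chunk_end = min(chunk_start + chunk_length, range_end)
        chunks.append((chunk_start, chunk_end))
        if chunk_end >= range_end:
            break
        chunk_start += step
    return chunks


chunks = make_overlapping_chunks(
    now=datetime(2024, 1, 1, 12, 0),
    start_hours_ago=2,
    end_hours_ago=0,
    minutes_per_chunk=60,
    chunk_overlap_minutes=15,
)
for chunk_start, chunk_end in chunks:
    print(chunk_start.isoformat(), "->", chunk_end.isoformat())
```

One plausible reason for the overlap is that the columns added by ``enrich_positions_by_vessel`` (``meters_from_previous_position``, ``time_since_previous_position``) depend on the previous position, so the first positions of a chunk need some earlier context. That reading is an inference from the parameter names and is not stated on this page.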