pipeline.src.flows.enrich_logbook
=================================

.. py:module:: pipeline.src.flows.enrich_logbook


Functions
---------

.. autoapisummary::

   pipeline.src.flows.enrich_logbook.extract_infringement_risk_levels
   pipeline.src.flows.enrich_logbook.extract_pno_types
   pipeline.src.flows.enrich_logbook.extract_control_anteriority
   pipeline.src.flows.enrich_logbook.extract_vessels_with_active_reportings
   pipeline.src.flows.enrich_logbook.reset_pnos
   pipeline.src.flows.enrich_logbook.extract_pno_trips_period
   pipeline.src.flows.enrich_logbook.extract_pno_catches
   pipeline.src.flows.enrich_logbook.compute_pno_segments
   pipeline.src.flows.enrich_logbook.compute_pno_types
   pipeline.src.flows.enrich_logbook.merge_pnos_data
   pipeline.src.flows.enrich_logbook.compute_pno_risk_factors
   pipeline.src.flows.enrich_logbook.flag_pnos_to_verify_and_send
   pipeline.src.flows.enrich_logbook.load_enriched_pnos
   pipeline.src.flows.enrich_logbook.extract_enrich_load_logbook
   pipeline.src.flows.enrich_logbook.enrich_logbook_flow


Module Contents
---------------

.. py:function:: extract_infringement_risk_levels()

.. py:function:: extract_pno_types() -> pandas.DataFrame

.. py:function:: extract_control_anteriority() -> pandas.DataFrame

.. py:function:: extract_vessels_with_active_reportings() -> set

.. py:function:: reset_pnos(period: src.helpers.dates.Period)

   Deletes enriched data from PNOs in the logbook table within the designated `Period`.
   Distribution attributes (`isInVerificationScope`, `IsVerified`, `IsSent`, `IsBeingSent`)
   are not reset.

.. py:function:: extract_pno_trips_period(period: src.helpers.dates.Period) -> src.helpers.dates.Period

   Extracts the earliest `tripStartDate` and the latest `predictedArrivalDatetimeUtc`
   from all PNOs emitted during the given `Period`.

   :param period: `Period` of reception of PNOs
   :type period: Period

   :returns: `Period` with `start` = `min_trip_start_date` and `end` = `max_predicted_arrival_datetime_utc`
   :rtype: Period

.. py:function:: extract_pno_catches(pno_emission_period: src.helpers.dates.Period, trips_period: src.helpers.dates.Period) -> pandas.DataFrame

.. py:function:: compute_pno_segments(pno_catches: pandas.DataFrame, segments: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame) -> pandas.DataFrame

   Computes the segments of the input PNO species and gears.

   :param pno_catches: DataFrame of PNO species. 1 line = 1 catch. Must have columns:

       - logbook_reports_pno_id `int` `1`
       - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
       - species `str` `'COD'`
       - fao_area `str` `'27.7.d'`
       - year `int` `2022`
       - facade `str` `'MED'`
       - weight `float` `230.2`
       - vessel_type `str` `Fishing vessel`
       - scip_species_type `str` `DEMERSAL`

   :type pno_catches: pd.DataFrame
   :param segments: DataFrame of segment definitions. 1 line = 1 segment. Must have columns:

       - year `int` `2022`
       - segment `str` `SWW1`
       - segment_name `str` `Nom du segment`
       - gears `List[str]` `["OTB", ...]`
       - fao_areas `List[str]` `["27.8", ...]`
       - target_species `List[str]` `["COD", ...]`
       - impact_risk_factor `float` `2.8`
       - min_mesh `float` `80.0`
       - max_mesh `float` `120.0`
       - min_share_of_target_species `float` `0.2`
       - main_scip_species_type `str` `DEMERSAL`
       - vessel_types `List[str]` `['Fishing vessel', 'Trawler']`
       - priority `float` `1.0`

   :type segments: pd.DataFrame
   :param infringement_risk_levels: DataFrame of infringement risk levels. Must have columns:

       - year `int` `2022`
       - facade `str` `'MEMN'`
       - segment `str` `'SWW01'`
       - infringement_risk_level `float` `2.8`

   :type infringement_risk_levels: pd.DataFrame

   :returns: DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.
       Has columns:

       - logbook_reports_pno_id `int` `1`
       - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
       - pno_types `List[dict]` `[{"pno_type_name": "Type 1", "minimum_notification_period": 4.0, "has_designated_ports": True}, {...}]`
       - impact_risk_factor `float` `2.8`
       - infringement_risk_level `float` `2.8`

   :rtype: pd.DataFrame

.. py:function:: compute_pno_types(pno_catches: pandas.DataFrame, pno_types: pandas.DataFrame) -> pandas.DataFrame

   Computes the PNO types of the input PNO species and gears.

   :param pno_catches: DataFrame of PNO species. 1 line = 1 catch. Must have columns:

       - logbook_reports_pno_id `int` `1`
       - cfr `str` `FRA000000000`
       - predicted_arrival_datetime_utc `datetime`
       - year `int` `2023`
       - species `str` `'COD'`
       - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
       - fao_area `str` `'27.7.d'`
       - weight `float` `150.5`
       - flag_state `str` `'FRA'`
       - locode `str` `CCXXX`
       - country_code_iso2 `str` `'FR'`

   :type pno_catches: pd.DataFrame
   :param pno_types: DataFrame of PNO type definitions. 1 line = 1 rule. Must have columns:

       - pno_type_id `int` `1`
       - pno_type_name `str` `"Ports désignés thon rouge"`
       - minimum_notification_period `float` `4.0`
       - has_designated_ports `bool` `True`
       - pno_type_rule_id `int` `1`
       - species `List[str]` `["COD", ...]`
       - gears `List[str]` `["OTB", ...]`
       - fao_areas `List[str]` `["27.8", ...]`
       - flag_states `List[str]` `["GBR", ...]`
       - minimum_quantity_kg `float` `2500.0`

   :type pno_types: pd.DataFrame

   :returns: DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.
       Has columns:

       - logbook_reports_pno_id `int` `1`
       - cfr `str` `FRA000000000`
       - locode `str` `CCXXX`
       - predicted_arrival_datetime_utc `datetime`
       - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
       - pno_types `List[dict]` `[{"pnoTypeName": "Type 1", "minimumNotificationPeriod": 4.0, "hasDesignatedPorts": True}, {...}]`

   :rtype: pd.DataFrame

.. py:function:: merge_pnos_data(pnos_with_types: pandas.DataFrame, pnos_with_segments: pandas.DataFrame) -> pandas.DataFrame

   Merges the input DataFrames on `logbook_reports_pno_id`.

   :param pnos_with_types: DataFrame of PNOs with their types
   :type pnos_with_types: pd.DataFrame
   :param pnos_with_segments: DataFrame of PNOs with their segments
   :type pnos_with_segments: pd.DataFrame

   :returns: DataFrame of PNOs with their types and segments
   :rtype: pd.DataFrame

.. py:function:: compute_pno_risk_factors(pnos: pandas.DataFrame, control_anteriority: pandas.DataFrame) -> pandas.DataFrame

.. py:function:: flag_pnos_to_verify_and_send(pnos: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, predicted_arrival_threshold: datetime.datetime, vessels_with_active_reportings: set)

.. py:function:: load_enriched_pnos(enriched_pnos: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)

   Loads `enriched_pnos` data into the `logbook_reports` table.

   If distribution attributes (`isInVerificationScope`, `IsVerified`, `IsSent`,
   `IsBeingSent`) are already present in the `value` field of the `logbook_reports`
   table, the values in the table are preserved and the values from the DataFrame
   are ignored.

   :param enriched_pnos: Enriched PNOs data
   :type enriched_pnos: pd.DataFrame
   :param period: Date range of PNOs
   :type period: Period
   :param logger: Logger instance
   :type logger: Logger

.. py:function:: extract_enrich_load_logbook(period: src.helpers.dates.Period, segments: pandas.DataFrame, pno_types: pandas.DataFrame, control_anteriority: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, utcnow: datetime.datetime, vessels_with_active_reportings: set)

   Extracts PNOs for the given `Period`, enriches them and updates the `logbook` table.
   All of this is done in a single `Task` so that no task returns any data: Prefect
   keeps all task results in memory until the flow run finishes, which must be
   avoided here so that the chunked processing actually limits memory consumption.

.. py:function:: enrich_logbook_flow(start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, recompute_all: bool)
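
The chunked processing that `enrich_logbook_flow` relies on is not spelled out in these docstrings. The block below is a minimal sketch, assuming that the flow covers the window running from `start_hours_ago` to `end_hours_ago` hours before now and cuts it into consecutive periods of `minutes_per_chunk` minutes, each of which would then be processed by its own `extract_enrich_load_logbook` run. `Period` here is a hypothetical stand-in for `src.helpers.dates.Period` exposing only the `start` and `end` attributes mentioned above, and `split_period_into_chunks` is a hypothetical helper, not part of this module. How `recompute_all` interacts with `reset_pnos` is not modelled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass(frozen=True)
class Period:
    # Hypothetical stand-in for src.helpers.dates.Period: only the `start`
    # and `end` attributes mentioned in the docstrings are assumed.
    start: datetime
    end: datetime


def split_period_into_chunks(
    start_hours_ago: int,
    end_hours_ago: int,
    minutes_per_chunk: int,
    utcnow: datetime,
) -> List[Period]:
    """Hypothetical helper: cut the window running from `start_hours_ago` to
    `end_hours_ago` hours before `utcnow` into consecutive periods of at most
    `minutes_per_chunk` minutes each."""
    if start_hours_ago <= end_hours_ago:
        raise ValueError("start_hours_ago must be greater than end_hours_ago")
    if minutes_per_chunk <= 0:
        raise ValueError("minutes_per_chunk must be positive")

    window_start = utcnow - timedelta(hours=start_hours_ago)
    window_end = utcnow - timedelta(hours=end_hours_ago)
    step = timedelta(minutes=minutes_per_chunk)

    chunks = []
    chunk_start = window_start
    while chunk_start < window_end:
        # Truncate the last chunk so that it never extends past the window end.
        chunk_end = min(chunk_start + step, window_end)
        chunks.append(Period(start=chunk_start, end=chunk_end))
        chunk_start = chunk_end
    return chunks


if __name__ == "__main__":
    # In the flow, each chunk would presumably be passed to one
    # extract_enrich_load_logbook run, one after the other.
    for period in split_period_into_chunks(2, 0, 45, datetime(2024, 1, 1, 12, 0)):
        print(period.start.time(), "->", period.end.time())
    # 10:00:00 -> 10:45:00
    # 10:45:00 -> 11:30:00
    # 11:30:00 -> 12:00:00
```

Consecutive chunks share their boundaries, so the whole window is covered without gaps, and processing one chunk at a time inside a single task keeps the amount of PNO data held in memory bounded by the chunk size rather than by the full window.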