pipeline.src.flows.enrich_logbook

Functions

extract_infringement_risk_levels()

extract_pno_types() → pandas.DataFrame

extract_control_anteriority() → pandas.DataFrame

extract_vessels_with_active_reportings() → set

reset_pnos(period)

Deletes enriched data from PNOs in the logbook table for the designated Period.

extract_pno_trips_period(period) → src.helpers.dates.Period

Extracts the earliest tripStartDate and the latest predictedArrivalDatetimeUtc from all PNOs emitted during the given Period.

extract_pno_catches(pno_emission_period, trips_period) → pandas.DataFrame

compute_pno_segments(pno_catches, segments, ...) → pandas.DataFrame

Computes the segments of the input PNO species and gears.

compute_pno_types(pno_catches, pno_types) → pandas.DataFrame

Computes the PNO types of the input PNO species and gears.

merge_pnos_data(pnos_with_types, pnos_with_segments) → pandas.DataFrame

Merges the input DataFrames on logbook_reports_pno_id.

compute_pno_risk_factors(pnos, control_anteriority) → pandas.DataFrame

flag_pnos_to_verify_and_send(pnos, ...)

load_enriched_pnos(enriched_pnos, period, logger)

Loads enriched_pnos data into the logbook_reports table.

extract_enrich_load_logbook(period, segments, ...)

Extracts PNOs for the given Period, enriches them and updates the logbook table.

enrich_logbook_flow(start_hours_ago, end_hours_ago, ...)

Module Contents

pipeline.src.flows.enrich_logbook.extract_infringement_risk_levels()
pipeline.src.flows.enrich_logbook.extract_pno_types() → pandas.DataFrame
pipeline.src.flows.enrich_logbook.extract_control_anteriority() → pandas.DataFrame
pipeline.src.flows.enrich_logbook.extract_vessels_with_active_reportings() → set
pipeline.src.flows.enrich_logbook.reset_pnos(period: src.helpers.dates.Period)

Deletes enriched data from PNOs in the logbook table for the designated Period.

Distribution attributes (isInVerificationScope, isVerified, isSent, isBeingSent) are not reset.
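The reset rule can be pictured on a single PNO value dict. This is a minimal sketch assuming the distribution attributes are stored as camelCase keys in that JSON value; the enrichment key names (pnoTypes, tripSegments, riskFactor) and the helper reset_pno_value are hypothetical placeholders, and the real function operates on the logbook table rather than on Python dicts.

```python
from typing import Any, Dict

# Distribution attributes named in the reset_pnos docstring; they survive a reset.
DISTRIBUTION_KEYS = frozenset(
    {"isInVerificationScope", "isVerified", "isSent", "isBeingSent"}
)

# Hypothetical enrichment keys: the keys actually written by the flow may differ.
ENRICHMENT_KEYS = frozenset({"pnoTypes", "tripSegments", "riskFactor"})


def reset_pno_value(value: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a PNO value without its enrichment keys.

    Distribution attributes are always kept, and keys that are neither
    enrichment nor distribution keys (the original PNO payload) are untouched.
    """
    return {
        key: val
        for key, val in value.items()
        if key in DISTRIBUTION_KEYS or key not in ENRICHMENT_KEYS
    }
```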

pipeline.src.flows.enrich_logbook.extract_pno_trips_period(period: src.helpers.dates.Period) → src.helpers.dates.Period

Extracts the earliest tripStartDate and the latest predictedArrivalDatetimeUtc from all PNOs emitted during the given Period.

Parameters:

period (Period) – Period of reception of PNOs

Returns:

Period with start = min_trip_start_date and end = max_predicted_arrival_datetime_utc

Return type:

Period
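In pandas terms, the returned Period is simply the span from the smallest trip start date to the largest predicted arrival among the PNOs received in the period. The sketch below works on an already-loaded DataFrame, assumes snake_case columns trip_start_date and predicted_arrival_datetime_utc, and uses a hypothetical Period dataclass standing in for src.helpers.dates.Period; the real function extracts these values from the database.

```python
from dataclasses import dataclass
from datetime import datetime

import pandas as pd


@dataclass
class Period:
    """Hypothetical stand-in for src.helpers.dates.Period."""

    start: datetime
    end: datetime


def trips_period_from_pnos(pnos: pd.DataFrame) -> Period:
    """Span from the earliest trip start to the latest predicted arrival."""
    return Period(
        start=pnos["trip_start_date"].min().to_pydatetime(),
        end=pnos["predicted_arrival_datetime_utc"].max().to_pydatetime(),
    )
```

Using the trip dates rather than the reception period matters because catches declared earlier in a trip must still be available when PNOs are enriched.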

pipeline.src.flows.enrich_logbook.extract_pno_catches(pno_emission_period: src.helpers.dates.Period, trips_period: src.helpers.dates.Period) → pandas.DataFrame
pipeline.src.flows.enrich_logbook.compute_pno_segments(pno_catches: pandas.DataFrame, segments: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame) → pandas.DataFrame

Computes the segments of the input PNO species and gears.

Parameters:
  • pno_catches (pd.DataFrame) –

    DataFrame of PNO species. 1 line = 1 catch. Must have columns:

    • logbook_reports_pno_id int 1

    • trip_gears List[dict] [{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {…}]

    • species str 'COD'

    • fao_area str '27.7.d'

    • year int 2022

    • facade str 'MED'

    • weight float 230.2

    • vessel_type str Fishing vessel

    • scip_species_type str DEMERSAL

  • segments (pd.DataFrame) –

    DataFrame of segment definitions. 1 line = 1 segment. Must have columns:

    • year int 2022

    • segment str SWW1

    • segment_name str Segment name

    • gears List[str] ["OTB", …]

    • fao_areas List[str] ["27.8", …]

    • target_species List[str] ["COD", …]

    • impact_risk_factor float 2.8

    • min_mesh float 80.0

    • max_mesh float 120.0

    • min_share_of_target_species float 0.2

    • main_scip_species_type str DEMERSAL

    • vessel_types List[str] ['Fishing vessel', 'Trawler']

    • priority float 1.0

  • infringement_risk_levels (pd.DataFrame) –

    DataFrame of infringement risk levels. Must have columns:

    • year int 2022

    • facade str "MEMN"

    • segment str "SWW01"

    • infringement_risk_level float 2.8

Returns:

DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.

Has columns:

  • logbook_reports_pno_id int 1

  • trip_gears List[dict] [{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {…}]

  • pno_types List[dict] [{"pno_type_name": "Type 1", "minimum_notification_period": 4.0, "has_designated_ports": True}, {…}]

  • impact_risk_factor float 2.8

  • infringement_risk_level float 2.8

Return type:

pd.DataFrame
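The kind of matching described by the segment columns can be sketched with a deliberately reduced rule set. This is a hypothetical simplification, not the module's algorithm: a catch matches a segment when the year is the same, one trip gear is a segment gear, the FAO area is within a segment area and the species is a target species; mesh bounds, share of target species, vessel types, SCIP species type, priority and the infringement_risk_levels join are all left out. Keeping the highest impact_risk_factor per PNO is also an assumption, and simplified_pno_segments and fao_area_within are illustrative names.

```python
from typing import List

import pandas as pd


def fao_area_within(area: str, segment_areas: List[str]) -> bool:
    """True if area equals, or is nested in, one of segment_areas (27.7.d is in 27.7)."""
    return any(area == a or area.startswith(a + ".") for a in segment_areas)


def simplified_pno_segments(pno_catches: pd.DataFrame, segments: pd.DataFrame) -> pd.DataFrame:
    """Attribute at most one segment per PNO using a reduced, hypothetical rule set."""
    matches = []
    for catch in pno_catches.itertuples(index=False):
        catch_gears = {g["gear"] for g in (catch.trip_gears or [])}
        for seg in segments.itertuples(index=False):
            if (
                catch.year == seg.year
                and catch_gears & set(seg.gears)
                and fao_area_within(catch.fao_area, seg.fao_areas)
                and catch.species in seg.target_species
            ):
                matches.append(
                    {
                        "logbook_reports_pno_id": catch.logbook_reports_pno_id,
                        "segment": seg.segment,
                        "impact_risk_factor": seg.impact_risk_factor,
                    }
                )
    matched = pd.DataFrame(
        matches, columns=["logbook_reports_pno_id", "segment", "impact_risk_factor"]
    )
    if matched.empty:
        return matched
    # Assumption: per PNO, keep the matched segment with the highest impact
    # risk factor. PNOs with no matching catch are dropped.
    best = matched.groupby("logbook_reports_pno_id")["impact_risk_factor"].idxmax()
    return matched.loc[best].reset_index(drop=True)
```

Nested FAO areas are compared on dot boundaries so that 27.7.d falls within 27.7 while a hypothetical 27.70 would not.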

pipeline.src.flows.enrich_logbook.compute_pno_types(pno_catches: pandas.DataFrame, pno_types: pandas.DataFrame) → pandas.DataFrame

Computes the PNO types of the input PNO species and gears.

Parameters:
  • pno_catches (pd.DataFrame) –

    DataFrame of PNO species. 1 line = 1 catch. Must have columns:

    • logbook_reports_pno_id int 1

    • cfr str FRA000000000

    • predicted_arrival_datetime_utc datetime

    • year int 2023

    • species str 'COD'

    • trip_gears List[dict] [{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {…}]

    • fao_area str '27.7.d'

    • weight float 150.5

    • flag_state str 'FRA'

    • locode str CCXXX

    • country_code_iso2 str 'FR'

  • pno_types (pd.DataFrame) –

    DataFrame of pno_types definitions. 1 line = 1 rule. Must have columns:

    • pno_type_id int 1

    • pno_type_name str "Ports désignés thon rouge"

    • minimum_notification_period float 4.0

    • has_designated_ports bool True

    • pno_type_rule_id int 1

    • species List[str] ["COD", …]

    • gears List[str] ["OTB", …]

    • fao_areas List[str] ["27.8", …]

    • flag_states List[str] ["GBR", …]

    • minimum_quantity_kg float 2500.0

Returns:

DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.

Has columns:

  • logbook_reports_pno_id int 1

  • cfr str FRA000000000

  • locode str CCXXX

  • predicted_arrival_datetime_utc datetime

  • trip_gears List[dict] [{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {…}]

  • pno_types List[dict] [{"pnoTypeName": "Type 1", "minimumNotificationPeriod": 4.0, "hasDesignatedPorts": True}, {…}]

Return type:

pd.DataFrame
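One plausible reading of the rule columns is that a PNO receives a type when the catches matching one of that type's rules weigh at least minimum_quantity_kg in total. The sketch below implements that reading under loudly stated assumptions: an empty rule list places no constraint on that attribute, FAO areas match by nesting, and only logbook_reports_pno_id and pno_types are returned. simplified_pno_types and _within are hypothetical names; the module's actual matching may differ.

```python
from typing import Any, Dict, List

import pandas as pd


def _within(value: str, allowed: List[str], nested: bool = False) -> bool:
    """Assumption: an empty rule list means 'no constraint on this attribute'."""
    if not allowed:
        return True
    if nested:  # e.g. '27.7.d' is within '27.7'
        return any(value == a or value.startswith(a + ".") for a in allowed)
    return value in allowed


def simplified_pno_types(pno_catches: pd.DataFrame, pno_types: pd.DataFrame) -> pd.DataFrame:
    """Attribute PNO types whose rules match enough catch weight (hypothetical logic)."""
    rows: List[Dict[str, Any]] = []
    for pno_id, catches in pno_catches.groupby("logbook_reports_pno_id"):
        attributed: Dict[int, Dict[str, Any]] = {}
        for rule in pno_types.itertuples(index=False):
            matched_weight, matched_any = 0.0, False
            for c in catches.itertuples(index=False):
                gears = [g["gear"] for g in (c.trip_gears or [])]
                if (
                    _within(c.species, rule.species)
                    and (not rule.gears or any(g in rule.gears for g in gears))
                    and _within(c.fao_area, rule.fao_areas, nested=True)
                    and _within(c.flag_state, rule.flag_states)
                ):
                    matched_any = True
                    matched_weight += c.weight
            if matched_any and matched_weight >= rule.minimum_quantity_kg:
                # Several rules may belong to one PNO type: keep each type once.
                attributed[rule.pno_type_id] = {
                    "pnoTypeName": rule.pno_type_name,
                    "minimumNotificationPeriod": rule.minimum_notification_period,
                    "hasDesignatedPorts": rule.has_designated_ports,
                }
        rows.append({"logbook_reports_pno_id": pno_id, "pno_types": list(attributed.values())})
    return pd.DataFrame(rows, columns=["logbook_reports_pno_id", "pno_types"])
```

Summing weights per rule, rather than testing each catch against minimum_quantity_kg separately, lets several smaller catches of the same species together reach the threshold.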

pipeline.src.flows.enrich_logbook.merge_pnos_data(pnos_with_types: pandas.DataFrame, pnos_with_segments: pandas.DataFrame) → pandas.DataFrame

Merges the input DataFrames on logbook_reports_pno_id.

Parameters:
  • pnos_with_types (pd.DataFrame) – DataFrame of PNOs with their types

  • pnos_with_segments (pd.DataFrame) – DataFrame of PNOs with their segments

Returns:

DataFrame of PNOs with their types and segments

Return type:

pd.DataFrame
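Both inputs are documented as carrying a trip_gears column, so a plain merge would produce trip_gears_x and trip_gears_y. A minimal pandas sketch, assuming a left join (every typed PNO is kept even without a matched segment) and that one copy of trip_gears is enough; merge_pnos_data_sketch is a hypothetical name, not the module's function.

```python
import pandas as pd


def merge_pnos_data_sketch(
    pnos_with_types: pd.DataFrame, pnos_with_segments: pd.DataFrame
) -> pd.DataFrame:
    # Drop the duplicated trip_gears column before merging to avoid _x/_y suffixes.
    segments = pnos_with_segments.drop(columns=["trip_gears"], errors="ignore")
    # Assumption: left join; validate="one_to_one" raises if a PNO id is duplicated.
    return pnos_with_types.merge(
        segments,
        on="logbook_reports_pno_id",
        how="left",
        validate="one_to_one",
    )
```

PNOs without a segment then carry NaN in the segment-derived columns, such as impact_risk_factor.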

pipeline.src.flows.enrich_logbook.compute_pno_risk_factors(pnos: pandas.DataFrame, control_anteriority: pandas.DataFrame) → pandas.DataFrame
pipeline.src.flows.enrich_logbook.flag_pnos_to_verify_and_send(pnos: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, predicted_arrival_threshold: datetime.datetime, vessels_with_active_reportings: set)
pipeline.src.flows.enrich_logbook.load_enriched_pnos(enriched_pnos: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)

Loads enriched_pnos data into the logbook_reports table.

If distribution attributes (isInVerificationScope, isVerified, isSent, isBeingSent) are already present in the value field of the logbook_reports table, the values in the table are preserved and the values from the DataFrame are ignored.

Parameters:
  • enriched_pnos (pd.DataFrame) – Enriched PNOs data

  • period (Period) – Date range of PNOs

  • logger (Logger) – logger instance
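The preservation rule can be expressed on a single value dict: enriched values overwrite stored ones, except distribution attributes that the stored value already has. This sketch assumes camelCase keys and reads "already present" as "the key exists in the stored value"; merge_enriched_value is a hypothetical helper, and the actual load presumably happens in the database against logbook_reports.

```python
from typing import Any, Dict

# Distribution attributes named in the load_enriched_pnos docstring.
DISTRIBUTION_KEYS = ("isInVerificationScope", "isVerified", "isSent", "isBeingSent")


def merge_enriched_value(existing: Dict[str, Any], enriched: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay enriched data on the stored value without overwriting distribution attributes."""
    merged = {**existing, **enriched}
    for key in DISTRIBUTION_KEYS:
        if key in existing:
            # The stored value wins over the value computed by the flow.
            merged[key] = existing[key]
    return merged
```

With this rule, re-running the flow over a period never undoes a verification or a sending already recorded for a PNO.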

pipeline.src.flows.enrich_logbook.extract_enrich_load_logbook(period: src.helpers.dates.Period, segments: pandas.DataFrame, pno_types: pandas.DataFrame, control_anteriority: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, utcnow: datetime.datetime, vessels_with_active_reportings: set)

Extracts PNOs for the given Period, enriches them and updates the logbook table.

All of this is done in a single Task so that no task returns anything: Prefect keeps all task results in memory until the flow run is done, which must be avoided here in order to benefit from the chunked processing logic in terms of memory consumption.
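The memory argument above boils down to a loop in which each chunk's DataFrames live only for one iteration and nothing is returned. A framework-free sketch, assuming extract, enrich and load are injected callables (hypothetical parameters, not the module's signature) and leaving Prefect out entirely:

```python
from typing import Callable, Iterable, TypeVar

import pandas as pd

ChunkT = TypeVar("ChunkT")


def process_chunks(
    chunks: Iterable[ChunkT],
    extract: Callable[[ChunkT], pd.DataFrame],
    enrich: Callable[[pd.DataFrame], pd.DataFrame],
    load: Callable[[pd.DataFrame, ChunkT], None],
) -> None:
    """Extract, enrich and load each chunk in turn, returning nothing."""
    for chunk in chunks:
        raw = extract(chunk)
        enriched = enrich(raw)
        load(enriched, chunk)
        # Release this chunk's DataFrames before the next extract, so peak
        # memory stays around one chunk instead of growing with the period.
        del raw, enriched
```

Splitting the same steps into separate tasks would make each one return its DataFrame, and those results would then be retained for the whole flow run.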

pipeline.src.flows.enrich_logbook.enrich_logbook_flow(start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, recompute_all: bool)
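The flow's parameters are undocumented here. Judging by their names and by the chunked processing mentioned for extract_enrich_load_logbook, one plausible reading is that the window from start_hours_ago to end_hours_ago before now is split into consecutive chunks of minutes_per_chunk minutes. This is an assumption; chunk_periods and the Period dataclass are hypothetical stand-ins, and recompute_all is not modeled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass(frozen=True)
class Period:
    """Hypothetical stand-in for src.helpers.dates.Period."""

    start: datetime
    end: datetime


def chunk_periods(
    utcnow: datetime, start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int
) -> List[Period]:
    """Split [utcnow - start_hours_ago, utcnow - end_hours_ago] into consecutive chunks.

    Assumes start_hours_ago >= end_hours_ago; the last chunk may be shorter.
    """
    if minutes_per_chunk <= 0:
        raise ValueError("minutes_per_chunk must be positive")
    start = utcnow - timedelta(hours=start_hours_ago)
    end = utcnow - timedelta(hours=end_hours_ago)
    step = timedelta(minutes=minutes_per_chunk)
    chunks: List[Period] = []
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + step, end)
        chunks.append(Period(start=chunk_start, end=chunk_end))
        chunk_start = chunk_end
    return chunks
```

For example, a two-hour window with 50-minute chunks yields three periods, the last one lasting 20 minutes.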