pipeline.src.flows.enrich_logbook
Functions
reset_pnos | Deletes enriched data from pnos in logbook table in the designated Period.
extract_pno_trips_period | Extracts the earliest tripStartDate and the latest predictedArrivalDatetimeUtc from all PNOs emitted during the given Period.
extract_pno_catches |
compute_pno_segments | Computes the segments of the input PNO species and gears.
compute_pno_types | Computes the PNO types of the input PNO species and gears.
merge_pnos_data | Merges the input DataFrames on logbook_reports_pno_id.
compute_pno_risk_factors |
flag_pnos_to_verify_and_send |
load_enriched_pnos | Loads enriched_pnos data to logbook_reports table.
extract_enrich_load_logbook | Extract pnos for the given Period, enrich and update the logbook table.
Module Contents
- pipeline.src.flows.enrich_logbook.reset_pnos(period: src.helpers.dates.Period)
Deletes enriched data from pnos in logbook table in the designated Period.
Distribution attributes (isInVerificationScope, IsVerified, IsSent, IsBeingSent) are not reset.
- pipeline.src.flows.enrich_logbook.extract_pno_trips_period(period: src.helpers.dates.Period) → src.helpers.dates.Period
Extracts the earliest tripStartDate and the latest predictedArrivalDatetimeUtc from all PNOs emitted during the given Period.
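As an illustration only, the period computation can be sketched in pandas. How the task actually reads its data is not documented here; the DataFrame, its snake_case column names `trip_start_date` and `predicted_arrival_datetime_utc`, and the plain tuple standing in for `Period` are all assumptions.

```python
import pandas as pd

# Hypothetical PNOs emitted during the period (column names are assumptions).
pnos = pd.DataFrame(
    {
        "trip_start_date": pd.to_datetime(
            ["2023-05-01 06:00", "2023-05-03 04:30", "2023-05-02 22:15"]
        ),
        "predicted_arrival_datetime_utc": pd.to_datetime(
            ["2023-05-04 18:00", "2023-05-06 09:00", "2023-05-05 12:00"]
        ),
    }
)

# Earliest trip start and latest predicted arrival across all PNOs.
# A tuple stands in for the Period object returned by the real task.
trips_period = (
    pnos["trip_start_date"].min(),
    pnos["predicted_arrival_datetime_utc"].max(),
)
```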
- pipeline.src.flows.enrich_logbook.extract_pno_catches(pno_emission_period: src.helpers.dates.Period, trips_period: src.helpers.dates.Period) → pandas.DataFrame
- pipeline.src.flows.enrich_logbook.compute_pno_segments(pno_catches: pandas.DataFrame, segments: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame) → pandas.DataFrame
Computes the segments of the input PNO species and gears.
- Parameters:
pno_catches (pd.DataFrame) –
DataFrame of PNO species. 1 line = 1 catch. Must have columns:
logbook_reports_pno_id int 1
trip_gears List[dict] [{“gear”: “xxx”, “mesh”: yyy, “dimensions”: “zzz”}, {…}]
species str ‘COD’
fao_area str ‘27.7.d’
year int 2022
facade str ‘MED’
weight float 230.2
vessel_type str Fishing vessel
scip_species_type str DEMERSAL
segments (pd.DataFrame) –
DataFrame of segment definitions. 1 line = 1 segment. Must have columns:
year int 2022
segment str SWW1
segment_name str Nom du segment
gears List[str] [“OTB”, …]
fao_areas List[str] [“27.8”, …]
target_species List[str] [“COD”, …]
impact_risk_factor float 2.8
min_mesh float 80.0
max_mesh float 120.0
min_share_of_target_species float 0.2
main_scip_species_type str DEMERSAL
vessel_types List[str] [‘Fishing vessel’, ‘Trawler’]
priority float 1.0
infringement_risk_levels (pd.DataFrame) –
DataFrame of infringement risk levels. Must have columns:
year int 2022
facade str “MEMN”
segment str “SWW01”
infringement_risk_level float 2.8
- Returns:
- DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.
Has columns:
logbook_reports_pno_id int 1
trip_gears List[dict] [{“gear”: “xxx”, “mesh”: yyy, “dimensions”: “zzz”}, {…}]
[{“pno_type_name”: “Type 1”, “minimum_notification_period”: 4.0, “has_designated_ports”: True}, {…}]
impact_risk_factor float 2.8
infringement_risk_level float 2.8
- Return type:
pd.DataFrame
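To make the input shapes concrete, here is a minimal sketch that builds small `pno_catches` and `segments` frames with a subset of the documented columns and applies a deliberately simplified matching rule. The rule (same year, a listed gear, an area inside a segment area, a targeted species) and the helper `catch_matches_segment` are assumptions for illustration; the module's actual logic, including mesh, share, vessel type and priority handling, is not reproduced here.

```python
import pandas as pd

# Example inputs using a subset of the documented columns.
pno_catches = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 1, 2],
        "trip_gears": [
            [{"gear": "OTB", "mesh": 100.0, "dimensions": None}],
            [{"gear": "OTB", "mesh": 100.0, "dimensions": None}],
            [{"gear": "LLS", "mesh": None, "dimensions": None}],
        ],
        "species": ["COD", "HKE", "BFT"],
        "fao_area": ["27.8.a", "27.8.a", "37.1"],
        "year": [2022, 2022, 2022],
        "weight": [230.2, 50.0, 120.0],
    }
)

segments = pd.DataFrame(
    {
        "year": [2022],
        "segment": ["SWW1"],
        "gears": [["OTB"]],
        "fao_areas": [["27.8"]],
        "target_species": [["COD"]],
        "impact_risk_factor": [2.8],
    }
)


def catch_matches_segment(catch: pd.Series, segment: pd.Series) -> bool:
    """Hypothetical, simplified rule: same year, at least one trip gear listed
    in the segment, FAO area prefixed by a segment area, species targeted."""
    trip_gear_codes = {gear["gear"] for gear in catch["trip_gears"]}
    return bool(
        catch["year"] == segment["year"]
        and trip_gear_codes & set(segment["gears"])
        and any(catch["fao_area"].startswith(a) for a in segment["fao_areas"])
        and catch["species"] in segment["target_species"]
    )


# Set of (PNO id, segment code) pairs satisfying the simplified rule.
matches = {
    (catch["logbook_reports_pno_id"], segment["segment"])
    for _, catch in pno_catches.iterrows()
    for _, segment in segments.iterrows()
    if catch_matches_segment(catch, segment)
}
```

Only the COD catch of PNO 1 satisfies all four conditions under this simplified rule.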
- pipeline.src.flows.enrich_logbook.compute_pno_types(pno_catches: pandas.DataFrame, pno_types: pandas.DataFrame) → pandas.DataFrame
Computes the PNO types of the input PNO species and gears.
- Parameters:
pno_catches (pd.DataFrame) –
DataFrame of PNO species. 1 line = 1 catch. Must have columns:
logbook_reports_pno_id int 1
cfr str FRA000000000
predicted_arrival_datetime_utc datetime
year int 2023
species str ‘COD’
trip_gears List[dict] [{“gear”: “xxx”, “mesh”: yyy, “dimensions”: “zzz”}, {…}]
fao_area str ‘27.7.d’
weight float 150.5
flag_state str ‘FRA’
locode str CCXXX
country_code_iso2 str ‘FR’
pno_types (pd.DataFrame) –
DataFrame of pno_types definitions. 1 line = 1 rule. Must have columns:
pno_type_id int 1
pno_type_name str “Ports désignés thon rouge”
minimum_notification_period float 4.0
has_designated_ports bool True
pno_type_rule_id int 1
species List[str] [“COD”, …]
gears List[str] [“OTB”, …]
fao_areas List[str] [“27.8”, …]
flag_states List[str] [“GBR”, …]
minimum_quantity_kg float 2500.0
- Returns:
- DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.
Has columns:
logbook_reports_pno_id int 1
cfr str FRA000000000
locode str CCXXX
predicted_arrival_datetime_utc datetime
trip_gears List[dict] [{“gear”: “xxx”, “mesh”: yyy, “dimensions”: “zzz”}, {…}]
[{“pnoTypeName”: “Type 1”, “minimumNotificationPeriod”: 4.0, “hasDesignatedPorts”: True}, {…}]
- Return type:
pd.DataFrame
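The "1 line = 1 PNO" output shape, with a list of type dicts per PNO, can be sketched as follows. The intermediate `matched` frame (one row per PNO and matched type) and the output column name `pno_types` are assumptions made for this example; only the dict keys come from the description above.

```python
import pandas as pd

# Hypothetical intermediate result: one row per (PNO, matched PNO type).
matched = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 1, 2],
        "pno_type_name": ["Type 1", "Type 2", "Type 1"],
        "minimum_notification_period": [4.0, 8.0, 4.0],
        "has_designated_ports": [True, False, True],
    }
)

# Collect the matched types of each PNO into a list of dicts.
types_by_pno = {}
for row in matched.itertuples(index=False):
    types_by_pno.setdefault(row.logbook_reports_pno_id, []).append(
        {
            "pnoTypeName": row.pno_type_name,
            "minimumNotificationPeriod": row.minimum_notification_period,
            "hasDesignatedPorts": row.has_designated_ports,
        }
    )

# One line per PNO; "pno_types" is an assumed column name.
pnos_with_types = pd.DataFrame(
    {
        "logbook_reports_pno_id": list(types_by_pno.keys()),
        "pno_types": list(types_by_pno.values()),
    }
)
```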
- pipeline.src.flows.enrich_logbook.merge_pnos_data(pnos_with_types: pandas.DataFrame, pnos_with_segments: pandas.DataFrame) → pandas.DataFrame
Merges the input DataFrames on logbook_reports_pno_id.
- Parameters:
pnos_with_types (pd.DataFrame) – DataFrame of PNOs with their types
pnos_with_segments (pd.DataFrame) – DataFrame of PNOs with their segments
- Returns:
DataFrame of PNOs with their types and segments
- Return type:
pd.DataFrame
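A merge on `logbook_reports_pno_id` can be sketched with `pandas.merge`. The join type used by the real function is not stated; the inner join (the pandas default) and the `validate="1:1"` check below are assumptions, as are the example columns other than the key.

```python
import pandas as pd

pnos_with_types = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 2],
        "pno_types": [[{"pnoTypeName": "Type 1"}], []],
    }
)
pnos_with_segments = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 2],
        "impact_risk_factor": [2.8, 1.0],
    }
)

# Inner join on the PNO id; validate raises if an id is duplicated on either side.
pnos = pd.merge(
    pnos_with_types,
    pnos_with_segments,
    on="logbook_reports_pno_id",
    validate="1:1",
)
```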
- pipeline.src.flows.enrich_logbook.compute_pno_risk_factors(pnos: pandas.DataFrame, control_anteriority: pandas.DataFrame) → pandas.DataFrame
- pipeline.src.flows.enrich_logbook.flag_pnos_to_verify_and_send(pnos: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, predicted_arrival_threshold: datetime.datetime, vessels_with_active_reportings: set)
- pipeline.src.flows.enrich_logbook.load_enriched_pnos(enriched_pnos: pandas.DataFrame, period: src.helpers.dates.Period, logger: logging.Logger)
Loads enriched_pnos data to logbook_reports table.
If distribution attributes (isInVerificationScope, IsVerified, IsSent, IsBeingSent) are already present in the value field of the logbook_reports table, the values in the table are preserved and values from the DataFrame are ignored.
- Parameters:
enriched_pnos (pd.DataFrame) – Enriched PNOs data
period (Period) – Date range of PNOs
logger (Logger) – logger instance
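The preservation rule for distribution attributes can be illustrated on plain dictionaries. This is a sketch of the rule as described, not of how the load is actually performed; the helper `merge_value` and the `riskFactor` key are hypothetical, and the attribute names are spelled as in the description above.

```python
# Attribute names copied from the description above.
DISTRIBUTION_KEYS = ("isInVerificationScope", "IsVerified", "IsSent", "IsBeingSent")


def merge_value(existing: dict, enriched: dict) -> dict:
    """Hypothetical helper: enriched fields overwrite existing ones, except
    distribution attributes that are already present in the existing value."""
    preserved = {key: existing[key] for key in DISTRIBUTION_KEYS if key in existing}
    # Later entries win, so preserved attributes override the enriched ones.
    return {**existing, **enriched, **preserved}


existing_value = {"isInVerificationScope": True, "IsSent": True}
enriched_value = {
    "isInVerificationScope": False,
    "IsSent": False,
    "IsVerified": False,
    "riskFactor": 2.1,  # hypothetical enriched field
}

updated_value = merge_value(existing_value, enriched_value)
```

Here `isInVerificationScope` and `IsSent` keep their table values, while `IsVerified`, absent from the table, is taken from the DataFrame side.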
- pipeline.src.flows.enrich_logbook.extract_enrich_load_logbook(period: src.helpers.dates.Period, segments: pandas.DataFrame, pno_types: pandas.DataFrame, control_anteriority: pandas.DataFrame, infringement_risk_levels: pandas.DataFrame, pno_units_targeting_vessels: pandas.DataFrame, pno_units_ports_and_segments_subscriptions: pandas.DataFrame, utcnow: datetime.datetime, vessels_with_active_reportings: set)
Extract pnos for the given Period, enrich and update the logbook table.
This is all done in a single Task so that no task returns a value. Prefect keeps all task results in memory until the flow run finishes, which would cancel out the memory savings of the chunked processing logic.
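The memory argument can be sketched without Prefect: each chunk is handled by a function that returns `None`, so intermediate data can be released once the call ends. How the flow actually splits the period is not described here; `make_chunks`, `process_chunk` and the six-hour chunk size are assumptions for illustration.

```python
from datetime import datetime, timedelta


def make_chunks(start: datetime, end: datetime, chunk_size: timedelta) -> list:
    """Hypothetical helper: split [start, end) into consecutive sub-periods."""
    chunks = []
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + chunk_size, end)
        chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end
    return chunks


processed = []  # only lightweight bookkeeping is kept between chunks


def process_chunk(chunk: tuple) -> None:
    """Stand-in for extract, enrich and load on one chunk. It returns nothing,
    so no per-chunk result accumulates in the caller."""
    processed.append(chunk)


for chunk in make_chunks(
    datetime(2023, 1, 1), datetime(2023, 1, 2), timedelta(hours=6)
):
    process_chunk(chunk)
```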