pipeline.src.flows.missing_far_alerts

Functions

get_dates(→ Tuple[datetime.datetime, ...)

Returns the dates used in the flow as a 5-tuple:

make_positions_at_sea_query(→ sqlalchemy.sql.Select)

Generates the sqlalchemy.Select statement to run in order to get the positions of

extract_vessels_that_emitted_fars(→ set)

Extracts the vessels that emitted at least one FAR logbook report between the

concat(→ pandas.DataFrame)

Concatenates the two input DataFrames.

get_vessels_at_sea(→ pandas.DataFrame)

Returns a DataFrame with the vessels present in the input positions_at_sea

get_vessels_with_missing_fars(→ pandas.DataFrame)

Filters vessels_at_sea to keep only rows whose cfr is NOT in

merge_risk_factor(→ pandas.DataFrame)

Merges the two input DataFrames on ["cfr", "external_immatriculation", "ircs"].

missing_far_alerts_flow(alert_type, name, ...)

Module Contents

pipeline.src.flows.missing_far_alerts.get_dates(days_without_far: int) → Tuple[datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime, float]

Returns the dates used in the flow as a 5-tuple:

  • days_without_far days ago at 00:00 (beginning of the day) in UTC (1)

  • Yesterday at 8pm in UTC

  • Today at 00:00 (beginning of the day) in UTC

  • Current datetime in UTC (2)

  • The number of hours between (1) and (2)

Returns:

Tuple[datetime, datetime, datetime, datetime, float]
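The five values listed above can be sketched as follows. This is a minimal illustration, not the flow's actual implementation: the name get_dates_sketch, the injectable now argument (added only so the result is reproducible) and the use of timezone-aware datetimes are all assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def get_dates_sketch(
    days_without_far: int, now: Optional[datetime] = None
) -> Tuple[datetime, datetime, datetime, datetime, float]:
    """Hypothetical re-creation of the 5-tuple described above."""
    # `now` is an assumption of this sketch, used to make the output testable.
    utcnow = now or datetime.now(timezone.utc)

    # Today at 00:00 UTC.
    today_at_zero_hours = utcnow.replace(hour=0, minute=0, second=0, microsecond=0)

    # (1) days_without_far days ago at 00:00 UTC.
    period_start = today_at_zero_hours - timedelta(days=days_without_far)

    # Yesterday at 8pm UTC, i.e. 4 hours before today at 00:00.
    yesterday_at_8pm = today_at_zero_hours - timedelta(hours=4)

    # Number of hours between (1) and the current datetime (2).
    period_length_hours = (utcnow - period_start).total_seconds() / 3600

    return (
        period_start,
        yesterday_at_8pm,
        today_at_zero_hours,
        utcnow,
        period_length_hours,
    )
```

With days_without_far=2 and a current time of 06:30 UTC, the last element is 2 × 24 + 6.5 = 54.5 hours.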

pipeline.src.flows.missing_far_alerts.make_positions_at_sea_query(positions_table: sqlalchemy.Table, facade_areas_table: sqlalchemy.Table, from_date: datetime.datetime, to_date: datetime.datetime, states_to_monitor_iso2: list = None, vessels_table: sqlalchemy.Table = None, minimum_length: float = None, eez_areas_table: sqlalchemy.Table = None, eez_to_monitor_iso3: list = None, only_fishing_positions: bool = False, exclude_vessels_with_logbook_exemptions: bool = False) → sqlalchemy.sql.Select

Generates the sqlalchemy.Select statement to run in order to get the positions of vessels that were at sea (i.e. those that emitted at least one VMS position outside of a port) between the designated dates and matching the designated flag states.

Parameters:
  • positions_table (Table) – sqlalchemy.Table representing positions

  • facade_areas_table (Table) – sqlalchemy.Table representing facade_areas

  • from_date (datetime) – Start of the time interval to query, in UTC

  • to_date (datetime) – End of the time interval to query, in UTC

  • states_to_monitor_iso2 (list, optional) – If provided, only vessels of the given flag_states will be queried. Defaults to None.

  • vessels_table (Table, optional) – sqlalchemy.Table representing vessels. Must be provided if minimum_length is not None. Defaults to None.

  • minimum_length (float, optional) – If provided, only vessels longer than the given value will be queried (only applies to French vessels). Defaults to None.

  • eez_areas_table (Table, optional) – sqlalchemy.Table representing eez_areas. Must be provided if eez_to_monitor_iso3 is not None. Defaults to None.

  • eez_to_monitor_iso3 (list, optional) – If provided, only VMS positions emitted in the designated EEZ areas will be considered. Defaults to None.

  • only_fishing_positions (bool, optional) – If True, only positions detected as being part of a fishing operation will be considered. Defaults to False.

  • exclude_vessels_with_logbook_exemptions (bool, optional) – If True, vessels with an ‘Exempté’ logbook_equipment_status will be excluded. Defaults to False.

Raises:
  • ValueError – If minimum_length is not None and the vessels_table is not provided.

  • ValueError – If eez_to_monitor_iso3 is not None and the eez_areas_table is not provided.

Returns:

Select statement representing a SQL query

Return type:

Select
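The two ValueError conditions documented above amount to a pair of argument checks. The helper below is a hypothetical sketch of that validation only (the name check_optional_tables is an assumption, and the tables are typed as plain objects so the example does not depend on sqlalchemy); it does not build the query itself.

```python
from typing import Optional


def check_optional_tables(
    minimum_length: Optional[float] = None,
    vessels_table: Optional[object] = None,
    eez_to_monitor_iso3: Optional[list] = None,
    eez_areas_table: Optional[object] = None,
) -> None:
    """Hypothetical helper mirroring the documented `Raises` section."""
    # Filtering on vessel length requires the vessels table.
    if minimum_length is not None and vessels_table is None:
        raise ValueError(
            "vessels_table must be provided when minimum_length is not None."
        )

    # Restricting positions to EEZ areas requires the eez_areas table.
    if eez_to_monitor_iso3 is not None and eez_areas_table is None:
        raise ValueError(
            "eez_areas_table must be provided when eez_to_monitor_iso3 is not None."
        )
```

Calling check_optional_tables(minimum_length=12.0) raises ValueError, whereas passing a vessels_table alongside it does not.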

pipeline.src.flows.missing_far_alerts.extract_vessels_that_emitted_fars(declaration_min_datetime_utc: datetime.datetime, declaration_max_datetime_utc: datetime.datetime, fishing_operation_min_datetime_utc: datetime.datetime, fishing_operation_max_datetime_utc: datetime.datetime) → set

Extracts the vessels that emitted at least one FAR logbook report between the designated dates and returns the result as a set of their CFR numbers.

Date conditions must be applied to 3 datetime fields:

  • operation_datetime_utc: for performance reasons (the table is chunked on this column)

  • report_datetime_utc: to get only reports that were filled in between the given dates

  • farDatetimeUtc: in certain cases (in particular VisioCapture), reports can be filled in weeks or months after the actual fishing operation. In the context of this flow, we are not interested in these reports and want to keep only reports that were filled in live, directly on the boat.

Parameters:
  • declaration_min_datetime_utc (datetime) – Minimum operation_datetime_utc and report_datetime_utc

  • declaration_max_datetime_utc (datetime) – Maximum operation_datetime_utc and report_datetime_utc

  • fishing_operation_min_datetime_utc (datetime) – Minimum farDatetimeUtc

  • fishing_operation_max_datetime_utc (datetime) – Maximum farDatetimeUtc

Returns:

Set of CFR numbers of the vessels that emitted at least one FAR report between the given dates.

Return type:

set
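The three date conditions can be illustrated on in-memory records. This is a sketch under stated assumptions: the real function queries the logbook table, whereas here reports are plain dicts with hypothetical keys (far_datetime_utc stands in for farDatetimeUtc), and the bounds are assumed to be inclusive.

```python
from datetime import datetime
from typing import Dict, Iterable, Set


def vessels_that_emitted_fars_sketch(
    reports: Iterable[Dict],
    declaration_min_datetime_utc: datetime,
    declaration_max_datetime_utc: datetime,
    fishing_operation_min_datetime_utc: datetime,
    fishing_operation_max_datetime_utc: datetime,
) -> Set[str]:
    """Hypothetical in-memory equivalent of the three date filters."""
    d_min, d_max = declaration_min_datetime_utc, declaration_max_datetime_utc
    f_min, f_max = fishing_operation_min_datetime_utc, fishing_operation_max_datetime_utc
    return {
        r["cfr"]
        for r in reports
        # Condition on the chunking column (performance in the real table).
        if d_min <= r["operation_datetime_utc"] <= d_max
        # Condition on when the report was filled in.
        and d_min <= r["report_datetime_utc"] <= d_max
        # Condition on when the fishing operation took place: this drops
        # reports filled in long after the operation they describe.
        and f_min <= r["far_datetime_utc"] <= f_max
    }
```

A report declared inside the window but describing a fishing operation from months earlier fails the third condition, so its vessel is not counted as having emitted a FAR.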

pipeline.src.flows.missing_far_alerts.concat(positions_at_sea_yesterday_everywhere: pandas.DataFrame, positions_at_sea_yesterday_in_french_eez: pandas.DataFrame) → pandas.DataFrame

Concatenates the two input DataFrames.

Parameters:
  • positions_at_sea_yesterday_everywhere (pd.DataFrame)

  • positions_at_sea_yesterday_in_french_eez (pd.DataFrame)

Returns:

pd.DataFrame

pipeline.src.flows.missing_far_alerts.get_vessels_at_sea(positions_at_sea: pandas.DataFrame, min_days: int) → pandas.DataFrame

Returns a DataFrame with the vessels present in the input positions_at_sea DataFrame which were at sea on at least min_days days. The input must have the columns:

  • cfr

  • external_immatriculation

  • ircs

  • vessel_name

  • facade

  • flag_state

  • date_time

  • latitude

  • longitude

Parameters:
  • positions_at_sea (pd.DataFrame) – DataFrame of positions of vessels at sea

  • min_days (int) – Minimum number of days at sea. Vessels at sea on fewer than min_days days are excluded from the result.

Returns:

Vessels of the input that were at sea on at least min_days different days.

Return type:

pd.DataFrame
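The "at least min_days different days" rule boils down to counting distinct calendar days per vessel. The sketch below shows that counting on plain dicts instead of a DataFrame; it is a simplification, assuming vessels are identified by cfr alone and returning only the set of qualifying cfr values rather than one row per vessel.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, Set


def vessels_at_sea_sketch(positions: Iterable[Dict], min_days: int) -> Set[str]:
    """Hypothetical plain-Python version of the distinct-days filter."""
    days_by_cfr = defaultdict(set)
    for position in positions:
        # Several positions on the same calendar day count only once.
        days_by_cfr[position["cfr"]].add(position["date_time"].date())

    return {cfr for cfr, days in days_by_cfr.items() if len(days) >= min_days}
```

A vessel with three positions that all fall on the same day counts as one day at sea, so it is excluded when min_days is 2.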

pipeline.src.flows.missing_far_alerts.get_vessels_with_missing_fars(vessels_at_sea: pandas.DataFrame, vessels_that_emitted_fars: set, max_share_of_vessels_with_missing_fars: float = 0.5) → pandas.DataFrame

Filters vessels_at_sea to keep only rows whose cfr is NOT in vessels_that_emitted_fars.

Parameters:
  • vessels_at_sea (pd.DataFrame) – DataFrame of vessels at sea

  • vessels_that_emitted_fars (set) – Set of CFRs of vessels that emitted FAR reports

  • max_share_of_vessels_with_missing_fars (float, optional) – If the share of vessels_at_sea that are not in vessels_that_emitted_fars is greater than this value, it is assumed that there is a breakdown in the data pipeline and a MonitorfishHealthError is raised. Defaults to 0.5.

Raises:

MonitorfishHealthError – Raised if the share of vessels with missing FARs is greater than max_share_of_vessels_with_missing_fars

Returns:

Filtered version of vessels_at_sea with only those that are not in vessels_that_emitted_fars

Return type:

pd.DataFrame
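The filter and its safety threshold can be sketched without pandas as follows. HealthCheckError is a hypothetical stand-in for MonitorfishHealthError, vessels are plain dicts rather than DataFrame rows, and the handling of an empty input (no error raised) is an assumption.

```python
from typing import Dict, List, Set


class HealthCheckError(Exception):
    """Hypothetical stand-in for MonitorfishHealthError."""


def vessels_with_missing_fars_sketch(
    vessels_at_sea: List[Dict],
    vessels_that_emitted_fars: Set[str],
    max_share_of_vessels_with_missing_fars: float = 0.5,
) -> List[Dict]:
    # Keep only vessels whose cfr is NOT in the set of FAR emitters.
    missing = [
        v for v in vessels_at_sea if v["cfr"] not in vessels_that_emitted_fars
    ]

    # An implausibly high share suggests the logbook data is missing,
    # not that most vessels failed to report.
    if vessels_at_sea:
        share = len(missing) / len(vessels_at_sea)
        if share > max_share_of_vessels_with_missing_fars:
            raise HealthCheckError(
                f"{share:.0%} of vessels at sea have no FAR, which exceeds "
                f"the {max_share_of_vessels_with_missing_fars:.0%} threshold."
            )

    return missing
```

Because the comparison is strict, a share exactly equal to the threshold does not raise in this sketch, which matches the "greater than" wording of the parameter description.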

pipeline.src.flows.missing_far_alerts.merge_risk_factor(vessels_with_missing_fars: pandas.DataFrame, current_risk_factors: pandas.DataFrame) → pandas.DataFrame

Merges the two input DataFrames on [“cfr”, “external_immatriculation”, “ircs”].

Parameters:
  • vessels_with_missing_fars (pd.DataFrame)

  • current_risk_factors (pd.DataFrame)

Returns:

pd.DataFrame

pipeline.src.flows.missing_far_alerts.missing_far_alerts_flow(alert_type: str, name: str, states_iso2_to_monitor_everywhere: list, states_iso2_to_monitor_in_french_eez: list, max_share_of_vessels_with_missing_fars: float, minimum_length: float, only_raise_if_route_shows_fishing: bool, days_without_far: int)