from typing import Callable, Tuple
import geopandas as gpd
import pandas as pd
from prefect import flow, get_run_logger, task
from sqlalchemy import DDL
from config import POSEIDON_CONTROL_ID_TO_MONITORENV_MISSION_ID_SHIFT
from src.db_config import create_engine
from src.entities.missions import (
InfractionType,
MissionActionType,
MissionOrigin,
MissionType,
)
from src.generic_tasks import extract, load
from src.helpers.controls import make_infractions
from src.helpers.fao_areas import remove_redundant_fao_area_codes
from src.processing import df_to_dict_series, zeros_ones_to_bools
from src.shared_tasks.facades import extract_facade_areas
# ********************************** Tasks and flow ***********************************
@task
@task
@task
@task
@task
@task
@task
[docs]
def compute_controls_fao_areas(
    controls: pd.DataFrame, fao_areas: gpd.GeoDataFrame, ports: pd.DataFrame
) -> pd.DataFrame:
    """
    Determine the FAO area(s) of each control.

    Controls that have both a `latitude` and a `longitude` are spatially joined with
    `fao_areas` : all the `f_code` values matched by a control are gathered in a list,
    which is then passed through `remove_redundant_fao_area_codes`.

    Controls that lack a position but have a `port_locode` take the `fao_areas` value
    of the port with the same locode.

    NB : controls that have no fao_area (because they lack location or port information
    or because their location / port does not belong to an FAO area) are not included
    in the result.

    Args:
        controls (pd.DataFrame): controls with at least `id`, `latitude`, `longitude`
          and `port_locode` columns
        fao_areas (gpd.GeoDataFrame): FAO areas with `f_code` column (and geometry)
        ports (pd.DataFrame): ports with `locode` and `fao_areas` columns

    Returns:
        pd.DataFrame: DataFrame with columns `id` and `fao_areas`
    """
    has_position = controls.longitude.notnull() & controls.latitude.notnull()

    # Controls with a latitude and longitude (air and sea controls) : spatial join
    # with the FAO areas polygons
    positioned = controls.loc[has_position, ["id", "latitude", "longitude"]]
    positioned = gpd.GeoDataFrame(
        positioned,
        geometry=gpd.points_from_xy(positioned.longitude, positioned.latitude),
        crs=4326,
    )
    matched_codes = gpd.sjoin(positioned, fao_areas)[["id", "f_code"]]
    fao_areas_from_position = (
        matched_codes.groupby("id")["f_code"]
        .agg(list)
        .map(remove_redundant_fao_area_codes)
        .rename("fao_areas")
        .reset_index()
    )

    # Controls without a position but with a port (land controls) : take the FAO
    # areas of the port
    at_port = controls.loc[
        ~has_position & controls.port_locode.notnull(),
        ["id", "port_locode"],
    ]
    ports_by_locode = ports.rename(columns={"locode": "port_locode"})
    fao_areas_from_port = at_port.merge(ports_by_locode, on="port_locode")[
        ["id", "fao_areas"]
    ]

    return pd.concat([fao_areas_from_position, fao_areas_from_port])
@task
[docs]
def compute_controls_facade(
    controls: pd.DataFrame, facade_areas: gpd.GeoDataFrame, ports: pd.DataFrame
) -> pd.DataFrame:
    """
    Compute the facade of controls.

    For controls with a location (latitude and longitude), the facade of the
    location of the control is returned.

    For controls with a port (locode), the facade of the port is taken.

    Each control `id` appears at most once in the result : if a control matches
    several facades (for instance a position lying in more than one facade polygon),
    only the first match is kept.

    NB : controls that have no facade (because they lack location or port information
    or because their location / ports does not belong to a facade area) will not be
    included in the result.

    Args:
        controls (pd.DataFrame): controls with at least `id`, `latitude`, `longitude`
          and `port_locode` columns
        facade_areas (gpd.GeoDataFrame): facades with `facade` column (and
          geometry)
        ports (pd.DataFrame): ports with `locode` and `facade` columns

    Returns:
        pd.DataFrame: DataFrame with columns `id` and `facade`
    """
    has_position = controls.longitude.notnull() & controls.latitude.notnull()

    # For controls with a latitude and longitude (air and sea controls), assign the
    # corresponding facade
    localized_controls = controls.loc[has_position, ["id", "latitude", "longitude"]]
    localized_controls = gpd.GeoDataFrame(
        localized_controls,
        geometry=gpd.points_from_xy(
            localized_controls.longitude, localized_controls.latitude
        ),
        crs=4326,
    )
    localized_controls = gpd.sjoin(localized_controls, facade_areas)[["id", "facade"]]

    # For controls with a port (land controls), assign the corresponding facade
    controls_at_port = controls.loc[
        ~has_position & controls.port_locode.notnull(),
        ["id", "port_locode"],
    ]
    controls_at_port = pd.merge(
        controls_at_port,
        ports.rename(columns={"locode": "port_locode"}),
        on="port_locode",
    )[["id", "facade"]]

    # Concatenate controls
    controls_facade = pd.concat([localized_controls, controls_at_port])

    # A join yields one row per match, so a control matching several facades would
    # appear several times and be duplicated when the result is later merged back
    # onto controls by `id`. Keep a single facade per control.
    return controls_facade.drop_duplicates(subset="id")
@task
[docs]
def merge_controls_data(
    controls: pd.DataFrame,
    catch_controls: pd.DataFrame,
    controls_fao_areas: pd.DataFrame,
    controls_facade: pd.DataFrame,
) -> pd.DataFrame:
    """
    Left-join catch, FAO areas and facade data onto controls and normalize the
    list-valued columns.

    Args:
        controls (pd.DataFrame): controls, with an `id` column
        catch_controls (pd.DataFrame): DataFrame with `id` and `species_onboard`
          columns
        controls_fao_areas (pd.DataFrame): DataFrame with `id` and `fao_areas` columns
        controls_facade (pd.DataFrame): DataFrame with `id` and `facade` columns

    Returns:
        pd.DataFrame: a new DataFrame with all rows of `controls`, in which
        `species_onboard` and `fao_areas` always hold a list (empty when there was
        no match) and `segments` holds an empty list on every row.
    """
    # Successive left joins on `id` : every control is kept, matched or not
    for other in (catch_controls, controls_fao_areas, controls_facade):
        controls = pd.merge(controls, other, how="left", on="id")

    # Fill null values in jsonb array columns with []
    for column in ("species_onboard", "fao_areas"):
        controls[column] = controls[column].map(
            lambda li: li if isinstance(li, list) else []
        )

    # Segment allocation is no longer done in this flow
    # If legacy controls data were to be re-imported one day, segments would have to be
    # allocated after import using the `recompute_controls_segments` flow
    #
    # One distinct list per row : `[[]] * n` would store the very same list object in
    # every cell, so that mutating the segments of one control would alter them all.
    controls["segments"] = [[] for _ in range(len(controls))]

    return controls
@task
[docs]
def make_missions_actions_and_missions_control_units(
    controls: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split controls into `missions`, `mission_actions` and `missions_control_units`.

    The `id` of controls is shifted by
    `POSEIDON_CONTROL_ID_TO_MONITORENV_MISSION_ID_SHIFT`. The input DataFrame is not
    modified : the shift is applied on a new DataFrame, so calling this function
    again on the same `controls` gives the same result.

    Args:
        controls (pd.DataFrame): controls, with all the columns listed in
          `missions_columns`, `mission_actions_columns` and
          `missions_control_units_columns` below

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:

          - `missions` : one row per control, with `deleted`, `mission_source`,
            `closed`, `start_datetime_utc` and `end_datetime_utc` added and
            `action_datetime_utc` removed
          - `mission_actions` : one row per control, with `mission_id` (equal to
            `id`) and `is_from_poseidon` added, `open_by` renamed to `user_trigram`
            and `closed_by` renamed to `completed_by`
          - `missions_control_units` : `mission_id` and `control_unit_id` columns
    """
    # Poseidon control ids are shifted to avoid any overlap with the ids of missions
    # created in Monitorenv.
    # The resulting shifted id is used both as the mission_action id in Monitorfish
    # and as mission id in Monitorenv.
    # `assign` returns a new DataFrame, which leaves the caller's `controls` untouched.
    controls = controls.assign(
        id=controls["id"] + POSEIDON_CONTROL_ID_TO_MONITORENV_MISSION_ID_SHIFT
    )

    # Create missions
    missions_columns = [
        "id",
        "action_datetime_utc",
        "open_by",
        "facade",
        "mission_order",
        "mission_types",
        "closed_by",
    ]
    missions = controls[missions_columns].copy(deep=True)
    missions["deleted"] = False
    missions["mission_source"] = MissionOrigin.POSEIDON_CNSP
    missions["closed"] = missions.closed_by.notnull()
    # A mission holds a single control : it starts and ends at the control datetime
    missions["start_datetime_utc"] = missions["action_datetime_utc"]
    missions["end_datetime_utc"] = missions["action_datetime_utc"]
    missions = missions.drop(columns=["action_datetime_utc"])

    # Create mission_actions
    mission_actions_columns = [
        "id",
        "action_type",
        "action_datetime_utc",
        "vessel_id",
        "cfr",
        "ircs",
        "external_immatriculation",
        "vessel_name",
        "latitude",
        "longitude",
        "port_locode",
        "flag_state",
        "facade",
        "district_code",
        "fao_areas",
        "segments",
        "gear_onboard",
        "infractions",
        "species_onboard",
        "has_some_gears_seized",
        "has_some_species_seized",
        "seizure_and_diversion",
        "seizure_and_diversion_comments",
        "other_comments",
        "vessel_targeted",
        "open_by",
        "closed_by",
    ]
    mission_actions = controls[mission_actions_columns].copy(deep=True)
    mission_actions["mission_id"] = mission_actions["id"]
    mission_actions["is_from_poseidon"] = True
    mission_actions = mission_actions.rename(
        columns={"open_by": "user_trigram", "closed_by": "completed_by"}
    )

    # Create missions_control_units
    missions_control_units_columns = ["id", "control_unit_id"]
    missions_control_units = (
        controls[missions_control_units_columns]
        .rename(columns={"id": "mission_id"})
        .copy(deep=True)
    )

    return missions, mission_actions, missions_control_units
@task
[docs]
def load_missions_and_missions_control_units(
    missions: pd.DataFrame, missions_control_units: pd.DataFrame, loading_mode: str
):
    """
    Load `missions` then `missions_control_units` into the `public` schema of the
    `monitorenv_remote` database, using a single connection obtained from
    `engine.begin()`.

    The identifier column handed to `load` depends on `loading_mode` :

      - "replace" : all `missions` whose `mission_source` is `POSEIDON_CNSP` are to
        be replaced, so `mission_source` is used as the identifier.
      - "upsert" : only the missions whose `id` is present in the DataFrame are to
        be replaced, so `id` is used as the identifier.

    The DDL statements passed as `init_ddls` swap the `mission_id` foreign keys of
    `missions_control_units` and `missions_control_resources` for `ON DELETE CASCADE`
    versions, and the ones passed as `end_ddls` put the original constraints back.
    NOTE(review): they are presumably run by `load` respectively before and after
    the data is written - confirm in `src.generic_tasks.load`.

    Args:
        missions (pd.DataFrame): missions to load
        missions_control_units (pd.DataFrame): rows to append to
          `missions_control_units`
        loading_mode (str): "replace" or "upsert"
    """
    assert loading_mode in ("replace", "upsert")

    if loading_mode == "replace":
        id_column = "mission_source"
    else:
        id_column = "id"

    # Plain foreign keys -> cascading foreign keys
    cascading_fkeys_ddls = [
        DDL(
            "ALTER TABLE public.missions_control_units "
            "DROP CONSTRAINT missions_control_units_mission_id_fkey;"
        ),
        DDL(
            "ALTER TABLE public.missions_control_units "
            "ADD CONSTRAINT missions_control_units_mission_id_cascade_fkey "
            "FOREIGN KEY (mission_id) "
            "REFERENCES public.missions (id) "
            "ON DELETE CASCADE;"
        ),
        DDL(
            "ALTER TABLE public.missions_control_resources "
            "DROP CONSTRAINT missions_control_resources_mission_id_fkey;"
        ),
        DDL(
            "ALTER TABLE public.missions_control_resources "
            "ADD CONSTRAINT missions_control_resources_mission_id_cascade_fkey "
            "FOREIGN KEY (mission_id) "
            "REFERENCES public.missions (id) "
            "ON DELETE CASCADE;"
        ),
    ]

    # Cascading foreign keys -> plain foreign keys, in the reverse table order
    plain_fkeys_ddls = [
        DDL(
            "ALTER TABLE public.missions_control_resources "
            "DROP CONSTRAINT missions_control_resources_mission_id_cascade_fkey;"
        ),
        DDL(
            "ALTER TABLE public.missions_control_resources "
            "ADD CONSTRAINT missions_control_resources_mission_id_fkey "
            "FOREIGN KEY (mission_id) "
            "REFERENCES public.missions (id);"
        ),
        DDL(
            "ALTER TABLE public.missions_control_units "
            "DROP CONSTRAINT missions_control_units_mission_id_cascade_fkey;"
        ),
        DDL(
            "ALTER TABLE public.missions_control_units "
            "ADD CONSTRAINT missions_control_units_mission_id_fkey "
            "FOREIGN KEY (mission_id) "
            "REFERENCES public.missions (id);"
        ),
    ]

    engine = create_engine("monitorenv_remote")

    with engine.begin() as connection:
        load(
            missions,
            table_name="missions",
            schema="public",
            connection=connection,
            logger=get_run_logger(),
            pg_array_columns=["mission_types"],
            how="upsert",
            table_id_column=id_column,
            df_id_column=id_column,
            enum_columns=["mission_source"],
            init_ddls=cascading_fkeys_ddls,
            end_ddls=plain_fkeys_ddls,
        )

        load(
            missions_control_units,
            table_name="missions_control_units",
            schema="public",
            connection=connection,
            logger=get_run_logger(),
            how="append",
        )
@task
[docs]
def load_mission_actions(mission_actions: pd.DataFrame, loading_mode: str):
    """
    Load `mission_actions` into `public.mission_actions` of the `monitorfish_remote`
    database.

    The identifier column handed to `load` depends on `loading_mode` :

      - "replace" : all `mission_actions` for which `is_from_poseidon` is True are
        to be replaced, so `is_from_poseidon` is used as the identifier.
      - "upsert" : only the `mission_actions` whose id is present in the DataFrame
        are to be replaced, so `id` is used as the identifier.

    Args:
        mission_actions (pd.DataFrame): mission actions to load
        loading_mode (str): "replace" or "upsert"
    """
    assert loading_mode in ("replace", "upsert")

    id_column = "id"
    if loading_mode == "replace":
        id_column = "is_from_poseidon"

    jsonb_columns = ["segments", "gear_onboard", "species_onboard", "infractions"]

    load(
        mission_actions,
        table_name="mission_actions",
        schema="public",
        db_name="monitorfish_remote",
        logger=get_run_logger(),
        pg_array_columns=["fao_areas"],
        jsonb_columns=jsonb_columns,
        how="upsert",
        table_id_column=id_column,
        df_id_column=id_column,
        enum_columns=["action_type"],
    )
@flow(name="Monitorfish - Controls")
[docs]
def controls_flow(
    loading_mode: str,
    number_of_months: int,
    extract_controls_fn: Callable = extract_controls,
    extract_catch_controls_fn: Callable = extract_catch_controls,
    load_missions_and_missions_control_units_fn: Callable = load_missions_and_missions_control_units,
):
    """
    Controls flow - extracts and processes control data from FMC database, then
    loads the resulting mission actions, missions and missions control units.

    Args:
        loading_mode: Either "replace" or "upsert" mode for loading data
        number_of_months: Number of months of data to extract
        extract_controls_fn: callable used to extract controls, called with
          `number_of_months` as keyword argument
        extract_catch_controls_fn: callable used to extract catch controls, called
          without argument
        load_missions_and_missions_control_units_fn: callable used to load missions
          and missions control units

    Returns:
        Whatever `load_missions_and_missions_control_units_fn` returns.
    """
    # Extract
    raw_controls = extract_controls_fn(number_of_months=number_of_months)
    fao_areas = extract_fao_areas()
    facade_areas = extract_facade_areas()
    ports = extract_ports()
    raw_catch_controls = extract_catch_controls_fn()

    # Transform
    controls = transform_controls(raw_controls)
    catch_controls = transform_catch_controls(raw_catch_controls)
    controls_fao_areas = compute_controls_fao_areas(controls, fao_areas, ports)
    controls_facade = compute_controls_facade(controls, facade_areas, ports)
    merged_controls = merge_controls_data(
        controls, catch_controls, controls_fao_areas, controls_facade
    )
    missions, mission_actions, missions_control_units = (
        make_missions_actions_and_missions_control_units(merged_controls)
    )

    # Load
    load_mission_actions(mission_actions, loading_mode=loading_mode)
    return load_missions_and_missions_control_units_fn(
        missions, missions_control_units, loading_mode=loading_mode
    )