from logging import Logger
import pandas as pd
from prefect import flow, get_run_logger, task
from sqlalchemy import text
from src.db_config import create_engine
from src.generic_tasks import extract
from src.helpers.dates import Period
from src.helpers.spatial import enrich_positions
from src.processing import left_isin_right_by_decreasing_priority, zeros_ones_to_bools
from src.shared_tasks.dates import make_periods
from src.shared_tasks.positions import tag_positions_at_port
from src.utils import psql_insert_copy
[docs]
def filter_already_enriched_vessels(positions: pd.DataFrame) -> pd.DataFrame:
    """
    Drops the positions of vessels whose positions are all already enriched.

    A vessel still needs enrichment when at least one of its positions has a null
    `time_emitting_at_sea`. Every position of such vessels is kept; positions of the
    other vessels are removed. Positions are matched to vessels on the `cfr`,
    `external_immatriculation` and `ircs` columns via
    `left_isin_right_by_decreasing_priority`.

    Args:
        positions (pd.DataFrame): vessels' positions. Must have columns:

          - 'cfr'
          - 'external_immatriculation'
          - 'ircs'
          - 'time_emitting_at_sea'
          - any other column required for the rest of the flow (latitude,
            longitude, datetime...)

    Returns:
        pd.DataFrame: the input rows that were kept, re-indexed from 0.
    """
    vessel_id_columns = ["cfr", "external_immatriculation", "ircs"]

    # Identifiers of the vessels having at least one non-enriched position
    is_missing_emission_time = positions["time_emitting_at_sea"].isna()
    vessels_to_enrich = positions.loc[
        is_missing_emission_time, vessel_id_columns
    ].drop_duplicates()

    # Keep every position belonging to one of those vessels
    belongs_to_vessel_to_enrich = left_isin_right_by_decreasing_priority(
        positions[vessel_id_columns], vessels_to_enrich
    )
    return positions.loc[belongs_to_vessel_to_enrich].reset_index(drop=True)
[docs]
def enrich_positions_by_vessel(
    positions: pd.DataFrame,
    minimum_consecutive_positions: int,
    min_fishing_speed_threshold: float,
    max_fishing_speed_threshold: float,
    minimum_minutes_of_emission_at_sea: int,
) -> pd.DataFrame:
    """
    Applies `enrich_positions` to each vessel's positions.

    Positions are grouped on the (`cfr`, `ircs`, `external_immatriculation`) triplet,
    keeping groups whose identifiers contain nulls (`dropna=False`), and
    `enrich_positions` is applied to each group with the given parameters.

    Args:
        positions (pd.DataFrame): input positions. Must have columns:

          - 'cfr'
          - 'external_immatriculation'
          - 'ircs'
          - 'latitude'
          - 'longitude'
          - 'datetime_utc'
          - 'is_at_port'
          - 'time_emitting_at_sea'

        minimum_consecutive_positions (int): forwarded to `enrich_positions`.
        min_fishing_speed_threshold (float): forwarded to `enrich_positions`.
        max_fishing_speed_threshold (float): forwarded to `enrich_positions`.
        minimum_minutes_of_emission_at_sea (int): forwarded to `enrich_positions`.

    Returns:
        pd.DataFrame: same as input, with the following columns added:

          - 'meters_from_previous_position'
          - 'time_since_previous_position'
          - 'average_speed'
          - 'is_fishing' (converted with `zeros_ones_to_bools`)

        and with the `time_emitting_at_sea` values recomputed / updated.
    """
    # Shared by both branches below so that the caller's parameters are honoured
    # whether or not the positions are grouped.
    enrichment_kwargs = dict(
        minimum_consecutive_positions=minimum_consecutive_positions,
        min_fishing_speed_threshold=min_fishing_speed_threshold,
        max_fishing_speed_threshold=max_fishing_speed_threshold,
        minimum_minutes_of_emission_at_sea=minimum_minutes_of_emission_at_sea,
        # `is_fishing` is produced as 0 / 1 values and converted to booleans once,
        # after all vessels are processed (see below).
        return_floats=True,
    )

    if positions.empty:
        # With an empty DataFrame, the `groupby` has nothing to group on and therefore
        # `enrich_positions` does not get applied at all, which causes the result to
        # be equal to the input and therefore some columns are missing.
        # In this case, applying `enrich_positions` without any groupby just adds the
        # desired columns and solves the problem.
        res = enrich_positions(positions, **enrichment_kwargs)
    else:
        res = positions.groupby(
            ["cfr", "ircs", "external_immatriculation"], dropna=False, group_keys=False
        ).apply(enrich_positions, **enrichment_kwargs)

    # It is much faster to apply zeros_ones_to_bools once after processing all
    # vessels' positions than to apply it inside the enrich_positions function
    # for each vessel individually
    res["is_fishing"] = zeros_ones_to_bools(res["is_fishing"])
    return res
[docs]
def load_fishing_activity(positions: pd.DataFrame, period: Period, logger: Logger):
    """
    Writes enriched positions back to the `public.positions` table.

    Within a single transaction, the enriched rows are bulk-loaded into a temporary
    table `tmp_enriched_positions` (created with `ON COMMIT DROP`). An
    `UPDATE ... FROM` then joins it to `public.positions` on `id`.
    `is_at_port` is always overwritten. The other enrichment columns are only
    overwritten when the new value is not null (`COALESCE` with the existing value).

    Args:
        positions (pd.DataFrame): enriched positions. Must have columns:

          - id
          - is_at_port
          - meters_from_previous_position
          - time_since_previous_position
          - average_speed
          - is_fishing
          - time_emitting_at_sea

        period (Period): the `Period` covered by `positions`. Its bounds restrict the
          `UPDATE` to rows of the `positions` hypertable whose `date_time` lies between
          `period.start` and `period.end` (both inclusive). This limits the time range
          searched when looking up the `id` values to update.
        logger (Logger): logger used to report progress.
    """
    columns_to_load = [
        "id",
        "is_at_port",
        "meters_from_previous_position",
        "time_since_previous_position",
        "average_speed",
        "is_fishing",
        "time_emitting_at_sea",
    ]

    create_tmp_table_statement = text(
        "CREATE TEMP TABLE tmp_enriched_positions("
        " id INTEGER PRIMARY KEY,"
        " is_at_port BOOLEAN,"
        " meters_from_previous_position REAL,"
        " time_since_previous_position DOUBLE PRECISION,"
        " average_speed REAL,"
        " is_fishing BOOLEAN,"
        " time_emitting_at_sea DOUBLE PRECISION"
        ")"
        "ON COMMIT DROP;"
    )

    update_positions_statement = text(
        "UPDATE public.positions p "
        "SET "
        " is_at_port = ep.is_at_port, "
        " meters_from_previous_position = COALESCE( "
        " ep.meters_from_previous_position, "
        " p.meters_from_previous_position "
        " ), "
        " time_since_previous_position = COALESCE( "
        " ep.time_since_previous_position, "
        " p.time_since_previous_position "
        " ), "
        " average_speed = COALESCE( "
        " ep.average_speed, "
        " p.average_speed "
        " ), "
        " is_fishing = COALESCE( "
        " ep.is_fishing, "
        " p.is_fishing "
        " ),"
        " time_emitting_at_sea = COALESCE( "
        " ep.time_emitting_at_sea, "
        " p.time_emitting_at_sea "
        " )"
        "FROM tmp_enriched_positions ep "
        "WHERE p.id = ep.id "
        "AND p.date_time >= :start "
        "AND p.date_time <= :end;"
    )
    period_bounds = {
        "start": period.start,
        "end": period.end,
    }

    engine = create_engine("monitorfish_remote")
    with engine.begin() as connection:
        logger.info("Creating temporary table")
        connection.execute(create_tmp_table_statement)

        logger.info("Loading to temporary table")
        positions[columns_to_load].to_sql(
            "tmp_enriched_positions",
            connection,
            if_exists="append",
            index=False,
            method=psql_insert_copy,
        )

        logger.info("Updating positions from temporary table")
        connection.execute(update_positions_statement, period_bounds)
@flow(name="Monitorfish - Enrich positions")
def enrich_positions_flow(
    start_hours_ago: int,
    end_hours_ago: int,
    minutes_per_chunk: int,
    chunk_overlap_minutes: int,
    minimum_consecutive_positions: int,
    minimum_minutes_of_emission_at_sea: int,
    min_fishing_speed_threshold: float,
    max_fishing_speed_threshold: float,
    recompute_all: bool,
) -> None:
    """
    Enriches positions over a time window, one period at a time.

    The window is split into `Period`s by `make_periods`, called with
    `start_hours_ago`, `end_hours_ago`, `minutes_per_chunk` and
    `chunk_overlap_minutes`. `extract_enrich_load` is then run sequentially on each
    period with the enrichment parameters.

    Args:
        start_hours_ago (int): forwarded to `make_periods`.
        end_hours_ago (int): forwarded to `make_periods`.
        minutes_per_chunk (int): forwarded to `make_periods`.
        chunk_overlap_minutes (int): forwarded to `make_periods`.
        minimum_consecutive_positions (int): forwarded to `extract_enrich_load`.
        minimum_minutes_of_emission_at_sea (int): forwarded to `extract_enrich_load`.
        min_fishing_speed_threshold (float): forwarded to `extract_enrich_load`.
        max_fishing_speed_threshold (float): forwarded to `extract_enrich_load`.
        recompute_all (bool): forwarded to `extract_enrich_load`.
    """
    periods = make_periods(
        start_hours_ago,
        end_hours_ago,
        minutes_per_chunk,
        chunk_overlap_minutes,
    )

    for period in periods:
        # NOTE(review): `extract_enrich_load` is not defined in the visible source;
        # presumably a task defined elsewhere in this module - confirm.
        extract_enrich_load(
            period,
            minimum_consecutive_positions=minimum_consecutive_positions,
            min_fishing_speed_threshold=min_fishing_speed_threshold,
            max_fishing_speed_threshold=max_fishing_speed_threshold,
            minimum_minutes_of_emission_at_sea=minimum_minutes_of_emission_at_sea,
            recompute_all=recompute_all,
        )