import io
from dataclasses import dataclass
from typing import List, Set
import geopandas as gpd
import h3
import pandas as pd
import requests
from prefect import flow, get_run_logger, task
from vptree import VPTree
from config import (
ANCHORAGES_H3_CELL_RESOLUTION,
ANCHORAGES_URL,
LIBRARY_LOCATION,
PROXIES,
ROOT_DIRECTORY,
)
from src.db_config import create_engine
from src.generic_tasks import load
from src.helpers.spatial import (
Position,
get_h3_indices,
get_k_ring_of_h3_cells,
point_dist,
)
from src.read_query import read_query
from src.utils import psql_insert_copy
#######################################################################################
####################################### HELPERS #######################################
#######################################################################################
@dataclass
[docs]
class PortsVPTree(VPTree):
    """
    Vantage Point Tree used to look up the port nearest to a Position(lat, lon).

    The tree is built over ``PortLocation`` objects and uses ``point_dist`` as its
    distance function. With p ports in the tree, finding the port closest to a
    given Position has complexity log(p).
    """

    def __init__(self, ports_locations: List[PortLocation]):
        # Delegate tree construction to VPTree, with point_dist as the metric.
        super().__init__(ports_locations, point_dist)

    def get_nearest_port(self, pos: Position) -> dict:
        """
        Finds the PortLocation closest to the input Position.

        Args:
            pos (Position): Position instance

        Returns:
            dict: dict with keys ``nearest_port_distance`` (distance to the
            closest port, in meters) and ``nearest_port_locode`` (locode of that
            port).
        """
        # get_nearest_neighbor yields a (distance, PortLocation) pair.
        neighbor = self.get_nearest_neighbor(pos)
        return dict(
            nearest_port_distance=neighbor[0],
            nearest_port_locode=neighbor[1].locode,
        )
#######################################################################################
################################### TASKS AND FLOWS ###################################
#######################################################################################
############### Flow to compute anchorages and attribute cells to ports ###############
@task
@task
@task
@task
@task
@task
@task
[docs]
def get_anchorage_h3_cells(
    static_positions: pd.DataFrame,
    h3_resolution: int = 9,
    number_signals_threshold: int = 100,
) -> Set[str]:
    """
    Bins input positions into h3 cells of the given resolution and keeps only the
    h3 cells that appear at least `number_signals_threshold` times in the dataset.

    Args:
        static_positions (pd.DataFrame): DataFrame with latitude and longitude
          columns
        h3_resolution (int): h3 resolution to use
        number_signals_threshold (int): number of occurrences below which h3 cells
          are filtered out

    Returns:
        Set[str]: indices of the h3 cells that pass the threshold
    """
    # Work on a copy restricted to the coordinate columns so the caller's
    # DataFrame is never modified.
    positions = static_positions[["latitude", "longitude"]].copy()
    positions["h3"] = get_h3_indices(positions, resolution=h3_resolution)

    # Number of non-null longitude values per h3 cell, indexed by cell.
    number_signals = positions.groupby("h3")["longitude"].count()

    retained = number_signals[number_signals >= number_signals_threshold]
    return set(retained.index)
@task
[docs]
def get_anchorage_h3_cells_rings(
    ais_anchorage_h3_cells: Set[str],
    vms_anchorage_h3_cells: Set[str],
    manual_anchorage_h3_cells: Set[str],
) -> pd.DataFrame:
    """
    Unites three sets of h3 cells corresponding to anchorage locations of vessels
    (AIS data, VMS data and manually added cells), then adds two "rings" of cells
    around them.

    Returns the result as a DataFrame containing the indices, latitude and longitude
    of cells as well as whether each cell was present in the original cells (ring 0)
    or was added in rings 1 and 2 that surround the initial cells.

    Args:
        ais_anchorage_h3_cells (Set[str]): set of indices of h3 cells where
          vessels anchor (AIS data)
        vms_anchorage_h3_cells (Set[str]): set of indices of h3 cells where
          vessels anchor (VMS data)
        manual_anchorage_h3_cells (Set[str]): set of additional indices of h3 cells

    Returns:
        pd.DataFrame: DataFrame of h3 cells with 2 levels of rings added, with
        columns `h3`, `ring_number`, `latitude` and `longitude`
    """
    anchorage_h3_cells = ais_anchorage_h3_cells.union(
        vms_anchorage_h3_cells, manual_anchorage_h3_cells
    )

    # Ring 1: cells of the k=1 neighbourhood that are not initial cells.
    ring_1 = get_k_ring_of_h3_cells(anchorage_h3_cells, k=1) - anchorage_h3_cells
    # Ring 2: cells of the k=2 neighbourhood that are in neither ring 1 nor ring 0.
    ring_2 = (
        get_k_ring_of_h3_cells(anchorage_h3_cells, k=2) - ring_1 - anchorage_h3_cells
    )

    # ignore_index gives the result a unique 0..n-1 index; without it each ring
    # restarts at 0 and the duplicated labels make index-aligned operations
    # (column assignment, concat on axis=1) fragile.
    rings = pd.concat(
        [
            pd.DataFrame({"h3": list(cells)}).assign(ring_number=ring_number)
            for ring_number, cells in enumerate((anchorage_h3_cells, ring_1, ring_2))
        ],
        ignore_index=True,
    )

    # Centre coordinates of each cell, as (latitude, longitude) pairs. Building
    # the columns from lists also works when there are no cells at all.
    coordinates = [h3.cell_to_latlng(cell) for cell in rings["h3"]]
    rings["latitude"] = [latitude for latitude, _ in coordinates]
    rings["longitude"] = [longitude for _, longitude in coordinates]
    return rings
@task
[docs]
def get_ports_locations(ports: pd.DataFrame) -> List[PortLocation]:
    """
    Builds one PortLocation object per row of the input DataFrame.

    Args:
        ports (pd.DataFrame): DataFrame whose columns match the fields of a
          PortLocation object.

    Returns:
        List[PortLocation]: one PortLocation per row, in row order
    """
    # Each record is a {column: value} dict passed as keyword arguments.
    records = ports.to_dict(orient="records")
    return [PortLocation(**record) for record in records]
@task
[docs]
def get_anchorages_closest_port(
    anchorage_h3_cells_rings: pd.DataFrame, ports_locations: List[PortLocation]
) -> pd.DataFrame:
    """
    Finds, for each anchorage cell, the nearest port among `ports_locations`.

    Args:
        anchorage_h3_cells_rings (pd.DataFrame): anchorage cells with `latitude`
          and `longitude` columns. Each row is passed as the query position to
          `PortsVPTree.get_nearest_port`.
        ports_locations (List[PortLocation]): ports to search among

    Returns:
        pd.DataFrame: the input cells, with `latitude` / `longitude` renamed to
        `cell_latitude` / `cell_longitude`, plus the `nearest_port_distance` and
        `nearest_port_locode` columns.
    """
    tree = PortsVPTree(ports_locations)

    # Row-wise lookup; the dict returned for each row is expanded into columns.
    nearest_ports = anchorage_h3_cells_rings.apply(
        tree.get_nearest_port, axis=1, result_type="expand"
    )

    renamed_cells = anchorage_h3_cells_rings.rename(
        columns={"latitude": "cell_latitude", "longitude": "cell_longitude"}
    )
    return pd.concat([renamed_cells, nearest_ports], axis=1)
@task
[docs]
def unite_ports_locodes(
    ers_ports_locode: Set[str], control_ports_locodes: Set[str]
) -> Set[str]:
    """
    Returns the union of two sets of port locodes.

    Args:
        ers_ports_locode (Set[str]): locodes of ports used in ERS
        control_ports_locodes (Set[str]): locodes of ports used in controls

    Returns:
        Set[str]: new set holding every locode present in either input
    """
    all_locodes = ers_ports_locode.union(control_ports_locodes)
    return all_locodes
@task
[docs]
def get_active_ports(
    ports: pd.DataFrame, active_ports_locodes: Set[str]
) -> pd.DataFrame:
    """
    Keeps the ports whose locode is among the active ports locodes.

    Args:
        ports (pd.DataFrame): ports, with a `locode` column
        active_ports_locodes (Set[str]): locodes of the ports to keep

    Returns:
        pd.DataFrame: deep copy of the rows of `ports` whose `locode` is in
        `active_ports_locodes` (original index labels are kept)
    """
    is_active = ports["locode"].isin(active_ports_locodes)
    return ports.loc[is_active].copy(deep=True)
@task
[docs]
def merge_closest_port_closest_active_port(
    anchorages_closest_port: pd.DataFrame, anchorages_closest_active_port: pd.DataFrame
) -> pd.DataFrame:
    """
    Joins, for each h3 cell, the closest port with the closest active port.

    The `nearest_port_*` columns of `anchorages_closest_active_port` are renamed
    to `nearest_active_port_*` and merged onto `anchorages_closest_port` on the
    `h3` column.

    Args:
        anchorages_closest_port (pd.DataFrame): cells with their closest port
        anchorages_closest_active_port (pd.DataFrame): cells with their closest
          active port

    Returns:
        pd.DataFrame: `anchorages_closest_port` with the added
        `nearest_active_port_distance` and `nearest_active_port_locode` columns
    """
    active_column_names = {
        "nearest_port_distance": "nearest_active_port_distance",
        "nearest_port_locode": "nearest_active_port_locode",
    }
    renamed_active = anchorages_closest_active_port.rename(columns=active_column_names)

    # Only the join key and the renamed columns are carried over from the right side.
    kept_columns = ["h3", "nearest_active_port_distance", "nearest_active_port_locode"]
    return pd.merge(anchorages_closest_port, renamed_active[kept_columns], on="h3")
@task
[docs]
def load_processed_anchorages(anchorages: pd.DataFrame):
    """
    Writes anchorages to the `processed.anchorages_2023_01` table.

    The table is written through an engine created for "monitorfish_remote",
    replacing any existing table of the same name. The DataFrame index is not
    written.

    Args:
        anchorages (pd.DataFrame): anchorages data to load
    """
    engine = create_engine("monitorfish_remote")
    to_sql_options = dict(
        name="anchorages_2023_01",
        con=engine,
        schema="processed",
        if_exists="replace",
        index=False,
        method=psql_insert_copy,
    )
    anchorages.to_sql(**to_sql_options)
@flow(name="Monitorfish - Anchorages Compute")
[docs]
def anchorages_compute_flow(
    h3_resolution: int = ANCHORAGES_H3_CELL_RESOLUTION,
    number_signals_threshold: int = 100,
    static_vms_positions_file_path: str = "data/raw/anchorages/static_vms_positions_2021_03_to_10.parquet",
):
    """
    Flow to compute anchorages and attribute cells to ports.

    Args:
        h3_resolution (int): h3 resolution used to bin positions into cells
        number_signals_threshold (int): minimum number of VMS static positions a
          cell must contain to be kept as an anchorage cell
        static_vms_positions_file_path (str): path passed to
          `extract_vms_static_positions`
    """
    # ---- Extract ----
    ais_coordinates = extract_ais_anchorage_coordinates()
    vms_positions = extract_vms_static_positions(static_vms_positions_file_path)
    ports = extract_ports()
    ers_locodes = extract_ers_ports_locodes()
    control_locodes = extract_control_ports_locodes()
    manual_coordinates = extract_manual_anchorages_coordinates()

    # ---- Transform ----
    # Manual and AIS coordinates use a threshold of 0; only VMS positions are
    # filtered with the flow's number_signals_threshold.
    manual_cells = get_anchorage_h3_cells(
        manual_coordinates, h3_resolution=h3_resolution, number_signals_threshold=0
    )
    ais_cells = get_anchorage_h3_cells(
        ais_coordinates, h3_resolution=h3_resolution, number_signals_threshold=0
    )
    vms_cells = get_anchorage_h3_cells(
        vms_positions,
        h3_resolution=h3_resolution,
        number_signals_threshold=number_signals_threshold,
    )
    cells_rings = get_anchorage_h3_cells_rings(ais_cells, vms_cells, manual_cells)

    all_ports_locations = get_ports_locations(ports)
    active_locodes = unite_ports_locodes(ers_locodes, control_locodes)
    active_ports = get_active_ports(ports, active_locodes)
    active_ports_locations = get_ports_locations(active_ports)

    # The same nearest-port search runs twice: over all ports, then over active
    # ports only.
    closest_port = get_anchorages_closest_port(cells_rings, all_ports_locations)
    closest_active_port = get_anchorages_closest_port(
        cells_rings, active_ports_locations
    )
    anchorages = merge_closest_port_closest_active_port(
        closest_port, closest_active_port
    )

    # ---- Load ----
    load_processed_anchorages(anchorages)
### Flow to extract anchorages from data.gouv.fr and upload to Monitorfish database ###
@task
@task
[docs]
def load_anchorages_to_monitorfish(anchorages: pd.DataFrame):
    """
    Loads anchorages data into the `public.anchorages` table of the
    "monitorfish_remote" database.

    Args:
        anchorages (pd.DataFrame): anchorages data
    """
    logger = get_run_logger()
    # NOTE(review): with how="replace" and replace_with_truncate=True the existing
    # rows are presumably removed with TRUNCATE rather than by dropping the
    # table - confirm against src.generic_tasks.load.
    load(
        anchorages,
        table_name="anchorages",
        schema="public",
        db_name="monitorfish_remote",
        logger=logger,
        how="replace",
        replace_with_truncate=True,
    )
@flow(name="Monitorfish - Anchorages")
[docs]
def anchorages_flow():
    """
    Main anchorages flow: extracts anchorages from data.gouv.fr (ANCHORAGES_URL,
    through PROXIES) and loads them into the Monitorfish database.
    """
    # Extract
    datagouv_anchorages = extract_datagouv_anchorages(
        anchorages_url=ANCHORAGES_URL,
        proxies=PROXIES,
    )

    # Load
    load_anchorages_to_monitorfish(datagouv_anchorages)