Source code for pipeline.src.helpers.spatial

from dataclasses import dataclass
from typing import Iterable, Set, Tuple, Union
from urllib.parse import quote

import h3
import numpy as np
import pandas as pd
import requests
from pyproj import Geod
from shapely.geometry import MultiPolygon, Polygon

from config import GOOGLE_API_TOKEN, LOCATIONIQ_TOKEN
from src.helpers.dates import get_datetime_intervals
from src.processing import rows_belong_to_sequence, zeros_ones_to_bools


@dataclass
[docs] class Position:
[docs] latitude: float
[docs] longitude: float
@dataclass
[docs] class PositionRepresentation: """ Representation of a position with latitude and longitude in human readable format. """
[docs] latitude: str
[docs] longitude: str
[docs] def coordinate_to_dms(coord: float) -> Tuple[int, float, int, int]: """ Takes a coordinate and return the corresponding degrees, minutes_decimal, minutes and seconds. The sign is not taken into account - only returns positive values. Args: coord (float): latitude or longitude coordinate value Returns: Tuple[int, float, int, int]: degrees, minutes_decimal, minutes, seconds Examples: >>> coordinate_to_dms(45.123) (45, 7.379999999999853, 7, 23) >>> coordinate_to_dms(-45.123) (45, 7.379999999999853, 7, 23) """ abs_coord = abs(coord) degrees = int(abs_coord) decimal_part = abs_coord - degrees minutes_decimal = decimal_part * 60 minutes = int(minutes_decimal) seconds = round((minutes_decimal - minutes) * 60) return degrees, minutes_decimal, minutes, seconds
[docs] def position_to_position_representation( p: Position, representation_type: str = "DMS" ) -> PositionRepresentation: """ Converts a `Position` to a `PositionRepresentation` in the designated `representation_type`. Args: p (Position): input `Position` representation_type (str): "DMS" or "DMD". Defaults to "DMS". Returns: PositionRepresentation Raises: ValueError : if : - `lat` is greater than 90.0 and less than -90.0 - `lon` is greater than 180.0 and less than -180.0 - `representation_type` is not 'DMD' or 'DMS'. """ lat = p.latitude lon = p.longitude try: assert -90 <= lat <= 90 except AssertionError: raise ValueError("Latitude must be between -90.0 and +90.0") try: assert -180 <= lon <= 180 except AssertionError: raise ValueError("Longitude must be between -180.0 and +180.0") north_south = "N" if lat >= 0 else "S" east_west = "E" if lon >= 0 else "W" lat_d, lat_md, lat_m, lat_s = coordinate_to_dms(lat) lon_d, lon_md, lon_m, lon_s = coordinate_to_dms(lon) if representation_type == "DMS": p = PositionRepresentation( latitude=f"{lat_d:02}° {lat_m:02}' {lat_s:02}'' {north_south}", longitude=f"{lon_d:03}° {lon_m:02}' {lon_s:02}'' {east_west}", ) elif representation_type == "DMD": p = PositionRepresentation( latitude=f"{lat_d:02}° {lat_md:06.3f}' {north_south}", longitude=f"{lon_d:03}° {lon_md:06.3f}' {east_west}", ) else: raise ValueError( ( "Unexpected `representation_type`. " f"Expected 'DMS' or 'DMD', got {representation_type} instead." ) ) return p
[docs] def to_multipolygon(p: Union[Polygon, MultiPolygon]) -> MultiPolygon: """ Returns a MultiPolygon of the input Polygon or MultiPolygon geometry. """ if isinstance(p, Polygon): res = MultiPolygon([p]) elif isinstance(p, MultiPolygon): res = p else: raise ValueError("Input must be shapely Polygon or MultiPolygon") return res
[docs] def estimate_current_position( last_latitude: float, last_longitude: float, course: float, speed: float, hours_since_last_position: float, max_hours_since_last_position: float = 2.0, on_error: str = "ignore", ) -> Tuple[float, float]: """ Estimate the current position of a vessel based on its last position, course and speed. If the last position is older than max_hours_since_last_position, or is in the future (i.e. hours_since_last_position is negative), returns None. Args: last_latitude (float): last known latitude of vessel last_longitude (float): last known longitude of vessel course (float): last known route of vessel in degrees speed (float): last known speed of vessel in nots hours_since_last_position (float): time since last known position of vessel, in hours max_hours_since_last_position (float): maximum time in hours since last position, after which the estimation is not performed (returns None instead) Defaults to 2.0 on_error (str): 'ignore' or 'raise' Returns: float: estimated current latitude float: estimated current longitude """ if not 0 <= hours_since_last_position <= max_hours_since_last_position: lat, lon = None, None else: geod = Geod(ellps="WGS84") try: distance = speed * hours_since_last_position * 1852 lon, lat, _ = geod.fwd(last_longitude, last_latitude, course, distance) except: if on_error == "ignore": lat, lon = None, None elif on_error == "raise": raise else: raise ValueError( f"on_error argument must be 'ignore' or 'raise', got {on_error}." ) return lat, lon
[docs] def get_h3_indices( df: pd.DataFrame, lat: str = "latitude", lon: str = "longitude", resolution: int = 12, ) -> pd.Series: """ Returns a Series with the same index as the input DataFrame and values equal to the h3 index corresponding to the latitude and longitude of the indicated columns of the DataFrame Args: df (pd.DataFrame): DataFrame with latitude and longitude coordinates in 2 of its columns lat (str): name of the column containing latitudes. Defaults to "latitude". lon (str): name of the column containing longitudes. Defaults to "longitude". resolution (int): h3 resolution of the h3 cells to output. Returns: pd.Series: h3 cells indices """ if len(df) == 0: res = pd.Series([], dtype=object) else: res = df.apply( lambda row: h3.latlng_to_cell(row[lat], row[lon], resolution), axis=1, ) return res
[docs] def get_k_ring_of_h3_cells(h3_sequence: Iterable[str], k: int) -> Set[str]: """ Takes an list-like sequence of h3 cells and an integer k, returns the set of h3 cells that belong to the k-ring of at least one of the h3 cells in the input sequence. Args: h3_sequence (sequence): sequence of h3 cells k (int): number of rings to add around the input cells Returns: sequence[str]: sequence of h3 cells belonging to the k-ring of at least one of the h3 cells in the input sequence """ h3_cells = [h3.grid_disk(h, k) for h in h3_sequence] return set.union(*h3_cells)
[docs] def point_dist(position1: Position, position2: Position) -> float: """ Computes the spherical distance between two Position objects in meters. Args: position1 (Position) position2 (Position) Returns: float: distance in meters between the two input Positions """ d = h3.great_circle_distance( (position1.latitude, position1.longitude), (position2.latitude, position2.longitude), unit="m", ) return d
[docs] def get_step_distances( df: pd.DataFrame, lat: str = "latitude", lon: str = "longitude", how: str = "backward", unit: str = "m", ) -> np.array: """Compute the distance between successive positions (rows). The DataFrame must have latitude and longitude columns. Returns a numpy array with the same length as the input DataFrame and distances as values. Args: df lat (str): column name containing latitudes lon (str): column name containing longitudes how (str): if, 'forward', computes the interval between each position and the next one. if 'backward', computes the interval between each position and the previous one. unit (str): the distance unit (passed to h3.great_circle_distance). Defaults to 'm'. Returns: np.array: array of distances between the successive positions. """ if len(df) < 2: distances = [np.nan] * len(df) else: # For some reason, this is 100x faster than df[[lat, lon]].values lat_lon = np.concatenate( (df[lat].values[:, None], df[lon].values[:, None]), axis=1 ) strides = np.lib.stride_tricks.sliding_window_view( lat_lon, window_shape=(2, 2), ).reshape((len(df) - 1, 4)) # Using a list comprehension is 5x faster than using np.apply_over_axis here distances = np.array( [ h3.great_circle_distance((lat1, lon1), (lat2, lon2), unit=unit) for lat1, lon1, lat2, lon2 in strides ] ) if how == "forward": distances = np.append(distances, [np.nan]) elif how == "backward": distances = np.append([np.nan], distances) else: raise ValueError(f"how must be 'forward' or 'backward', got f{how}") res = np.array(distances) return res
[docs] def compute_movement_metrics( positions: pd.DataFrame, lat: str = "latitude", lon: str = "longitude", datetime_column: str = "datetime_utc", is_at_port_column: str = "is_at_port", time_emitting_at_sea_column: str = "time_emitting_at_sea", ) -> pd.DataFrame: """Takes a pandas DataFrame with: - latitude and longitude columns (float dtypes) - a column indicating the date and time of the position (datetime dtype) - a column indicating whether the vessel is at port (boolean dtype) - a column indicating how long the vessel has been continuously emitting at sea in hours (float dtype) whose rows represent successive positions of a vessel, assumed to be sorted chronologically by ascending order. Returns pandas DataFrame with the same index and columns, with : - speed, distance and time between successive positions as additionnal computed features in new columns - values for `time_emitting_at_sea_column` computed and updated - so if the input contained any NULL values, they will be computed and filled in. Args: positions (pd.DataFrame): DataFrame representing a vessel route lat (str) : column name of latitude values. May not contain null values. lon (str) : column name of longitude values. May not contain null values. datetime_column (str) : column name of datetime values. May not contain null values. is_at_port_column (str) : column indicating whether the vessel is at port. May not contain null values. time_emitting_at_sea_column (float) : column indicating how long the vessel has been continuously emitting at sea, in hours. May contain null values. Returns: pd.DataFrame: the same DataFrame, plus added columns with the computed features """ # TODO: add `maximum_time_between_positions` argument, above which metrics should # not be computed - computing the average speed between positions emitted more than # a few hours apart does not make much sense. positions = positions.copy(deep=True) # Compute meters_from_previous_position meters_from_previous_position = get_step_distances( positions, lat=lat, lon=lon, how="backward", unit="m" ) positions["meters_from_previous_position"] = meters_from_previous_position # Compute time_since_previous_position time_since_previous_position = get_datetime_intervals( positions[datetime_column], how="backward", unit="h" ) positions["time_since_previous_position"] = time_since_previous_position # Compute average speed average_speed = meters_from_previous_position / 1852 / time_since_previous_position positions["average_speed"] = average_speed # Compute time_emitting_at_sea if len(time_since_previous_position) > 0: # time| # intervals| _____ : cumsum of all time # between| vessel at sea | intervals between # positions| <-----------> | sucessive positions # | _____| # | | # | ____| # | ______| . # | | . # | ___| . # | ___| . ____._________ : A: cumsum of time # | | .______| | intervals at sea only # | _____| | | # |__|_____________| |_________ : res = A - cummax( # A*is_at_port) # -----------------*-----------*-------------------------------------> # pos1 pos2 port port successive # exit entry positions # # To compute the cumulative time of uninterrupted emission at sea, we compute # the cumulative sum of time intervals between successive positions at sea (A) # and then subtract A * is_port_entry to bring the number back to zero at each # port entry. is_at_port = positions[is_at_port_column].values was_previously_at_port = np.insert(is_at_port[:-1], 0, False) is_at_port_or_just_left_port = is_at_port | was_previously_at_port t0 = positions[time_emitting_at_sea_column].values[0] t0 = t0 if not np.isnan(t0) else 0 time_emitting_at_sea_intervals = time_since_previous_position.values np.put(time_emitting_at_sea_intervals, 0, t0) time_emitting_at_sea_intervals = time_emitting_at_sea_intervals * ( ~is_at_port_or_just_left_port ) time_emitting_at_sea = np.cumsum(time_emitting_at_sea_intervals) time_emitting_at_sea = time_emitting_at_sea - np.maximum.accumulate( time_emitting_at_sea * is_at_port ) positions[time_emitting_at_sea_column] = time_emitting_at_sea return positions
[docs] def detect_fishing_activity( positions: pd.DataFrame, minimum_minutes_of_emission_at_sea: int, is_at_port_column: str = "is_at_port", average_speed_column: str = "average_speed", time_emitting_at_sea_column: str = "time_emitting_at_sea", minimum_consecutive_positions: int = 3, min_fishing_speed_threshold: float = 0.1, max_fishing_speed_threshold: float = 4.5, return_floats: bool = False, ) -> pd.DataFrame: """ Detects fishing activity from positions of a vessel. Rows of the input DataFrame represent successive positions of the analyzed vessel, assumed to be sorted chronologically by ascending order. The DataFrame must have a columns indicating : 1) whether the position is at port 2) the average speed between each position and the previous one, in knots A vessel will be considered to be fishing if its average speed remains above the `min_fishing_speed_threshold` and below the `max_fishing_speed_threshold` for a minimum of `minimum_consecutive_positions` positions outside a port and after at least `minimum_time_of_emission_at_sea` time of uninterrupted VMS emission outside of a port. Args: positions (pd.DataFrame) : DataFrame representing successive positions of a vessel, assumed to be sorted by ascending datetime minimum_minutes_of_emission_at_sea (int): the minimum time a vessel is required to emit continuously at sea in order to be considred as in fishing activity, in minutes. This avoids detecting fishing activity when vessels leave ports. is_at_port_column (str) : name of the column containing boolean values for whether a position is in at port or not average_speed_column (str) : name of the column containing average speed values (distance from previous position divided by time since the last position), in knots time_emitting_at_sea_column (str): name of the column containing the duration (in hours) for which the vessel has been continuously emitting at sea outside ports. minimum_consecutive_positions (int): minimum number of consecutive positions below fishing speed threshold to consider that a vessel is fishing min_fishing_speed_threshold (float): speed below which a vessel is considered to be stopped max_fishing_speed_threshold (float): speed above which a vessel is considered to be in transit return_floats (bool): if `True`, return `float` dtypes with 1.0 representing `True`, 0.0 representing `False` and `np.nan` for null values. If `False` (the default), the return dtype is `object` and values are `True`, `False` and `np.nan`, which is more explicit and natural but slower. Returns: pd.DataFrame: copy of the input DataFrame with the added boolean column "is_fishing" """ positions = positions.copy(deep=True) if len(positions) == 0: positions["is_fishing"] = None else: is_at_port = positions[is_at_port_column].values average_speed = positions[average_speed_column].values time_emitting_at_sea = positions[time_emitting_at_sea_column].values # The average speed may contain null values, in particular the first position # of a series of positions, for which the time and distance from the previous # positions is not known. In this case we must test whether the outcome of the # evaluation of whether positions are in fishing activity depend on these # unknown speeds : # # - positions that are evaluated as being in fishing activity regardless of # whether the unknown speeds are below or above the fishing speed threshold # can be asserted to be in fishing activity # # - positions that are evaluated as NOT being in fishing activity regardless # of whether the unknown speeds are below or above the fishing speed # threshold can be asserted to NOT be in fishing activity # # - positions for which the evaluation of whether they correspond to a # fishing activity depends on the unknown speeds cannot be decided and # should therefore evaluate to `None`. is_at_fishing_speed = (average_speed >= min_fishing_speed_threshold) * ( average_speed <= max_fishing_speed_threshold ) is_at_fishing_speed_unknown_is_true = np.where( np.isnan(average_speed), True, is_at_fishing_speed ) is_at_fishing_speed_unknown_is_false = np.where( np.isnan(average_speed), False, is_at_fishing_speed ) arr_unknown_speed_is_fishing_speed = np.concatenate( (is_at_port[:, None], is_at_fishing_speed_unknown_is_true[:, None]), axis=1 ) arr_unknown_speed_is_not_fishing_speed = np.concatenate( (is_at_port[:, None], is_at_fishing_speed_unknown_is_false[:, None]), axis=1 ) fishing_activity_unknown_speed_is_fishing_speed = rows_belong_to_sequence( arr_unknown_speed_is_fishing_speed, np.array([False, True]), window_length=minimum_consecutive_positions, ) fishing_activity_unknown_speed_is_not_fishing_speed = rows_belong_to_sequence( arr_unknown_speed_is_not_fishing_speed, np.array([False, True]), window_length=minimum_consecutive_positions, ) fishing_activity = np.where( ( fishing_activity_unknown_speed_is_fishing_speed == fishing_activity_unknown_speed_is_not_fishing_speed ), fishing_activity_unknown_speed_is_fishing_speed, np.nan, ) fishing_activity = np.where( time_emitting_at_sea > minimum_minutes_of_emission_at_sea / 60.0, fishing_activity, False, ) positions["is_fishing"] = fishing_activity if not return_floats: positions["is_fishing"] = zeros_ones_to_bools(positions["is_fishing"]) return positions
[docs] def enrich_positions( positions: pd.DataFrame, minimum_minutes_of_emission_at_sea: int, lat: str = "latitude", lon: str = "longitude", datetime_column: str = "datetime_utc", is_at_port_column: str = "is_at_port", time_emitting_at_sea_column: str = "time_emitting_at_sea", minimum_consecutive_positions: int = 3, min_fishing_speed_threshold: float = 0.1, max_fishing_speed_threshold: float = 4.5, return_floats: bool = False, ) -> pd.DataFrame: """ Applies `compute_movement_metrics` and `detect_fishing_activity` successively. See these two functions for help. """ positions = compute_movement_metrics( positions, lat=lat, lon=lon, datetime_column=datetime_column, is_at_port_column=is_at_port_column, time_emitting_at_sea_column=time_emitting_at_sea_column, ) positions = detect_fishing_activity( positions, minimum_minutes_of_emission_at_sea=minimum_minutes_of_emission_at_sea, is_at_port_column=is_at_port_column, average_speed_column="average_speed", time_emitting_at_sea_column=time_emitting_at_sea_column, minimum_consecutive_positions=minimum_consecutive_positions, min_fishing_speed_threshold=min_fishing_speed_threshold, max_fishing_speed_threshold=max_fishing_speed_threshold, return_floats=return_floats, ) return positions
[docs] def geocode( query_string=None, country_code_iso2=None, backend: str = "Nominatim", **kwargs ): """ Return latitude, longitude for input location from a query string or from one or more of the following keyword arguments: - street - city - county - state - country - postalcode """ if backend == "Nominatim": base_url = "https://nominatim.openstreetmap.org/search" params = {"format": "json"} elif backend == "LocationIQ": assert LOCATIONIQ_TOKEN base_url = "https://eu1.locationiq.com/v1/search.php" params = {"key": LOCATIONIQ_TOKEN, "format": "json"} else: raise ValueError( f"backend param expects 'Nominatim' or 'LocatinIQ', received '{backend}'." ) if query_string is not None and not pd.isna(query_string): if kwargs: print( "Keyword arguments cannot be used in combination with text query. " + "Keyword arguments will be ignored." ) params["q"] = query_string elif kwargs: params = {**params, **kwargs} else: print( "You must provide either a query string or at least 1 of the allowed " + "keyword arguments." ) return None, None if country_code_iso2: params["countrycodes"] = str(country_code_iso2) response = requests.get(base_url, params=params) try: response.raise_for_status() # When the location cannot be geocoded, LocationIQ returns a 404, whereas # Nominatim returns a 200 with an empty responsen, so an additionnal check # is required to cover this case data = response.json() assert len(data) > 0 except (requests.HTTPError, AssertionError): raise ValueError(f"Could not geocode {params}") data = response.json()[0] return float(data["lat"]), float(data["lon"])
[docs] def geocode_google(address=None, **kwargs): """ Return latitude, longitude for input location from a query string, with optionnal filtering on one or more of the following keyword arguments: - postal_code - country (country name or country code ISO2) - route - locality - administrative_area If address is not given, at least one kwarg must be given. """ assert GOOGLE_API_TOKEN base_url = "https://maps.googleapis.com/maps/api/geocode/json?" try: assert address or kwargs except AssertionError: raise ValueError("No arguments given.") params = dict() components = None if address: # params += f"address={quote(address)}" params["address"] = address if kwargs: components = "|".join([f"{k}:{quote(v)}" for k, v in kwargs.items()]) params["components"] = components # params += f"components={components}" # params += f"key={GOOGLE_API_TOKEN}" params["key"] = GOOGLE_API_TOKEN response = requests.get(base_url, params=params) try: response.raise_for_status() data = response.json() status = data["status"] assert status == "OK" except AssertionError: print(f"Could not geocode {address} {components}. Status {status}.") if "error_message" in data: print(data["error_message"]) raise except requests.HTTPError: print(f"HTTPError with {address} {components}") raise location = data["results"][0]["geometry"]["location"] return float(location["lat"]), float(location["lng"])