"""Prefect flow that extracts, transforms and loads FAO areas (module ``pipeline.src.flows.fao_areas``)."""

import io

import geopandas as gpd
import requests
from prefect import Task, flow, get_run_logger, task

from config import FAO_AREAS_URL, PROXIES
from src.generic_tasks import load


@task
def extract_fao_areas(
    url: str, proxies: dict, timeout: float = 60
) -> gpd.GeoDataFrame:
    """
    Download the FAO areas geospatial file and load it to a GeoDataFrame.

    Args:
        url (str): url to fetch the FAO areas file from.
        proxies (dict): http and https proxies to use for the download.
        timeout (float, optional): number of seconds to wait for the server
          (connect and read timeouts) before giving up. Defaults to 60.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame of FAO areas

    Raises:
        requests.HTTPError: if the server responds with an error status.
        requests.Timeout: if the server does not respond within ``timeout``.
    """
    # Without a timeout, requests can wait indefinitely on an unresponsive
    # server, which would hang the whole flow run.
    r = requests.get(url, proxies=proxies, timeout=timeout)
    r.raise_for_status()
    # Pass the raw bytes rather than ``r.text``: this skips requests' charset
    # guessing (which can garble non-ASCII attribute values) and does not
    # assume the downloaded payload is text.
    fao_areas = gpd.read_file(io.BytesIO(r.content))
    return fao_areas
@task
def transform_fao_areas(fao_areas: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Transforms the ``fao_areas`` DataFrame to match the desired table columns.

    Steps: column names are lower-cased, the ``id`` column is dropped and the
    ``geometry`` column is renamed to ``wkb_geometry``, which is then set as
    the active geometry column. The input frame is left untouched.

    Args:
        fao_areas (gpd.GeoDataFrame): FAO areas as returned by the extraction.

    Returns:
        gpd.GeoDataFrame: transformed copy of ``fao_areas``.
    """
    # Work on a deep copy: the column assignment below mutates in place.
    areas = fao_areas.copy(deep=True)
    areas.columns = areas.columns.map(str.lower)

    areas = gpd.GeoDataFrame(areas.drop(columns=["id"]))

    # Renaming the geometry column does not update the active geometry, so
    # it has to be set again explicitly on the renamed column.
    return areas.rename(columns={"geometry": "wkb_geometry"}).set_geometry(
        "wkb_geometry"
    )
@task
def load_fao_areas(fao_areas: gpd.GeoDataFrame):
    """
    Load ``fao_areas`` into the ``public.fao_areas`` table of the
    ``monitorfish_remote`` database.

    Uses the shared ``load`` helper in ``how="replace"`` mode with
    ``replace_with_truncate=True`` (presumably truncating the table before
    inserting the new rows — confirm in ``src.generic_tasks.load``).

    Args:
        fao_areas (gpd.GeoDataFrame): transformed FAO areas to load.
    """
    load(
        fao_areas,
        db_name="monitorfish_remote",
        schema="public",
        table_name="fao_areas",
        how="replace",
        replace_with_truncate=True,
        logger=get_run_logger(),
    )
@flow(name="Monitorfish - FAO areas")
def fao_areas_flow(
    url: str = FAO_AREAS_URL,
    proxies: dict = PROXIES,
    extract_fao_areas_fn: Task = extract_fao_areas,
):
    """
    Extract, transform and load the FAO areas.

    Args:
        url (str): url of the FAO areas file. Defaults to ``FAO_AREAS_URL``.
        proxies (dict): proxies passed to the extraction task. Defaults to
          ``PROXIES``.
        extract_fao_areas_fn (Task): task used for the extraction step, called
          with ``url`` and ``proxies`` keyword arguments; lets callers
          substitute another extraction task. Defaults to
          ``extract_fao_areas``.
    """
    raw_areas = extract_fao_areas_fn(url=url, proxies=proxies)
    load_fao_areas(transform_fao_areas(raw_areas))