# Source code for pipeline.src.flows.species

import io

import pandas as pd
import requests
from prefect import flow, get_run_logger, task

from config import DATA_GOUV_SPECIES_URL, PROXIES
from src.generic_tasks import load
from src.processing import coalesce


@task
def extract_species(url: str, proxies: dict, timeout: float = 60) -> pd.DataFrame:
    """Download the species reference CSV and keep only the columns used downstream.

    Args:
        url: URL of the species CSV file (the flow passes
            ``DATA_GOUV_SPECIES_URL``).
        proxies: ``requests``-style proxies mapping used for the HTTP call.
        timeout: seconds ``requests`` waits for the connection and between
            received bytes before giving up. A finite default prevents a
            stalled server from blocking the flow indefinitely.

    Returns:
        DataFrame restricted to the ``species_code``, ``scientific_name`` and
        ``french_name`` columns.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(url, proxies=proxies, timeout=timeout)
    # Fail loudly on HTTP errors. Otherwise an error page body would be parsed
    # as CSV and surface as an obscure parsing / missing-column error.
    response.raise_for_status()
    # Force UTF-8 decoding instead of relying on the charset requests infers
    # from the response headers (French names contain accented characters).
    response.encoding = "utf8"
    usecols = [
        "species_code",
        "scientific_name",
        "french_name",
    ]
    return pd.read_csv(io.StringIO(response.text), usecols=usecols)
@task
def transform_species(species: pd.DataFrame) -> pd.DataFrame:
    """Turn the raw species reference data into the ``species`` table.

    Steps:
      1. Merge ``french_name`` and ``scientific_name`` into a single
         ``species_name`` column with ``coalesce``, then drop both sources.
         French name is listed first, so it presumably wins when both are set.
      2. Append three rows for Atlantic bluefin tuna calibers (BF1, BF2, BF3).
      3. Add an ``id`` column equal to the row position after re-indexing.
      4. Add ``scip_species_type`` (PELAGIC / DEMERSAL / TUNA / OTHER) from
         hard-coded species-code lists. Codes in none of the lists get NaN.

    Args:
        species: raw species DataFrame, as returned by ``extract_species``.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    merged_name_cols = ["french_name", "scientific_name"]
    df = species.copy(deep=True)
    df["species_name"] = coalesce(df[merged_name_cols])
    df = df.drop(columns=merged_name_cols)

    # Extra pseudo-species for bluefin tuna size calibers.
    bluefin_calibers = pd.DataFrame(
        {
            "species_code": ["BF1", "BF2", "BF3"],
            "species_name": [
                "Thon rouge de l'Atlantique (Calibre 1)",
                "Thon rouge de l'Atlantique (Calibre 2)",
                "Thon rouge de l'Atlantique (Calibre 3)",
            ],
        }
    )
    df = pd.concat([df, bluefin_calibers], axis=0, ignore_index=True)

    # ids are simply the 0..n-1 row positions of the concatenated frame.
    df["id"] = df.index.values

    codes_by_scip_type = {
        "PELAGIC": [
            "ALB", "ANE", "WHB", "BOC", "BOR", "HER", "ARU", "JAX",
            "HOM", "HMM", "PIL", "SPR", "SAN", "NOP", "MAC",
        ],
        "DEMERSAL": [
            "ALF", "ANF", "ANK", "BLI", "BSF", "COD", "ELE", "GFB",
            "GHL", "HAD", "HKE", "LDB", "LEZ", "LIN", "MEG", "MNZ",
            "MON", "NEP", "PLE", "POK", "PRA", "RHG", "RJC", "RJE",
            "RJF", "RJH", "RJI", "RJM", "RJN", "RJU", "RNG", "SBR",
            "SOL", "SOO", "SRX", "USK", "WHG",
        ],
        "TUNA": ["BFT", "SWO", "YFT", "SKJ", "BET", "BF1", "BF2", "BF3"],
        "OTHER": ["DPS", "LKJ", "ARA", "ARS", "MUT", "MUX", "COL", "DOL"],
    }
    # Invert to a code -> type lookup (the lists are disjoint).
    scip_type_by_code = {
        code: scip_type
        for scip_type, codes in codes_by_scip_type.items()
        for code in codes
    }
    df["scip_species_type"] = df["species_code"].map(scip_type_by_code)
    return df
@task
def load_species(species: pd.DataFrame):
    """Write the species table to ``public.species`` in the ``monitorfish_remote`` database.

    Uses the project ``load`` helper with ``how="replace"`` and
    ``replace_with_truncate=True``. Judging by the parameter names, this
    presumably empties the existing table rather than dropping it; confirm
    against ``src.generic_tasks.load``.

    Args:
        species: DataFrame produced by ``transform_species``.
    """
    run_logger = get_run_logger()
    load(
        species,
        db_name="monitorfish_remote",
        schema="public",
        table_name="species",
        how="replace",
        replace_with_truncate=True,
        logger=run_logger,
    )
@flow(name="Monitorfish - Species")
def species_flow(extract_species_task=extract_species):
    """Species ETL: download from ``DATA_GOUV_SPECIES_URL``, transform, load.

    Args:
        extract_species_task: task used for the extraction step. Defaults to
            ``extract_species``; presumably overridable (e.g. in tests) to
            avoid the network call.
    """
    raw_species = extract_species_task(url=DATA_GOUV_SPECIES_URL, proxies=PROXIES)
    load_species(transform_species(raw_species))