Source code for pipeline.src.flows.export_species
import io
import zipfile
import pandas as pd
import requests
from prefect import flow, task
from config import FAO_SPECIES_URL, ISSCAAP_GROUPS_URL, PROXIES
@task
[docs]
def extract_species(url: str, proxies: dict) -> pd.DataFrame:
# Extract zipfile from fao.org
r = requests.get(url, proxies=proxies)
r.raise_for_status()
zipobj = zipfile.ZipFile(io.BytesIO(r.content))
# Find .txt file in zipfile
files = zipobj.namelist()
txt_files = list(filter(lambda s: s.split(".")[-1] == "txt", files))
try:
assert len(txt_files) == 1
except AssertionError:
raise ValueError(f".txt file could not be identifide in {url}")
txt_file = txt_files[0]
# Extract .txt file from zipfile
species = pd.read_csv(zipobj.open(txt_file))
species = species.rename(
columns={
"ISSCAAP_Group ": "isscaap_code",
"Taxonomic_Code": "taxocode",
"Alpha3_Code": "species_code",
"Scientific_Name": "scientific_name",
"English_name": "english_name",
"French_name": "french_name",
"Spanish_name": "spanish_name",
"Arabic_name": "arabic_name",
"Chinese_name": "chinese_name",
"Russian_name": "russian_name",
"Author": "author",
"Family": "family",
"Order or higher taxa": "order",
"FishStat_Data": "stats_data",
}
)
return species
@task
[docs]
def extract_isscaap_groups(url: str, proxies: dict) -> pd.DataFrame:
# Extract isscaap codes table
r = requests.get(url, proxies=proxies)
r.raise_for_status()
r.encoding = "utf8"
f = io.StringIO(r.text)
isscaap_groups = pd.read_csv(f)
return isscaap_groups
@task
[docs]
def transform_species(
species: pd.DataFrame, isscaap_groups: pd.DataFrame
) -> pd.DataFrame:
res = pd.merge(species, isscaap_groups, on="isscaap_code", how="left")
res["order"] = res.order.map(str.capitalize)
column_order = [
"species_code",
"taxocode",
"scientific_name",
"english_name",
"french_name",
"spanish_name",
"arabic_name",
"chinese_name",
"russian_name",
"author",
"family",
"order",
"stats_data",
"isscaap_code",
"isscaap_group_en",
"isscaap_group_fr",
"isscaap_group_es",
"isscaap_division_code",
"isscaap_division_en",
"isscaap_division_fr",
"isscaap_division_es",
]
return res[column_order]
@task
[docs]
def export_species(species: pd.DataFrame, csv_filepath: str) -> None:
species.to_csv(csv_filepath, index=False, encoding="utf8")
@flow(name="Monitorfish - Export Species")
[docs]
def export_species_flow(csv_filepath: str):
species = extract_species(url=FAO_SPECIES_URL, proxies=PROXIES)
isscaap_groups = extract_isscaap_groups(url=ISSCAAP_GROUPS_URL, proxies=PROXIES)
species = transform_species(species, isscaap_groups)
export_species(species, csv_filepath=csv_filepath)