Source code for pipeline.src.read_query

from pathlib import Path
from typing import Union

import geopandas as gpd
import pandas as pd
from sqlalchemy import Selectable, TextClause, text
from sqlalchemy.engine import Connection, Engine

from config import QUERIES_LOCATION
from src.db_config import create_datawarehouse_client, create_engine


[docs] def read_saved_query( sql_filepath: Union[str, Path], *, db: str = None, con: Union[Connection, Engine] = None, chunksize: Union[None, str] = None, params: Union[dict, None] = None, backend: str = "pandas", geom_col: str = "geom", crs: Union[int, None] = None, parse_dates: Union[list, dict, None] = None, return_pyarrow_dtypes: bool = False, **kwargs, ) -> Union[pd.DataFrame, gpd.GeoDataFrame]: """Run saved SQLquery on a database. Supported databases : - 'ocan' : OCAN database - 'fmc': FMC database - 'monitorfish_remote': Monitorfish database - 'monitorfish_local': Monitorfish PostGIS database hosted in CNSP - 'monitorenv_remote': Monitorfish database - 'cacem_local' : CACEM PostGIS database hosted in CNSP - 'data_warehouse' : Monitorfish/Monitorenv/RapportNav Data Warehouse Database credentials must be present in the environement. Args: sql_filepath (str): path to .sql file, starting from the saved queries folder. example : 'ocan/nav_fr_peche.sql' db (str, optional): Database name. Possible values : 'ocan', 'fmc', 'monitorfish_remote', 'monitorfish_local'. If `db` is None, `con` must be passed. con (Union[Connection, Engine], optional) : `sqlalchemy.engine.Connection` or `sqlalchemy.engine.Engine` object. Mandatory if no `db` is given. Ignored if `db` is given. chunksize (Union[None, str], optional): If specified, return an iterator where `chunksize` is the number of rows to include in each chunk. Defaults to None. params (Union[dict, None], optional): Parameters to pass to execute method. Defaults to None. backend (str, optional) : 'pandas' to run a SQL query and return a `pandas.DataFrame` or 'geopandas' to run a PostGIS query and return a `geopandas.GeoDataFrame`. Defaults to 'pandas'. geom_col (str, optional): column name to convert to shapely geometries when `backend` is 'geopandas'. Ignored when `backend` is 'pandas'. Defaults to 'geom'. crs (Union[None, str], optional) : CRS to use for the returned GeoDataFrame; if not set, tries to determine CRS from the SRID associated with the first geometry in the database, and assigns that to all geometries. Ignored when `backend` is 'pandas'. Defaults to None. parse_dates (Union[list, dict, None], optional): - List of column names to parse as dates. - Dict of ``{column_name: format string}`` where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps. - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword arguments of :func:`pandas.to_datetime` return_pyarrow_dtypes (bool, optional): If `True`, and `db` is `"data_warehouse"`, results are returned as a pandas DataFrame of `pyarrow` dtypes. Ignored if `db` is not `"data_warehouse"`. Defaults to `False`. kwargs : passed to pd.read_sql or gpd.read_postgis Returns: Union[pd.DataFrame, gpd.DataFrame]: Query results """ sql_filepath = QUERIES_LOCATION / sql_filepath with open(sql_filepath, "r") as sql_file: query = sql_file.read() return read_query( query, db=db, con=con, chunksize=chunksize, params=params, backend=backend, geom_col=geom_col, crs=crs, parse_dates=parse_dates, return_pyarrow_dtypes=return_pyarrow_dtypes, **kwargs, )
[docs] def read_query( query: str | Selectable | TextClause, *, db: str = None, con: Union[Connection, Engine] = None, chunksize: Union[None, str] = None, params: Union[dict, None] = None, backend: str = "pandas", geom_col: str = "geom", crs: Union[int, None] = None, parse_dates: Union[list, dict, None] = None, return_pyarrow_dtypes: bool = False, **kwargs, ) -> Union[pd.DataFrame, gpd.GeoDataFrame]: """Run SQLquery on a database. Supported databases : - 'ocan' : OCAN database - 'fmc': FMC database - 'monitorfish_remote': Monitorfish database - 'monitorfish_local': Monitorfish PostGIS database hosted in CNSP - 'monitorenv_remote': Monitorenv database - 'cacem_local' : CACEM PostGIS database hosted in CNSP - 'data_warehouse' : Monitorfish/Monitorenv/RapportNav Data Warehouse Database credentials must be present in the environement. Args: query (str | Selectable | TextClause): Query to execute (must be a string if querying data warehouse). db (str, optional): Database name. Possible values : 'ocan', 'fmc', 'monitorfish_remote', 'monitorfish_local', 'monitorenv_remote', 'cacem_local'. If `db` is None, `con` must be passed. con (Union[Connection, Engine], optional) : `sqlalchemy.engine.Connection` or `sqlalchemy.engine.Engine` object. Mandatory if no `db` is given. Ignored if `db` is given. chunksize (Union[None, str], optional): If specified, return an iterator where `chunksize` is the number of rows to include in each chunk. Defaults to None. params (Union[dict, None], optional): Parameters to pass to execute method. Defaults to None. backend (str, optional) : 'pandas' to run a SQL query and return a `pandas.DataFrame` or 'geopandas' to run a PostGIS query and return a `geopandas.GeoDataFrame`. Defaults to 'pandas'. geom_col (str, optional): column name to convert to shapely geometries when `backend` is 'geopandas'. Ignored when `backend` is 'pandas'. Defaults to 'geom'. crs (Union[None, str], optional) : CRS to use for the returned GeoDataFrame; if not set, tries to determine CRS from the SRID associated with the first geometry in the database, and assigns that to all geometries. Ignored when `backend` is 'pandas'. Defaults to None. parse_dates (Union[list, dict, None], optional): - List of column names to parse as dates. - Dict of ``{column_name: format string}`` where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps. - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword arguments of :func:`pandas.to_datetime` return_pyarrow_dtypes (bool, optional): If `True`, and `db` is `"data_warehouse"`, results are returned as a pandas DataFrame of `pyarrow` dtypes. Ignored if `db` is not `"data_warehouse"`. Defaults to `False`. kwargs : passed to pd.read_sql or gpd.read_postgis Returns: Union[pd.DataFrame, gpd.DataFrame]: Query results """ if db == "data_warehouse": assert isinstance(query, str) client = create_datawarehouse_client() # `query_df` returns an empty DataFrame without any column when there are no # rows in the result set. Using query_arrow().to_pandas() returns # an (empty) DataFrame with columns, thus preserving the consistency of the # expected dataset. types_mapper = pd.ArrowDtype if return_pyarrow_dtypes else None return client.query_arrow(query, parameters=params).to_pandas( types_mapper=types_mapper ) else: if isinstance(query, str): query = text(query) else: assert isinstance(query, (Selectable, TextClause)) if db: con = create_engine(db=db, execution_options=dict(stream_results=True)) elif con: assert isinstance(con, (Engine, Connection)) else: raise ValueError("At least one of `db` or `con` must be passed.") if backend == "pandas": return pd.read_sql( query, con, chunksize=chunksize, parse_dates=parse_dates, params=params, **kwargs, ) elif backend == "geopandas": return gpd.read_postgis( query, con, geom_col=geom_col, crs=crs, chunksize=chunksize, parse_dates=parse_dates, params=params, **kwargs, ) else: raise ValueError(f"backend must be 'pandas' or 'geopandas', got {backend}")
[docs] def read_table(db: str, schema: str, table_name: str): """Loads database table into pandas Dataframe. Supported databases : - 'ocan' : OCAN database - 'fmc': FMC database - 'monitorfish_remote': Monitorfish database - 'monitorfish_local': Monitorfish PostGIS database hosted in CNSP - 'monitorenv_remote': Monitorenv database - 'cacem_local' : CACEM PostGIS database hosted in CNSP Args: db (str): Database name. Possible values : 'ocan', 'fmc', 'monitorfish_remote', 'monitorfish_local', 'monitorenv_remote', 'cacem_local'. schema (str): Schema name table_name (str): Table name Returns: pd.DataFrame: Dataframe containing the entire table """ engine = create_engine(db=db) return pd.read_sql_table(table_name, engine, schema=schema)