pipeline.src.generic_tasks
Functions
|
Run SQL query against the indicated database and return the result as a |
|
Load a DataFrame or GeoDataFrame to a database table using sqlalchemy. The table |
|
|
|
Delete rows from a database table. |
|
Prefect task decorated version of read_query. |
Module Contents
- pipeline.src.generic_tasks.extract(db_name: str, query_filepath: pathlib.Path | str, dtypes: dict = None, parse_dates: list | dict = None, params: dict = None, backend: str = 'pandas', geom_col: str = 'geom', crs: int = None, return_pyarrow_dtypes: bool = False) pandas.DataFrame | geopandas.GeoDataFrame[source]
Run SQL query against the indicated database and return the result as a pandas.DataFrame.
- Parameters:
db_name (str) – name of the database to extract from. Possible values : - ‘fmc’ - ‘ocan’ - ‘monitorfish_local’ - ‘monitorfish_remote’ - ‘monitorenv_remove’ - ‘cacem_local’ - ‘data_warehouse’
query_filepath (Union[Path, str]) – path to .sql file, starting from the saved queries folder. example : “ocan/nav_fr_peche.sql”
dtypes (Union[None, dict], optional) – If specified, use {col: dtype, …}, where col is a column label and dtype is a numpy.dtype or Python type to cast one or more of the DataFrame’s columns to column-specific types. Defaults to None.
parse_dates (Union[list, dict, None], optional) –
List of column names to parse as dates.
Dict of
{column_name: format string}where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.Dict of
{column_name: arg dict}, where the arg dict corresponds to the keyword arguments ofpandas.to_datetime()
Defaults to None.
params (Union[dict, None], optional) – Parameters to pass to execute method. Defaults to None.
backend (str, optional) – ‘pandas’ to run a SQL query and return a pandas.DataFrame or ‘geopandas’ to run a PostGIS query and return a geopandas.GeoDataFrame. Defaults to ‘pandas’.
geom_col (str, optional) – column name to convert to shapely geometries when backend is ‘geopandas’. Ignored when backend is ‘pandas’. Defaults to ‘geom’.
crs (str, optional) – CRS to use for the returned GeoDataFrame; if not set, tries to determine CRS from the SRID associated with the first geometry in the database, and assigns that to all geometries. Ignored when backend is ‘pandas’. Defaults to None.
return_pyarrow_dtypes (bool, optional) – If True, and db_name is “data_warehouse”, results are returned as a pandas DataFrame of pyarrow dtypes. Ignored if db_name is not “data_warehouse”. Defaults to False.
- Returns:
Query results
- Return type:
Union[pd.DataFrame, gpd.GeoDataFrame]
- pipeline.src.generic_tasks.load(df: pandas.DataFrame | geopandas.GeoDataFrame, *, table_name: str, schema: str, logger: logging.Logger, how: str = 'replace', replace_with_truncate: bool = False, db_name: str = None, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error: str = '{}', jsonb_columns: list = None, table_id_column: str = None, df_id_column: str = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, connection: sqlalchemy.engine.Connection = None, init_ddls: List[sqlalchemy.DDL] = None, end_ddls: List[sqlalchemy.DDL] = None, bytea_columns: list = None)[source]
Load a DataFrame or GeoDataFrame to a database table using sqlalchemy. The table must already exist in the database.
- Parameters:
df (Union[pd.DataFrame, gpd.GeoDataFrame]) – data to load
table_name (str) – name of the table
schema (str) – database schema of the table
logger (logging.Logger) – logger instance
how (str) –
one of
’replace’ to truncate the table before loading
’append’ to append the data to rows already in the table
’upsert’ to append the rows to the table, replacing the rows whose id is already present in the table
replace_with_truncate (bool) – if how is replace, and replace_with_truncate is True, the table to replace will be truncated before loading the new data. If how is replace, and replace_with_truncate is False (the default), the table to replace will be deleted before loading the new data. If how is anything but replace, replace_with_truncate is ignored. TRUNCATE is more efficient than DELETE as the whole file holding table data is dropped, rather than deleting rows one by one as DELETE does. It also results in reallocating new pages and therefore results in table data without any bloat (dead or free space in data pages). However, TRUNCATE requires an ACCESS EXCLUSIVE lock on the table, which may conflict with other database operations, notably pg_dump and ALTER TABLE commands, and result in a deadlock and therefore downtime of the entire system during database backup or migration. Use only if you know what you’re doing.
db_name (str, optional) – Required if a connection is not provided. ‘monitorfish_remote’, ‘monitorenv_remote’ or ‘monitorfish_local’. Defaults to None.
pg_array_columns (list, optional) – columns containing sequences that must be serialized before loading into columns with Postgresql Array type
handle_array_conversion_errors (bool) – whether to handle or raise upon error during the serialization of columns to load into Postgresql Array columns. Defaults to True.
value_on_array_conversion_error (str, optional) – if handle_array_conversion_errors, the value to use when an error must be handled. Defaults to ‘{}’.
jsonb_columns (list, optional) – columns containing values that must be serialized before loading into columns with Postgresql JSONB type
table_id_column (str, optional) – name of the table column to use an id. Required if how is “upsert”.
df_id_column (str, optional) – name of the DataFrame column to use an id. Required if how is “upsert”.
nullable_integer_columns (list, optional) – columns containing values that must loaded into columns with Postgresql Integer type. If these columns contain NA values, pandas will automatically change the dtype to float and the loading into Postgreql Integer columns will fail, so it is necessary to serialize these values as Integer-compatible str objects.
timedelta_columns (list, optional) – columns containing Timedelta values to load into Postgresql Interval columns. If these columns contain NaT values, the loading will fail, so it is necessary to serialize these values as Interval-compatible str objects.
enum_columns (list, optional) – columns containing Enum values to load into Postgresql. Values in these columns will be converted to string using the enum’s .value. Null values will remain null.
connection (Connection, optional) – Databse connection to use for the insert operation. If not provided, db_name must be given and a connection to the designated database will be created for the insert operation. Defaults to None.
init_ddls – (List[DDL], optional): If given, these DDLs will be executed before the loading operation. Defaults to None.
end_ddls – (List[DDL], optional): If given, these DDLs will be executed after the loading operation. Defaults to None.
bytea_columns (list, optional) –
columns containing bytes that must be serialized before loading into columns with Postgresql BYTEA type. Serialization is done following Postgresql Hex format, which consists in representing each byte by two hexadecimal digits and prefixing the whole hex string by ‘x’. For example, for a two-byte sequence (01011001, 11000001) :
the hex representation of each byte is computed :
’59’ for 01011001
’c1’ for 11000001
the Postgresql hex string will be ‘x59c1’
- pipeline.src.generic_tasks.load_with_connection(df: pandas.DataFrame | geopandas.GeoDataFrame, *, connection: sqlalchemy.engine.Connection, table_name: str, schema: str, logger: logging.Logger, how: str = 'replace', replace_with_truncate: bool = False, table_id_column: str = None, df_id_column: str = None, init_ddls: List[sqlalchemy.DDL] = None, end_ddls: List[sqlalchemy.DDL] = None)[source]
- pipeline.src.generic_tasks.delete_rows(*, table_name: str, schema: str, db_name: str, table_id_column: str, ids_to_delete: set, logger: logging.Logger)[source]
Delete rows from a database table.
- Parameters:
table_name (str) – name of the table
schema (str) – database schema of the table
db_name (str) – name of the database. One of - ‘monitorfish_remote’ - ‘monitorfish_local’
table_id_column (str) – name of the id column in the database.
ids_to_delete (set) – the ids of the rows to delete.
logger (logging.Logger) – logger instance.