pipeline.src.generic_tasks
==========================

.. py:module:: pipeline.src.generic_tasks


Functions
---------

.. autoapisummary::

   pipeline.src.generic_tasks.extract
   pipeline.src.generic_tasks.load
   pipeline.src.generic_tasks.load_with_connection
   pipeline.src.generic_tasks.delete_rows
   pipeline.src.generic_tasks.read_query_task


Module Contents
---------------

.. py:function:: extract(db_name: str, query_filepath: pathlib.Path | str, dtypes: dict = None, parse_dates: list | dict = None, params: dict = None, backend: str = 'pandas', geom_col: str = 'geom', crs: int = None, return_pyarrow_dtypes: bool = False) -> Union[pandas.DataFrame, geopandas.GeoDataFrame]

   Run a SQL query against the indicated database and return the result as a `pandas.DataFrame`, or as a `geopandas.GeoDataFrame` when `backend` is 'geopandas'.

   :param db_name: name of the database to extract from. Possible values:

      - 'fmc'
      - 'ocan'
      - 'monitorfish_local'
      - 'monitorfish_remote'
      - 'monitorenv_remote'
      - 'cacem_local'
      - 'data_warehouse'

   :type db_name: str
   :param query_filepath: path to the .sql file, relative to the saved queries folder. Example: "ocan/nav_fr_peche.sql"
   :type query_filepath: Union[Path, str]
   :param dtypes: If specified, use {col: dtype, …}, where col is a column label and dtype is a numpy.dtype or Python type to cast one or more of the DataFrame’s columns to column-specific types. Defaults to None.
   :type dtypes: Union[None, dict], optional
   :param parse_dates: One of:

      - List of column names to parse as dates.
      - Dict of ``{column_name: format string}`` where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
      - Dict of ``{column_name: arg dict}``, where the arg dict corresponds to the keyword arguments of :func:`pandas.to_datetime`.

      Defaults to None.
   :type parse_dates: Union[list, dict, None], optional
   :param params: Parameters to pass to execute method. Defaults to None.
   :type params: Union[dict, None], optional
   :param backend: 'pandas' to run a SQL query and return a `pandas.DataFrame` or 'geopandas' to run a PostGIS query and return a `geopandas.GeoDataFrame`. Defaults to 'pandas'.
   :type backend: str, optional
   :param geom_col: name of the column to convert to shapely geometries when `backend` is 'geopandas'. Ignored when `backend` is 'pandas'. Defaults to 'geom'.
   :type geom_col: str, optional
   :param crs: CRS to use for the returned GeoDataFrame; if not set, the CRS is inferred from the SRID associated with the first geometry in the database and assigned to all geometries. Ignored when `backend` is 'pandas'. Defaults to None.
   :type crs: int, optional
   :param return_pyarrow_dtypes: If `True`, and `db_name` is `"data_warehouse"`, results are returned as a pandas DataFrame of `pyarrow` dtypes. Ignored if `db_name` is not `"data_warehouse"`. Defaults to `False`.
   :type return_pyarrow_dtypes: bool, optional

   :returns: Query results
   :rtype: Union[pd.DataFrame, gpd.GeoDataFrame]


.. py:function:: load(df: Union[pandas.DataFrame, geopandas.GeoDataFrame], *, table_name: str, schema: str, logger: logging.Logger, how: str = 'replace', replace_with_truncate: bool = False, db_name: str = None, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error: str = '{}', jsonb_columns: list = None, table_id_column: str = None, df_id_column: str = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, connection: sqlalchemy.engine.Connection = None, init_ddls: List[sqlalchemy.DDL] = None, end_ddls: List[sqlalchemy.DDL] = None, bytea_columns: list = None)

   Load a DataFrame or GeoDataFrame into a database table using sqlalchemy. The table must already exist in the database.


   :param df: data to load
   :type df: Union[pd.DataFrame, gpd.GeoDataFrame]
   :param table_name: name of the table
   :type table_name: str
   :param schema: database schema of the table
   :type schema: str
   :param logger: logger instance
   :type logger: logging.Logger
   :param how: one of

      - 'replace' to empty the table before loading (with DELETE or TRUNCATE, see `replace_with_truncate`)
      - 'append' to append the data to rows already in the table
      - 'upsert' to append the rows to the table, replacing the rows whose id is already present in the table

   :type how: str
   :param replace_with_truncate: if `how` is `replace` and `replace_with_truncate` is `True`, the table is emptied with TRUNCATE before loading the new data. If `how` is `replace` and `replace_with_truncate` is `False` (the default), the rows of the table are removed with DELETE before loading the new data. If `how` is anything but `replace`, `replace_with_truncate` is ignored.

      TRUNCATE is more efficient than DELETE because the whole file holding the table data is dropped, rather than rows being deleted one by one. It also allocates new pages, so the resulting table data has no bloat (dead or free space in data pages).

      However, TRUNCATE requires an ACCESS EXCLUSIVE lock on the table, which may conflict with other database operations, notably `pg_dump` and `ALTER TABLE` commands, and result in a deadlock and therefore downtime of the entire system during database backup or migration. Use only if you know what you're doing.
   :type replace_with_truncate: bool
   :param db_name: One of 'monitorfish_remote', 'monitorenv_remote' or 'monitorfish_local'. Required if `connection` is not provided. Defaults to None.
   :type db_name: str, optional
   :param pg_array_columns: columns containing sequences that must be serialized before loading into columns with PostgreSQL `Array` type
   :type pg_array_columns: list, optional
   :param handle_array_conversion_errors: whether to handle errors raised during the serialization of columns to load into PostgreSQL `Array` columns, rather than letting them propagate.
      Defaults to True.
   :type handle_array_conversion_errors: bool
   :param value_on_array_conversion_error: if `handle_array_conversion_errors` is `True`, the value to use when an error must be handled. Defaults to '{}'.
   :type value_on_array_conversion_error: str, optional
   :param jsonb_columns: columns containing values that must be serialized before loading into columns with PostgreSQL `JSONB` type
   :type jsonb_columns: list, optional
   :param table_id_column: name of the table column to use as an id. Required if `how` is "upsert".
   :type table_id_column: str, optional
   :param df_id_column: name of the DataFrame column to use as an id. Required if `how` is "upsert".
   :type df_id_column: str, optional
   :param nullable_integer_columns: columns containing values that must be loaded into columns with PostgreSQL `Integer` type. If these columns contain `NA` values, pandas will automatically change the dtype to `float` and the loading into PostgreSQL `Integer` columns will fail, so it is necessary to serialize these values as `Integer`-compatible `str` objects.
   :type nullable_integer_columns: list, optional
   :param timedelta_columns: columns containing `Timedelta` values to load into PostgreSQL `Interval` columns. If these columns contain `NaT` values, the loading will fail, so it is necessary to serialize these values as `Interval`-compatible `str` objects.
   :type timedelta_columns: list, optional
   :param enum_columns: columns containing Enum values to load into PostgreSQL. Values in these columns will be converted to strings using the enum's `.value`. Null values will remain null.
   :type enum_columns: list, optional
   :param connection: Database connection to use for the insert operation. If not provided, `db_name` must be given and a connection to the designated database will be created for the insert operation. Defaults to None.
   :type connection: Connection, optional
   :param init_ddls: If given, these DDLs will be executed before the loading operation. Defaults to None.
   :type init_ddls: List[DDL], optional
   :param end_ddls: If given, these DDLs will be executed after the loading operation. Defaults to None.
   :type end_ddls: List[DDL], optional
   :param bytea_columns: columns containing bytes that must be serialized before loading into columns with PostgreSQL `BYTEA` type. Serialization follows the PostgreSQL `Hex` format, which represents each byte by two hexadecimal digits and prefixes the whole hex string with ``\x``.

      For example, for a two-byte sequence (01011001, 11000001):

      - the hex representation of each byte is computed:

        - '59' for 01011001
        - 'c1' for 11000001

      - the PostgreSQL hex string will be ``\x59c1``

   :type bytea_columns: list, optional


.. py:function:: load_with_connection(df: Union[pandas.DataFrame, geopandas.GeoDataFrame], *, connection: sqlalchemy.engine.Connection, table_name: str, schema: str, logger: logging.Logger, how: str = 'replace', replace_with_truncate: bool = False, table_id_column: str = None, df_id_column: str = None, init_ddls: List[sqlalchemy.DDL] = None, end_ddls: List[sqlalchemy.DDL] = None)


.. py:function:: delete_rows(*, table_name: str, schema: str, db_name: str, table_id_column: str, ids_to_delete: set, logger: logging.Logger)

   Delete rows from a database table.

   :param table_name: name of the table
   :type table_name: str
   :param schema: database schema of the table
   :type schema: str
   :param db_name: name of the database. One of

      - 'monitorfish_remote'
      - 'monitorfish_local'

   :type db_name: str
   :param table_id_column: name of the id column in the database.
   :type table_id_column: str
   :param ids_to_delete: the ids of the rows to delete.
   :type ids_to_delete: set
   :param logger: logger instance.
   :type logger: logging.Logger


.. py:function:: read_query_task(database: str, query: sqlalchemy.sql.Select) -> pandas.DataFrame

   Prefect `task`-decorated version of `read_query`.
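
The hex format described for the `bytea_columns` parameter of `load` can be illustrated with a minimal sketch. The helper name `to_pg_hex` is hypothetical and not part of this module, and how `load` performs the serialization internally is not shown here; the sketch only reproduces the two-byte example from that parameter's description.

```python
def to_pg_hex(data: bytes) -> str:
    """Serialize bytes to the PostgreSQL hex format (hypothetical helper).

    Each byte becomes two hexadecimal digits and the whole string
    is prefixed with a backslash followed by 'x'.
    """
    return "\\x" + data.hex()


# The two-byte sequence (01011001, 11000001) from the parameter description.
example = bytes([0b01011001, 0b11000001])
serialized = to_pg_hex(example)
print(serialized)
```

An empty byte string serializes to the bare prefix, which is a valid empty `BYTEA` value in the hex format.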