pipeline.src.utils

Functions

get_table(→ sqlalchemy.Table)

Performs reflection to get a sqlalchemy Table object with metadata reflecting the table found in the database.

delete(tables, connection, logger[, truncate])

Delete tables.

delete_rows(table, id_column, ids_to_delete, ...)

Deletes all rows of a table whose id is in ids_to_delete.

psql_insert_copy(table, conn, keys, data_iter)

Executes a SQL statement that inserts data.

move(→ None)

Moves a file to another directory. If the destination directory does not exist, it is created, as well as all intermediate directories.

remove_file(fp[, ignore_errors])

Module Contents

pipeline.src.utils.get_table(table_name: str, schema: str, conn: sqlalchemy.engine.Connectable, logger: logging.Logger) → sqlalchemy.Table

Performs reflection to get a sqlalchemy Table object with metadata reflecting the table found in the database. Returns the resulting Table object.

If the table is not found in the database, raises an error.

pipeline.src.utils.delete(tables: List[sqlalchemy.Table], connection: sqlalchemy.engine.base.Connection, logger: logging.Logger, truncate: bool = False)

Delete tables. Useful to wipe tables before re-inserting fresh data in ETL jobs.

pipeline.src.utils.delete_rows(table: sqlalchemy.Table, id_column: str, ids_to_delete: Sequence, connection: sqlalchemy.engine.base.Connection, logger: logging.Logger)

Deletes all rows of a table whose id is in ids_to_delete.

Parameters:
  • table (sqlalchemy.Table) – table to remove rows from

  • id_column (str) – name of the column in the table that contains ids to delete

  • ids_to_delete (Sequence) – list-like sequence of ids to look for in the table and delete

  • connection (sqlalchemy.engine.base.Connection) – database connection

  • logger (logging.Logger) – logger
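The effect described above can be illustrated with a minimal sketch. It is not the module's implementation: it uses the standard-library sqlite3 module instead of SQLAlchemy so that it runs without a database server, and the function name `delete_rows_sketch` and the `items` table are hypothetical. With SQLAlchemy, an equivalent statement would typically be built as `table.delete().where(table.c[id_column].in_(ids_to_delete))`.

```python
import sqlite3
from typing import Sequence


def delete_rows_sketch(conn: sqlite3.Connection, table: str, id_column: str,
                       ids_to_delete: Sequence) -> int:
    """Delete every row whose id_column value is in ids_to_delete.

    Hypothetical stand-in for delete_rows; returns the number of rows removed.
    """
    if not ids_to_delete:
        # An empty IN () list is not valid SQL, so there is nothing to do.
        return 0
    # One placeholder per id keeps the values parameterized.
    placeholders = ", ".join("?" for _ in ids_to_delete)
    sql = f'DELETE FROM "{table}" WHERE "{id_column}" IN ({placeholders})'
    cur = conn.execute(sql, list(ids_to_delete))
    return cur.rowcount


# Usage on an in-memory database (table and data are made up for illustration).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO items VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
deleted = delete_rows_sketch(conn, "items", "id", [1, 3, 99])
remaining = [row[0] for row in conn.execute("SELECT id FROM items ORDER BY id")]
```

Ids that are absent from the table (99 in this example) are simply ignored by the `IN` filter.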

pipeline.src.utils.psql_insert_copy(table, conn, keys, data_iter)

Executes a SQL statement that inserts data.

Parameters:
  • table (pandas.io.sql.SQLTable)

  • conn (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection)

  • keys (list of str) – Column names

  • data_iter (Iterable) – iterates over the values to be inserted
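The signature above matches the callable that pandas `DataFrame.to_sql` accepts through its `method` argument, and the pandas documentation includes a recipe of the same name that loads rows with PostgreSQL `COPY ... FROM STDIN`. Assuming this function follows that recipe (which the source does not confirm), the part that needs no database can be sketched as below. The helper name `build_copy_payload` is hypothetical.

```python
import csv
from io import StringIO
from typing import Iterable, Optional, Sequence, Tuple


def build_copy_payload(
    table_name: str,
    schema: Optional[str],
    keys: Sequence[str],
    data_iter: Iterable[Sequence],
) -> Tuple[str, StringIO]:
    """Build the COPY statement and the CSV buffer it would read from.

    Hypothetical helper: it only prepares the inputs and never touches a database.
    """
    # Serialize all rows as CSV into an in-memory text buffer.
    buf = StringIO()
    csv.writer(buf).writerows(data_iter)
    buf.seek(0)  # rewind so the consumer reads from the first row

    # Quote column names; qualify the table with its schema when one is given.
    columns = ", ".join('"{}"'.format(k) for k in keys)
    qualified = "{}.{}".format(schema, table_name) if schema else table_name
    sql = "COPY {} ({}) FROM STDIN WITH CSV".format(qualified, columns)
    return sql, buf


sql, buf = build_copy_payload("events", "public", ["id", "name"], [(1, "a"), (2, "b")])
```

With a psycopg2 connection, the statement and buffer would then typically be passed to `cursor.copy_expert(sql=sql, file=buf)`, and the full function would be supplied as `df.to_sql(..., method=psql_insert_copy)`.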

pipeline.src.utils.move(src_fp: pathlib.Path, dest_dirpath: pathlib.Path, if_exists: str = 'raise') → None

Moves a file to another directory. If the destination directory does not exist, it is created, as well as all intermediate directories.
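The behaviour described above can be approximated with the standard library alone. The following is a sketch under stated assumptions, not the module's code: the name `move_sketch` is hypothetical, and because the accepted values of `if_exists` are not documented here, only the default `'raise'` and an assumed `'replace'` option are modelled.

```python
import shutil
import tempfile
from pathlib import Path


def move_sketch(src_fp: Path, dest_dirpath: Path, if_exists: str = "raise") -> None:
    """Move src_fp into dest_dirpath, creating the directory tree if needed."""
    # Create the destination directory and any missing intermediate directories.
    dest_dirpath.mkdir(parents=True, exist_ok=True)
    dest_fp = dest_dirpath / src_fp.name

    if dest_fp.exists():
        # Assumed semantics: "raise" is the documented default,
        # "replace" is a hypothetical alternative added for illustration.
        if if_exists == "raise":
            raise FileExistsError(f"{dest_fp} already exists")
        if if_exists != "replace":
            raise ValueError(f"unsupported if_exists value: {if_exists!r}")
        dest_fp.unlink()

    shutil.move(str(src_fp), str(dest_fp))


# Usage inside a throwaway directory (file and directory names are made up).
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    src = root / "report.csv"
    src.write_text("a,b\n1,2\n")
    move_sketch(src, root / "archive" / "latest")
    moved_ok = (root / "archive" / "latest" / "report.csv").exists()
```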

pipeline.src.utils.remove_file(fp: str | pathlib.Path, ignore_errors: bool = True)
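No description is given for this function. Going only by its name and signature, one plausible reading is that it deletes the file at `fp` and, when `ignore_errors` is true, suppresses errors such as a missing file. The sketch below encodes that assumption; the name `remove_file_sketch` is hypothetical and the real behaviour may differ.

```python
import tempfile
from pathlib import Path
from typing import Union


def remove_file_sketch(fp: Union[str, Path], ignore_errors: bool = True) -> None:
    """Delete the file at fp; optionally swallow OS-level errors (assumed semantics)."""
    try:
        Path(fp).unlink()
    except OSError:
        # Covers a missing file, a permission problem, or fp being a directory.
        if not ignore_errors:
            raise


# Usage inside a throwaway directory (file name is made up).
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "stale.txt"
    target.write_text("old")
    remove_file_sketch(target)
    removed = not target.exists()
    remove_file_sketch(target)  # second call hits a missing file and is ignored
```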