Source code for pipeline.src.flows.trips_snapshot

import pandas as pd
from prefect import flow, get_run_logger, task

from src.generic_tasks import extract, load


@task
[docs] def extract_trips_snapshot() -> pd.DataFrame: return extract( db_name="data_warehouse", query_filepath="data_warehouse/trips_snapshot.sql", return_pyarrow_dtypes=True, )
@task
[docs] def load_trips_snapshot(trips: pd.DataFrame): logger = get_run_logger() load( df=trips, table_name="trips_snapshot", schema="public", db_name="monitorfish_remote", logger=logger, how="replace", )
@flow(name="Monitorfish - Trips snapshot")
[docs] def trips_snapshot_flow(): # Extract trips_snapshot = extract_trips_snapshot() # Load load_trips_snapshot(trips_snapshot)