Source code for pipeline.src.flows.refresh_materialized_view

import pandas as pd
from prefect import flow, task
from sqlalchemy import Table, text

from src.db_config import create_engine
from src.shared_tasks.infrastructure import get_table


@task
[docs] def refresh_view(view: Table) -> pd.DataFrame: assert isinstance(view, Table) query = text(f"REFRESH MATERIALIZED VIEW {view.schema}.{view.name}") e = create_engine("monitorfish_remote") with e.begin() as connection: connection.execute(query)
@flow(name="Monitorfish - Refresh materialized view")
[docs] def refresh_materialized_view_flow( view_name: str, schema: str = "public", ): view = get_table(table_name=view_name, schema=schema) refresh_view(view)