Source code for pipeline.src.flows.init_infraction_threat_characterization

import pandas as pd
from prefect import flow, get_run_logger, task
from sqlalchemy import DDL, Table, text

from config import LIBRARY_LOCATION
from src.db_config import create_engine
from src.generic_tasks import load
from src.shared_tasks.infrastructure import get_table
from src.utils import delete


@task
[docs] def extract_threats(): return pd.read_csv( LIBRARY_LOCATION / "data/threats.csv", encoding="utf8", )
@task
[docs] def extract_risk_elements(): return pd.read_csv( LIBRARY_LOCATION / "data/risk_elements.csv", encoding="utf8", )
@task
[docs] def extract_threat_characterization(): return pd.read_csv( LIBRARY_LOCATION / "data/threat_characterizations.csv", encoding="utf8", )
@task
[docs] def extract_infraction_threat_characterization(): return pd.read_csv( LIBRARY_LOCATION / "data/infraction_threat_characterization.csv", encoding="utf8", )
@task
[docs] def extract_isr(): return pd.read_csv( LIBRARY_LOCATION / "data/isr.csv", encoding="utf8", )
@task
[docs] def load_threat_characterization_and_join_table( threats: pd.DataFrame, threat_characterizations: pd.DataFrame, infraction_threat_characterization: pd.DataFrame, isr: pd.DataFrame, risk_elements: pd.DataFrame, threats_table: Table, threat_characterizations_table: Table, infraction_threat_characterization_table: Table, isr_table: Table, risk_elements_table: Table, ): logger = get_run_logger() e = create_engine("monitorfish_remote") with e.begin() as con: con.execute(text("ALTER TABLE vessels_risk_elements DROP CONSTRAINT vessels_risk_elements_risk_element_code_fkey")) delete( tables=[ infraction_threat_characterization_table, risk_elements_table, threat_characterizations_table, threats_table, isr_table, ], connection=con, logger=logger, ) load( threats, table_name="threats", schema="public", connection=con, logger=logger, how="append", init_ddls=[ DDL( "SELECT setval(" "pg_get_serial_sequence('threats', 'id'), 1, false" ")" ) ], ) load( threat_characterizations, table_name="threat_characterizations", schema="public", connection=con, logger=logger, how="append", init_ddls=[ DDL( "SELECT setval(" "pg_get_serial_sequence('threat_characterizations', 'id'), 1, false" ")" ) ], ) load( isr, table_name="isr", schema="public", connection=con, logger=logger, how="append", ) load( infraction_threat_characterization, table_name="infraction_threat_characterization", schema="public", connection=con, logger=logger, how="append", init_ddls=[ DDL( "SELECT setval(" "pg_get_serial_sequence('infraction_threat_characterization', 'id'), 1, false" ")" ) ], ) load( risk_elements, table_name="risk_elements", schema="public", connection=con, logger=logger, how="append", ) con.execute(text("ALTER TABLE vessels_risk_elements ADD CONSTRAINT vessels_risk_elements_risk_element_code_fkey FOREIGN KEY (risk_element_code) REFERENCES risk_elements (code)"))
@flow(name="Monitorfish - Init infractions threat characterization")
[docs] def init_infraction_threat_characterization_flow(): risk_elements_table = get_table("risk_elements") threats_table = get_table("threats") threat_characterizations_table = get_table("threat_characterizations") infraction_threat_characterization_table = get_table( "infraction_threat_characterization" ) isr_table = get_table("isr") threats = extract_threats() risk_elements = extract_risk_elements() threat_characterizations = extract_threat_characterization() infraction_threat_characterization = extract_infraction_threat_characterization() isr = extract_isr() load_threat_characterization_and_join_table( threats, threat_characterizations, infraction_threat_characterization, isr, risk_elements, threats_table, threat_characterizations_table, infraction_threat_characterization_table, isr_table, risk_elements_table, )