import pandas as pd
from prefect import flow, get_run_logger, task
from sqlalchemy import DDL, Table, text
from config import LIBRARY_LOCATION
from src.db_config import create_engine
from src.generic_tasks import load
from src.shared_tasks.infrastructure import get_table
from src.utils import delete
@task
@task
@task
@task
@task
@task
[docs]
def load_threat_characterization_and_join_table(
threats: pd.DataFrame,
threat_characterizations: pd.DataFrame,
infraction_threat_characterization: pd.DataFrame,
isr: pd.DataFrame,
risk_elements: pd.DataFrame,
threats_table: Table,
threat_characterizations_table: Table,
infraction_threat_characterization_table: Table,
isr_table: Table,
risk_elements_table: Table,
):
logger = get_run_logger()
e = create_engine("monitorfish_remote")
with e.begin() as con:
con.execute(text("ALTER TABLE vessels_risk_elements DROP CONSTRAINT vessels_risk_elements_risk_element_code_fkey"))
delete(
tables=[
infraction_threat_characterization_table,
risk_elements_table,
threat_characterizations_table,
threats_table,
isr_table,
],
connection=con,
logger=logger,
)
load(
threats,
table_name="threats",
schema="public",
connection=con,
logger=logger,
how="append",
init_ddls=[
DDL(
"SELECT setval("
"pg_get_serial_sequence('threats', 'id'), 1, false"
")"
)
],
)
load(
threat_characterizations,
table_name="threat_characterizations",
schema="public",
connection=con,
logger=logger,
how="append",
init_ddls=[
DDL(
"SELECT setval("
"pg_get_serial_sequence('threat_characterizations', 'id'), 1, false"
")"
)
],
)
load(
isr,
table_name="isr",
schema="public",
connection=con,
logger=logger,
how="append",
)
load(
infraction_threat_characterization,
table_name="infraction_threat_characterization",
schema="public",
connection=con,
logger=logger,
how="append",
init_ddls=[
DDL(
"SELECT setval("
"pg_get_serial_sequence('infraction_threat_characterization', 'id'), 1, false"
")"
)
],
)
load(
risk_elements,
table_name="risk_elements",
schema="public",
connection=con,
logger=logger,
how="append",
)
con.execute(text("ALTER TABLE vessels_risk_elements ADD CONSTRAINT vessels_risk_elements_risk_element_code_fkey FOREIGN KEY (risk_element_code) REFERENCES risk_elements (code)"))
@flow(name="Monitorfish - Init infractions threat characterization")
[docs]
def init_infraction_threat_characterization_flow():
risk_elements_table = get_table("risk_elements")
threats_table = get_table("threats")
threat_characterizations_table = get_table("threat_characterizations")
infraction_threat_characterization_table = get_table(
"infraction_threat_characterization"
)
isr_table = get_table("isr")
threats = extract_threats()
risk_elements = extract_risk_elements()
threat_characterizations = extract_threat_characterization()
infraction_threat_characterization = extract_infraction_threat_characterization()
isr = extract_isr()
load_threat_characterization_and_join_table(
threats,
threat_characterizations,
infraction_threat_characterization,
isr,
risk_elements,
threats_table,
threat_characterizations_table,
infraction_threat_characterization_table,
isr_table,
risk_elements_table,
)