# Source code for pipeline.src.flows.sales_and_logbook
import os
import re
from pathlib import Path
from typing import List, Union
from zipfile import ZipFile
from prefect import flow, get_run_logger, task
from config import ERS_FILES_LOCATION
from src.db_config import create_engine
from src.entities.data_exchange_standards import (
DataDomain,
TransmissionFormat,
ZippedFileType,
)
from src.generic_tasks import load
from src.parsers.ers import ers
from src.parsers.flux import flux
from src.processing import drop_rows_already_in_table
from src.shared_tasks.control_flow import str_to_path
from src.utils import get_table, move
####################################### HELPERS #######################################
def get_zipped_file_type(zipfile_name: str) -> ZippedFileType:
    """Infer the `ZippedFileType` of a logbook / sales zipfile from its name.

    The whole name is expected to be of the form

        `<prefix><XXXXXXXXXXXX>.zip`

    where:

    * `prefix` is the *name* of one of the `ZippedFileType` enum members (the
      lookup is done by member name, i.e. `ZippedFileType[prefix]`)
    * `X` are exactly 12 digits (e.g. `YYYYMM` followed by 6 more digits)
    * `.zip` is matched literally, and nothing may follow it

    Args:
        zipfile_name (str): name of a zipfile containing logbook or sales data.

    Returns:
        ZippedFileType: the type of data corresponding to the name of the zipfile

    Raises:
        ValueError: if the name does not match the expected pattern or the
          matched prefix is not the name of a `ZippedFileType` member.

    Examples:
        `get_zipped_file_type("UN_JBE202001123614.zip")` returns
        `ZippedFileType["UN_JBE"]` (assuming a member with that name exists).

        `"UN_JBE20200101999999.zip"` (14 digits, so the prefix is read as
        `"UN_JBE20"`), `"UN_JBE202001123614.txt"` and
        `"UN_JBE202001123614.zip.part"` all raise `ValueError`.
    """
    # `fullmatch` anchors the pattern at both ends, so trailing characters
    # after ".zip" (e.g. partial-transfer suffixes) are rejected, and the dot
    # is escaped so it only matches a literal ".".
    zipfile_name_pattern = r"(?P<file_type>.*)\d{12}\.zip"
    match = re.fullmatch(zipfile_name_pattern, zipfile_name)

    # Explicit checks instead of `assert`: asserts are stripped under
    # `python -O`, which would turn a non-matching name into a TypeError.
    if match is not None and match["file_type"] in ZippedFileType.__members__:
        return ZippedFileType[match["file_type"]]

    raise ValueError(
        (
            "Unexpected file name. Files containing logbook data are expected to "
            f"have a name matching the pattern {zipfile_name_pattern}, with "
            "`file_type` equal to one of "
            f"{[file_type.name for file_type in ZippedFileType]} .Got "
            f"{zipfile_name} which does not match."
        )
    )
######################################## TASKS ########################################
@task
def extract_zipfiles(
    input_dir: Path,
    treated_dir: Path,
    error_dir: Path,
    max_files: Union[int, None] = 200,
) -> List[dict]:
    """Scan `input_dir` for logbook / sales zipfiles and describe them.

    Zipfiles are expected to be arranged in a hierarchy of folders
    year / month / zipfiles. Entries are visited in sorted name order, so the
    set of files returned when `max_files` is reached is deterministic
    (`os.listdir` itself returns entries in arbitrary order).

    Files whose name does not match the expected pattern (see
    `get_zipped_file_type` for details) are moved to the corresponding
    year/month subfolder of `error_dir`, and the error is logged.
    Entries in `input_dir` whose location does not match the expected
    year / month hierarchy of subfolders are skipped with a warning.

    Args:
        input_dir (Path): location of input zipfiles. Zipfiles are expected to be
          organized in subfolders inside this directory:
            - by year (a 4-digit year from 2000 to 2049)
            - by month, inside yearly subfolders ("1" to "12" or "01" to "12")
        treated_dir (Path): directory where zipfiles are to be transferred after
          integration into the monitorfish database
        error_dir (Path): directory where zipfiles are to be transferred if an
          error occurs during their treatment
        max_files (Union[int, None], optional): maximum number of zipfiles to
          return. Scanning stops as soon as this many valid zipfiles have been
          found; remaining files stay in place for a later run. `None` means no
          limit. Defaults to 200.

    Returns:
        List[dict]: list of `dict`, one for each of the found zipfiles. Each
        `dict` in the list has the following elements:
          - full_name (`str`): name of the zipfile, e.g. "UN_JBE202001999999.zip"
          - input_dir (`Path`): path of the folder containing the zipfile
            (including year/month)
          - treated_dir (`Path`): path where the zipfile should be transferred to
            after integration (year/month subfolder of the supplied
            `treated_dir` argument)
          - error_dir (`Path`): path where the zipfile should be transferred to
            in case of error during its treatment (year/month subfolder of the
            supplied `error_dir` argument)
          - data_domain (`DataDomain`): data domain, inferred from the zipfile's
            name
          - transmission_format (`TransmissionFormat`): transmission format,
            inferred from the zipfile's name

    Raises:
        ValueError: if `max_files` is not `None` and is smaller than 1.
    """
    if max_files is not None and max_files < 1:
        raise ValueError(
            f"max_files must be a positive integer or None, got {max_files}."
        )

    logger = get_run_logger()
    # Month folders may or may not be zero-padded ("1" and "01" both accepted).
    expected_months = {str(m) for m in range(1, 13)} | {
        f"{m:02d}" for m in range(1, 13)
    }
    expected_years = {str(y) for y in range(2000, 2050)}

    res = []
    for year in sorted(os.listdir(input_dir)):
        if year not in expected_years:
            logger.warning(f"Unexpected year {year}. Skipping directory.")
            continue

        logger.info(f"Starting extraction of logbook messages for year {year}.")
        for month in sorted(os.listdir(input_dir / year)):
            if month not in expected_months:
                logger.warning(f"Unexpected month {month}. Skipping directory.")
                continue

            logger.info(f"Starting extraction of logbook messages for {year}/{month}.")
            zipfile_input_dir = input_dir / year / month
            zipfile_treated_dir = treated_dir / year / month
            zipfile_error_dir = error_dir / year / month

            for zipfile_name in sorted(os.listdir(zipfile_input_dir)):
                try:
                    zipped_file_type = get_zipped_file_type(zipfile_name)
                except ValueError as e:
                    # Unrecognized file: set it aside so it is not rescanned.
                    logger.error(e)
                    move(
                        zipfile_input_dir / zipfile_name,
                        zipfile_error_dir,
                        if_exists="replace",
                    )
                    continue

                res.append(
                    {
                        "full_name": zipfile_name,
                        "input_dir": zipfile_input_dir,
                        "treated_dir": zipfile_treated_dir,
                        "error_dir": zipfile_error_dir,
                        "data_domain": DataDomain.from_zipped_file_type(
                            zipped_file_type
                        ),
                        "transmission_format": TransmissionFormat.from_zipped_file_type(
                            zipped_file_type
                        ),
                    }
                )
                # Cap the batch size; files not returned are picked up later.
                if max_files is not None and len(res) >= max_files:
                    return res
    return res
@task
def extract_xmls_from_zipfile(zipfile: Union[None, dict]) -> Union[None, dict]:
    """Read the xml files contained in the zipfile described by `zipfile`.

    `zipfile` is a `dict` as produced by `extract_zipfiles`, of which the
    following items are used here:
      - full_name (`str`): name of the zipfile
      - input_dir (`Path`): path of the folder containing the zipfile
    All other items (e.g. `treated_dir`, `error_dir`, `data_domain`,
    `transmission_format`) are passed through unchanged.

    Opens the zipfile on the filesystem and decodes each file it contains as
    UTF-8, in archive order. Directory entries of the archive are skipped. The
    input `dict` is not modified.

    Args:
        zipfile (Union[None, dict]): description of the zipfile to read, or a
          falsy value (e.g. `None`), which is returned as is.

    Returns:
        Union[None, dict]: a new `dict` equal to the input plus an
        `xml_messages` item holding the list of decoded file contents, or the
        input itself if it is falsy.
    """
    if not zipfile:
        return zipfile

    logger = get_run_logger()
    logger.info(f"Extracting zipfile {zipfile['full_name']}.")
    with ZipFile(zipfile["input_dir"] / zipfile["full_name"]) as zipobj:
        xml_messages = [
            zipobj.read(member).decode("utf-8")
            for member in zipobj.infolist()
            # Directory entries have no content and are not messages.
            if not member.is_dir()
        ]

    # Return a new dict rather than mutating the caller's.
    return {**zipfile, "xml_messages": xml_messages}
@task
def parse_xmls(zipfile: Union[None, dict]) -> Union[None, dict]:
    """Parse the xml messages of a zipfile with the parser of its format.

    The parser is chosen from `zipfile["transmission_format"]`: ERS messages
    go to `ers.batch_parse`, FLUX messages to `flux.batch_parse`. Each is
    called with the `xml_messages` list and `data_domain=zipfile["data_domain"]`.

    A `transmission_format` column, set to the format's value, is added to the
    parsed `reports`. The `xml_messages` item is popped from the input `dict`,
    which is modified in place.

    Args:
        zipfile (Union[None, dict]): output of `extract_xmls_from_zipfile`, or
          a falsy value, which is returned as is.

    Returns:
        Union[None, dict]: a new `dict` merging the remaining items of the
        input with the items returned by the batch parser (presumably
        `reports`, `raw_messages` and `batch_generated_errors`, judging from
        downstream tasks — confirm against the parsers).
    """
    parser_by_format = {
        TransmissionFormat.ERS: ers.batch_parse,
        TransmissionFormat.FLUX: flux.batch_parse,
    }
    logger = get_run_logger()

    if not zipfile:
        return zipfile

    logger.info(f"Parsing messages of zipfile {zipfile['full_name']}")
    fmt = zipfile["transmission_format"]
    parse_batch = parser_by_format[fmt]
    parsed = parse_batch(zipfile["xml_messages"], data_domain=zipfile["data_domain"])
    parsed["reports"]["transmission_format"] = fmt.value

    # The raw xml strings are no longer needed once parsed.
    zipfile.pop("xml_messages")
    return {**zipfile, **parsed}
@task
def clean(zipfile: Union[None, dict]) -> Union[None, dict]:
    """Filter parsed ERS data down to the operation types that are loaded.

    For ERS data only, keeps the `reports` whose `operation_type` is one of
    DAT, DEL, COR or RET. Every other operation type is dropped, not only the
    QUE and RSP types mentioned in the log message. It then keeps the
    `raw_messages` whose `operation_number` appears among the kept reports.
    FLUX data and falsy inputs are returned untouched.

    Args:
        zipfile (Union[None, dict]): output of `parse_xmls`, or a falsy value.

    Returns:
        Union[None, dict]: the same `dict`, with `reports` and `raw_messages`
        replaced in place by their filtered versions for ERS data.
    """
    logger = get_run_logger()

    if not zipfile or zipfile["transmission_format"] is not TransmissionFormat.ERS:
        return zipfile

    logger.info(
        "Removing QUE and RSP messages from messages of "
        f"zipfile {zipfile['full_name']}."
    )

    kept_operation_types = ["DAT", "DEL", "COR", "RET"]
    all_reports = zipfile["reports"]
    kept_reports = all_reports[all_reports.operation_type.isin(kept_operation_types)]
    zipfile["reports"] = kept_reports

    # Only keep raw messages that still have a matching report.
    all_raw_messages = zipfile["raw_messages"]
    zipfile["raw_messages"] = all_raw_messages[
        all_raw_messages.operation_number.isin(kept_reports["operation_number"])
    ]
    return zipfile
@task
def load_sales_and_logbook_data(cleaned_data: List[dict]):
    """Load parsed sales and logbook data into the monitorfish database.

    For each item of `cleaned_data`, and depending on its `data_domain`:

    - raw messages are appended to `public.logbook_raw_messages` (logbook) or
      `public.sales_notes_raw_messages` (sales)
    - reports are appended to `public.logbook_reports` (logbook) or
      `public.sales_notes` (sales)

    Rows whose key already exists in the database are skipped (see comments in
    the loop). Each zipfile is loaded inside its own `engine.begin()` block.
    Afterwards the zipfile is moved to its `error_dir` if its batch reported
    parsing errors, or to its `treated_dir` otherwise.

    Args:
        cleaned_data (List[dict]): list of dictionaries (output of `clean`
          task). Falsy items (e.g. `None`) are ignored.
    """
    schema = "public"
    engine = create_engine("monitorfish_remote")
    logger = get_run_logger()

    # Upstream tasks pass falsy inputs through unchanged; skip them.
    zipfiles = [zipfile for zipfile in cleaned_data if zipfile]

    reports_tables = {
        DataDomain.LOGBOOK: get_table("logbook_reports", schema, engine, logger),
        DataDomain.SALES: get_table("sales_notes", schema, engine, logger),
    }
    raw_messages_tables = {
        DataDomain.LOGBOOK: get_table("logbook_raw_messages", schema, engine, logger),
        DataDomain.SALES: get_table("sales_notes_raw_messages", schema, engine, logger),
    }
    # Column of each reports table that is loaded as JSONB.
    jsonb_columns = {
        DataDomain.LOGBOOK: "value",
        DataDomain.SALES: "products",
    }

    for zipfile in zipfiles:
        data_domain = zipfile["data_domain"]
        reports_table = reports_tables[data_domain]
        raw_messages_table = raw_messages_tables[data_domain]

        # One transaction per zipfile. Assuming `create_engine` returns a
        # SQLAlchemy Engine, `begin()` commits on success and rolls back if an
        # exception is raised.
        with engine.begin() as connection:
            reports = zipfile["reports"]
            raw_messages = zipfile["raw_messages"]
            transmission_format = zipfile["transmission_format"]

            # Drop rows for which the operation number already exists in the
            # raw messages table
            raw_messages = drop_rows_already_in_table(
                df=raw_messages,
                df_column_name="operation_number",
                table=raw_messages_table,
                table_column_name="operation_number",
                connection=connection,
                logger=logger,
            )

            if transmission_format is TransmissionFormat.FLUX:
                reports = drop_rows_already_in_table(
                    df=reports,
                    df_column_name="report_id",
                    table=reports_table,
                    table_column_name="report_id",
                    connection=connection,
                    logger=logger,
                )
            else:
                # With ERS data, we cannot rely on having unique report_ids like we do
                # in FLUX data for two reasons :
                # - DEL messages have a NULL report_id
                # - Visiocapture data holds multiple reports in a single ERS element,
                #   and therefore several logbook_reports with the same report_id
                #
                # What we do instead is ensure we only insert logbook_reports for which
                # the corresponding logbook_raw_message is not yet in the database.
                reports = reports[
                    reports.operation_number.isin(raw_messages.operation_number)
                ]

            if len(raw_messages) > 0:
                logger.info(
                    f"Inserting {len(raw_messages)} messages in "
                    f"{raw_messages_table.name} table."
                )
                load(
                    raw_messages,
                    table_name=raw_messages_table.name,
                    schema=schema,
                    logger=logger,
                    how="append",
                    connection=connection,
                )

            if len(reports) > 0:
                logger.info(
                    f"Inserting {len(reports)} messages in {reports_table.name} table."
                )
                load(
                    reports,
                    table_name=reports_table.name,
                    schema=schema,
                    logger=logger,
                    how="append",
                    connection=connection,
                    jsonb_columns=[jsonb_columns[data_domain]],
                )

        # Only reached once the transaction block above has exited without error.
        if zipfile["batch_generated_errors"]:
            logger.error(
                "Errors occurred during parsing of some of the messages. "
                f"Moving {zipfile['full_name']} to error directory."
            )
            destination = zipfile["error_dir"]
        else:
            destination = zipfile["treated_dir"]

        move(
            zipfile["input_dir"] / zipfile["full_name"],
            destination,
            if_exists="replace",
        )
# NOTE(review): the parameter defaults below reference RECEIVED_DIRECTORY,
# TREATED_DIRECTORY and ERROR_DIRECTORY, which are neither imported nor defined
# in the visible module source (only ERS_FILES_LOCATION is imported from
# `config`). As shown, importing this module would raise a NameError. Confirm
# where these constants are meant to come from.
@flow(name="Monitorfish - Sales and Logbook", log_prints=True)
def sales_and_logbook_flow(
    received_directory: str = RECEIVED_DIRECTORY.as_posix(),
    treated_directory: str = TREATED_DIRECTORY.as_posix(),
    error_directory: str = ERROR_DIRECTORY.as_posix(),
):
    """Ingest sales and logbook zipfiles into the monitorfish database.

    Chains the tasks of this module:
    `extract_zipfiles` -> `extract_xmls_from_zipfile` -> `parse_xmls` ->
    `clean` -> `load_sales_and_logbook_data`. The three middle stages are
    mapped over the zipfiles found by `extract_zipfiles`.

    Args:
        received_directory (str): directory scanned for incoming zipfiles.
        treated_directory (str): directory where successfully loaded zipfiles
          are moved.
        error_directory (str): directory where zipfiles with errors are moved.
    """
    # Directory parameters arrive as strings; convert them to `Path` objects.
    received_path = str_to_path(received_directory)
    treated_path = str_to_path(treated_directory)
    error_path = str_to_path(error_directory)

    found_zipfiles = extract_zipfiles(received_path, treated_path, error_path)
    with_xml_messages = extract_xmls_from_zipfile.map(found_zipfiles)
    parsed_zipfiles = parse_xmls.map(with_xml_messages)
    cleaned_zipfiles = clean.map(parsed_zipfiles)
    load_sales_and_logbook_data(cleaned_zipfiles)