pipeline.src.flows.sales_and_logbook

Attributes

RECEIVED_DIRECTORY

TREATED_DIRECTORY

ERROR_DIRECTORY

Functions

get_zipped_file_type(...)

Takes a zipfile name like UN_JBE202001123614.zip or ERS3_ACK_JBE202102365445.zip

extract_zipfiles(→ List[dict])

Scans input_dir, in which logbook zipfiles are expected to be arranged in a

extract_xmls_from_zipfile(→ Union[None, dict])

Takes a dict with the following structure :

parse_xmls(→ Union[None, dict])

clean(→ Union[None, dict])

load_sales_and_logbook_data(cleaned_data)

Loads sales and logbook data into public.logbook_reports / public.sales_notes and

sales_and_logbook_flow([received_directory, ...])

Module Contents

pipeline.src.flows.sales_and_logbook.RECEIVED_DIRECTORY
pipeline.src.flows.sales_and_logbook.TREATED_DIRECTORY
pipeline.src.flows.sales_and_logbook.ERROR_DIRECTORY
pipeline.src.flows.sales_and_logbook.get_zipped_file_type(zipfile_name: str) → src.entities.data_exchange_standards.ZippedFileType

Takes a zipfile name like UN_JBE202001123614.zip or ERS3_ACK_JBE202102365445.zip and returns the corresponding ZippedFileType, based on pattern matching.

The expected pattern is of the form

<prefix><YYYYMMXXXXXX>.zip

where :

  • prefix is one of the ZippedFileType enum values

  • Y, M and X are digits

Parameters:

zipfile_name (str) – name of a zipfile containing logbook or sales data.

Returns:

the type of data corresponding to the name of the zipfile

Return type:

ZippedFileType

Raises:

ValueError – if the name does not match the expected pattern or the matched string does not correspond to a known ZippedFileType.

Examples

>>> get_zipped_file_type("UN_JBE202001999999.zip")
<ZippedFileType.UN: 'UN_JBE'>
>>> get_zipped_file_type("UN_JBE20200101999999.zip")
ValueError
>>> get_zipped_file_type("UN_JBE2020010199999.txt")
ValueError
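
The matching rule described above can be illustrated with a minimal sketch. It is not the module's actual implementation: the regular expression, the function name get_zipped_file_type_sketch and the ERS3_ACK member (inferred from the example file name) are assumptions, and only the 'UN_JBE' value is confirmed by the examples above.

```python
import re
from enum import Enum


class ZippedFileType(Enum):
    # Hypothetical subset of the real enum, for illustration only.
    UN = "UN_JBE"
    ERS3_ACK = "ERS3_ACK_JBE"  # assumed from "ERS3_ACK_JBE202102365445.zip"


# <prefix><YYYYMMXXXXXX>.zip : any prefix, then exactly 12 digits, then ".zip".
# The prefix is matched lazily because it may itself contain digits ("ERS3").
ZIPFILE_NAME_PATTERN = re.compile(r"(?P<prefix>.+?)(?P<digits>\d{12})\.zip")


def get_zipped_file_type_sketch(zipfile_name: str) -> ZippedFileType:
    match = ZIPFILE_NAME_PATTERN.fullmatch(zipfile_name)
    if match is None:
        raise ValueError(f"Unexpected zipfile name: {zipfile_name!r}")
    # Looking up an Enum by value raises ValueError for unknown prefixes.
    return ZippedFileType(match.group("prefix"))
```

With this sketch, a name carrying extra digits is rejected as well, because the surplus digits end up in the prefix, which then fails the enum lookup.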

pipeline.src.flows.sales_and_logbook.extract_zipfiles(input_dir: pathlib.Path, treated_dir: pathlib.Path, error_dir: pathlib.Path) → List[dict]

Scans input_dir, in which logbook zipfiles are expected to be arranged in a year / month / zipfiles folder hierarchy, and returns a list of dicts describing the zipfiles found.

Files whose names do not match the expected pattern (see get_zipped_file_type for details) are moved to error_dir.

Files located in input_dir but whose location does not match the expected year / month hierarchy of subfolders are ignored.

Parameters:
  • input_dir (Path) –

    location of input zipfiles. Zipfiles are expected to be organized in subfolders inside this directory :

    • by year

    • by month, inside yearly subfolders

  • treated_dir (Path) – directory to which zipfiles are transferred after integration into the monitorfish database

  • error_dir (Path) – directory to which zipfiles are transferred if an error occurs during their treatment

Returns:

list of dicts, one for each zipfile found. Each dict in the list has the following elements :

  • full_name (str): name of the zipfile, e.g. “UN_JBE202001999999.zip”

  • input_dir (Path): path of the folder containing the zipfile (including year/month)

  • treated_dir (Path): path to which the zipfile should be transferred after integration (year/month subfolder of the supplied treated_dir argument)

  • error_dir (Path): path to which the zipfile should be transferred in case of error during its treatment (year/month subfolder of the supplied error_dir argument)

  • transmission_format (LogbookTransmissionFormat): transmission format, inferred from the zipfile’s name.

Return type:

List[dict]
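
The directory scan described above might look like the following minimal sketch. It is not the actual task: the name list_zipfiles_sketch is hypothetical, file-name validation (moving badly named files to error_dir) and the transmission_format entry are deliberately left out, and any subfolder is accepted as a year or month folder.

```python
from pathlib import Path
from typing import List


def list_zipfiles_sketch(input_dir: Path, treated_dir: Path, error_dir: Path) -> List[dict]:
    found = []
    # Only files located at <input_dir>/<year>/<month>/<file> are considered;
    # anything sitting elsewhere in input_dir is ignored.
    for year_dir in sorted(p for p in input_dir.iterdir() if p.is_dir()):
        for month_dir in sorted(p for p in year_dir.iterdir() if p.is_dir()):
            subfolder = Path(year_dir.name) / month_dir.name
            for file in sorted(p for p in month_dir.iterdir() if p.is_file()):
                found.append(
                    {
                        "full_name": file.name,
                        "input_dir": month_dir,
                        # Mirror the year/month hierarchy in the target folders.
                        "treated_dir": treated_dir / subfolder,
                        "error_dir": error_dir / subfolder,
                    }
                )
    return found
```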

pipeline.src.flows.sales_and_logbook.extract_xmls_from_zipfile(zipfile: None | dict) → None | dict

Takes a dict with the following structure :

  • full_name (str): name of the zipfile

  • input_dir (Path): path of the folder containing the zipfile

  • treated_dir (Path): path to which the zipfile is to be transferred after integration

  • error_dir (Path): path to which the zipfile should be transferred in case of error during its treatment

  • zipped_file_type (ZippedFileType): type of data in the zip file

  • data_domain (DataDomain): data domain

  • transmission_format (TransmissionFormat): transmission format

Opens the corresponding zipfile on the filesystem, reads the xml files it is expected to contain, puts the content of these xml files in a list of strings, then returns a copy of the input dict with an added xml_messages item that contains that list of strings.

Parameters:

zipfile (Union[None, dict])

Returns:

Copy of the input dict with an additional xml_messages item that contains the list of strings read from the zipfile identified by the input_dir and full_name entries of the input dictionary

Return type:

Union[None, dict]
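
As a rough illustration of the behaviour described above, the sketch below reads the xml members of an archive with the standard zipfile module. The name read_xml_messages_sketch, the UTF-8 decoding, the ".xml" suffix filter and the pass-through of None are all assumptions; whatever error handling the real task performs is omitted.

```python
import zipfile as zipfile_module
from pathlib import Path
from typing import Optional


def read_xml_messages_sketch(zipfile: Optional[dict]) -> Optional[dict]:
    # Assumption: a None input (e.g. a failed upstream step) is passed through.
    if zipfile is None:
        return None
    archive_path = Path(zipfile["input_dir"]) / zipfile["full_name"]
    with zipfile_module.ZipFile(archive_path) as archive:
        xml_messages = [
            archive.read(name).decode("utf-8")  # assumed encoding
            for name in archive.namelist()
            if name.lower().endswith(".xml")
        ]
    # Return a copy so that the input dict is left untouched.
    return {**zipfile, "xml_messages": xml_messages}
```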

pipeline.src.flows.sales_and_logbook.parse_xmls(zipfile: None | dict) → None | dict
pipeline.src.flows.sales_and_logbook.clean(zipfile: None | dict) → None | dict
pipeline.src.flows.sales_and_logbook.load_sales_and_logbook_data(cleaned_data: List[dict])

Loads sales and logbook data into the public.logbook_reports / public.sales_notes and public.logbook_raw_messages / public.sales_notes_raw_messages tables.

Parameters:

cleaned_data (list) – list of dictionaries (output of clean task)

pipeline.src.flows.sales_and_logbook.sales_and_logbook_flow(received_directory: str = RECEIVED_DIRECTORY.as_posix(), treated_directory: str = TREATED_DIRECTORY.as_posix(), error_directory: str = ERROR_DIRECTORY.as_posix())