pipeline.src.flows.sales_and_logbook
Attributes
Functions
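- get_zipped_file_type(zipfile_name): Takes a zipfile name like UN_JBE202001123614.zip or ERS3_ACK_JBE202102365445.zip and returns the corresponding ZippedFileType.
- extract_zipfiles(input_dir, treated_dir, error_dir): Scans input_dir, in which logbook zipfiles are expected to be arranged in a year / month folder hierarchy, and returns a list of dicts describing the zipfiles found.
- extract_xmls_from_zipfile(zipfile): Takes a dict describing a zipfile and returns a copy of it with an added xml_messages item.
- load_sales_and_logbook_data(cleaned_data): Loads sales and logbook data into public.logbook_reports / public.sales_notes and public.logbook_raw_messages / public.sales_notes_raw_messages tables.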
Module Contents
- pipeline.src.flows.sales_and_logbook.get_zipped_file_type(zipfile_name: str) → src.entities.data_exchange_standards.ZippedFileType
Takes a zipfile name like UN_JBE202001123614.zip or ERS3_ACK_JBE202102365445.zip and returns the corresponding ZippedFileType, based on pattern matching.
The expected pattern is of the form
<prefix><YYYYMMXXXXXX>.zip
where :
prefix is one of the ZippedFileType enum values
Y, M and X are digits
- Parameters:
zipfile_name (str) – name of a zipfile containing logbook or sales data.
- Returns:
the type of data corresponding to the name of the zipfile
- Return type:
ZippedFileType
- Raises:
ValueError – if the name does not match the expected pattern or the matched string does not correspond to a known ZippedFileType.
Examples
>>> get_zipped_file_type("UN_JBE2020010199999.zip")
<ZippedFileType.UN: 'UN_JBE'>
>>> get_zipped_file_type("UN_JBE20200101999999.zip")
ValueError
>>> get_zipped_file_type("UN_JBE2020010199999.txt")
ValueError
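The pattern matching described above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the ZippedFileType enum here is a hypothetical two-member stand-in (only the value 'UN_JBE' is confirmed by the example output; the ERS3_ACK member name is assumed from the sample file name), and the sketch follows the documented <prefix><YYYYMMXXXXXX>.zip form, i.e. exactly 12 digits.

```python
import re
from enum import Enum


class ZippedFileType(Enum):
    # Hypothetical stand-in for src.entities.data_exchange_standards.ZippedFileType.
    UN = "UN_JBE"
    ERS3_ACK = "ERS3_ACK_JBE"  # member name is an assumption


def sketch_get_zipped_file_type(zipfile_name: str) -> ZippedFileType:
    # Build an alternation of the known prefixes, e.g. "UN_JBE|ERS3_ACK_JBE".
    prefixes = "|".join(re.escape(t.value) for t in ZippedFileType)
    # A prefix, then exactly 12 digits (YYYYMMXXXXXX), then ".zip".
    pattern = re.compile(rf"(?P<prefix>{prefixes})(?P<digits>\d{{12}})\.zip")
    match = pattern.fullmatch(zipfile_name)
    if match is None:
        raise ValueError(f"Unexpected zipfile name: {zipfile_name!r}")
    # Enum lookup by value; raises ValueError for an unknown prefix.
    return ZippedFileType(match.group("prefix"))
```

Using fullmatch rather than search means that a wrong extension or extra digits are rejected instead of being silently accepted.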
- pipeline.src.flows.sales_and_logbook.extract_zipfiles(input_dir: pathlib.Path, treated_dir: pathlib.Path, error_dir: pathlib.Path) → List[dict]
Scans input_dir, in which logbook zipfiles are expected to be arranged in a year / month / zipfiles folder hierarchy, and returns a list of dicts describing the zipfiles found.
Files whose name does not match the expected pattern (see get_zipped_file_type for details) are moved to error_dir.
Files located in input_dir but outside the expected year / month hierarchy of subfolders are ignored.
- Parameters:
input_dir (Path) –
location of input zipfiles. Zipfiles are expected to be organized in subfolders inside this directory :
by year
by month, inside yearly subfolders
treated_dir (Path) – directory where zipfiles are to be transferred after integration into the monitorfish database
error_dir (Path) – directory where zipfiles are to be transferred if an error occurs during their treatment
- Returns:
- list of dicts, one for each of the zipfiles found. Each dict in the list has the following elements :
full_name (str): name of the zipfile, e.g. “UN_JBE_202001999999.zip”
input_dir (Path): path of the folder containing the zipfile (including year/month)
treated_dir (Path): path where the zipfile should be transferred after integration (year/month subfolder of the supplied treated_dir argument)
error_dir (Path): path where the zipfile should be transferred in case of error during its treatment (year/month subfolder of the supplied error_dir argument)
transmission_format (LogbookTransmissionFormat): transmission format, inferred from the zipfile’s name.
- Return type:
List[dict]
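The year / month scan described above can be sketched with pathlib. This is an illustrative sketch under stated assumptions, not the flow's actual code: it assumes year and month folders are named with digits only, and it leaves out the file name validation, the move of badly named files to error_dir, and the transmission_format inference.

```python
from pathlib import Path
from typing import List


def sketch_scan_zipfiles(input_dir: Path, treated_dir: Path, error_dir: Path) -> List[dict]:
    found = []
    # Look exactly two levels down: <input_dir>/<year>/<month>/<name>.zip.
    # Zipfiles at any other depth are not matched, hence ignored.
    for path in sorted(input_dir.glob("*/*/*.zip")):
        if not path.is_file():
            continue
        year, month = path.parent.parent.name, path.parent.name
        # Assumption: year and month folder names contain digits only.
        if not (year.isdigit() and month.isdigit()):
            continue
        found.append(
            {
                "full_name": path.name,
                "input_dir": path.parent,
                # Mirror the year/month subfolders under the target directories.
                "treated_dir": treated_dir / year / month,
                "error_dir": error_dir / year / month,
            }
        )
    return found
```

Mirroring the year / month subfolders under treated_dir and error_dir keeps the archive layout identical to the input layout, so a file can be traced back to its original location.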
- pipeline.src.flows.sales_and_logbook.extract_xmls_from_zipfile(zipfile: None | dict) → None | dict
Takes a dict with the following structure :
full_name (str): name of the zipfile
input_dir (Path): path of the folder containing the zipfile
treated_dir (Path): path where the zipfile should be transferred after integration
error_dir (Path): path where the zipfile should be transferred in case of error during its treatment
zipped_file_type (ZippedFileType): type of data in the zip file
data_domain (DataDomain): data domain
transmission_format (TransmissionFormat): transmission format
Opens the corresponding zipfile on the filesystem, reads the xml files it is expected to contain, puts the content of these xml files in a list of strings, then returns a copy of the input dict with an added xml_messages item that contains that list of strings.
- Parameters:
zipfile (Union[None, dict])
- Returns:
- Copy of the input dict with an additional xml_messages item that contains the list of strings read from the zipfile identified by the input_dir and full_name in the input dictionary
- Return type:
Union[None, dict]
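The extraction step can be sketched with the standard zipfile module. This is a hedged illustration rather than the task's real code: passing None through unchanged, filtering on the ".xml" extension and decoding as UTF-8 are all assumptions, and any error handling (such as moving the archive to error_dir) is left out.

```python
import zipfile as zf
from pathlib import Path
from typing import Optional


def sketch_extract_xmls(zipfile: Optional[dict]) -> Optional[dict]:
    # Assumption: a None input is passed through, mirroring the None | dict signature.
    if zipfile is None:
        return None
    archive_path = Path(zipfile["input_dir"]) / zipfile["full_name"]
    xml_messages = []
    with zf.ZipFile(archive_path) as archive:
        for member in archive.namelist():
            # Assumption: only members ending in ".xml" are messages.
            if member.lower().endswith(".xml"):
                # Assumption: the XML files are UTF-8 encoded.
                xml_messages.append(archive.read(member).decode("utf-8"))
    # Return a shallow copy so the input dict is left unmodified.
    return {**zipfile, "xml_messages": xml_messages}
```

Returning a copy instead of mutating the input keeps the function free of side effects on its argument, which matches the "copy of the input dict" wording above.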
- pipeline.src.flows.sales_and_logbook.load_sales_and_logbook_data(cleaned_data: List[dict])
Loads sales and logbook data into public.logbook_reports / public.sales_notes and public.logbook_raw_messages / public.sales_notes_raw_messages tables.
- Parameters:
cleaned_data (list) – list of dictionaries (output of clean task)