Source code for pipeline.src.parsers.ers.ers

import logging
import xml
import xml.etree.ElementTree as ET
from datetime import datetime
from functools import partial
from typing import List
from xml.etree.ElementTree import ParseError

import pandas as pd

from src.entities.data_exchange_standards import DataDomain
from src.parsers.ers.log_parsers import (
    default_log_parser,
    parse_coe,
    parse_cox,
    parse_cro,
    parse_dep,
    parse_dis,
    parse_ecps,
    parse_eof,
    parse_far,
    parse_lan,
    parse_pno,
    parse_rtp,
)
from src.parsers.ers.sal_parsers import parse_sli, parse_tli
from src.parsers.utils import (
    get_first_child,
    get_root_tag,
    make_datetime,
    remove_namespace,
)


[docs] class ERSParsingError(Exception): """Raised when an ERS message cannot be parsed."""
[docs] def simple_parser(el: xml.etree.ElementTree.Element, pass_child: bool = False): root_tag = get_root_tag(el) metadata = {"operation_type": root_tag} if pass_child: child = get_first_child(el, assert_child_single=True) else: child = None data = None logs = None return metadata, child, logs, data
[docs] def parse_ops(ops): ops_date = ops.get("OD") ops_time = ops.get("OT") is_test_message = ops.get("TS") == "1" ops_datetime = make_datetime(ops_date, ops_time) metadata = { "operation_number": ops.get("ON"), "operation_country": ops.get("FR"), "operation_datetime_utc": ops_datetime, "software": ops.get("EVL"), "is_test_message": is_test_message, } child = get_first_child(list(ops), assert_child_single=True) return metadata, child, None, None
[docs] def parse_sales_ops(ops): ops_date = ops.get("OD") ops_time = ops.get("OT") ops_datetime = make_datetime(ops_date, ops_time) metadata = { "operation_number": ops.get("ON"), "operation_country": ops.get("FR"), "operation_datetime_utc": ops_datetime, } child = get_first_child(list(ops), assert_child_single=True) return metadata, child, None, None
[docs] def parse_cor(cor): metadata = {"operation_type": "COR", "referenced_report_id": cor.get("RN")} child = get_first_child(cor, assert_child_single=True) return metadata, child, None, None
[docs] def parse_del(del_): metadata = {"operation_type": "DEL", "referenced_report_id": del_.get("RN")} return metadata, None, None, {"value": None}
[docs] def parse_ret(ret): metadata = {"operation_type": "RET", "referenced_report_id": ret.get("ON")} data = { "returnStatus": ret.get("RS"), } rejection_cause = ret.get("RE") if rejection_cause: data["rejectionCause"] = rejection_cause return metadata, None, None, {"value": data}
[docs] def parse_ers(ers): ers_date = ers.get("RD") ers_time = ers.get("RT") ers_datetime = make_datetime(ers_date, ers_time) metadata = {"report_id": ers.get("RN"), "report_datetime_utc": ers_datetime} children = list(ers) child = get_first_child( children, ) return metadata, child, None, None
[docs] def parse_log(log): logs = [child for child in list(log) if remove_namespace(child.tag) != "ELOG"] metadata = { "cfr": log.get("IR"), "ircs": log.get("RC"), "external_identification": log.get("XR"), "vessel_name": log.get("NA"), "flag_state": log.get("FS"), "imo": log.get("IM"), } elogs = [child for child in list(log) if remove_namespace(child.tag) == "ELOG"] if len(elogs) >= 1: elog = elogs[0] metadata["trip_number"] = elog.get("TN") ecps = [ el for elog in elogs for el in elog if remove_namespace(el.tag) == "ECPS" ] logs += ecps return metadata, None, logs, None
[docs] def parse_sal(sal): sals = [ child for child in list(sal) if remove_namespace(child.tag) in ("SLI", "TLI") ] metadata = { "cfr": sal.get("IR"), "ircs": sal.get("RC"), "external_identification": sal.get("XR"), "vessel_name": sal.get("NA"), "flag_state": sal.get("FS"), "invoice_number": sal.get("NR"), "invoice_datetime_utc": make_datetime(sal.get("ND")), "takeover_contract_reference": sal.get("CN"), "transport_document_reference": sal.get("TR"), } return metadata, None, sals, None
[docs] parsers = { DataDomain.LOGBOOK: { "DAT": partial(simple_parser, pass_child=True), "COR": parse_cor, "DEL": parse_del, "RET": parse_ret, "QUE": partial(simple_parser, pass_child=False), "RSP": partial(simple_parser, pass_child=False), "OPS": parse_ops, "ERS": parse_ers, "LOG": parse_log, "DEP": parse_dep, "FAR": parse_far, "ECPS": parse_ecps, "DIS": parse_dis, "EOF": parse_eof, "PNO": parse_pno, "RTP": parse_rtp, "LAN": parse_lan, "RLC": default_log_parser, "TRA": default_log_parser, "COE": parse_coe, "COX": parse_cox, "CRO": parse_cro, "TRZ": default_log_parser, "INS": default_log_parser, "PNT": default_log_parser, }, DataDomain.SALES: { "DAT": partial(simple_parser, pass_child=True), "COR": parse_cor, "DEL": parse_del, "RET": parse_ret, "QUE": partial(simple_parser, pass_child=False), "RSP": partial(simple_parser, pass_child=False), "OPS": parse_sales_ops, "ERS": parse_ers, "SAL": parse_sal, "SLI": parse_sli, "TLI": parse_tli, }, }
[docs] def parse_(el, data_domain): root_tag = get_root_tag(el) try: parser = parsers[data_domain][root_tag] except KeyError: logging.warning(f"Parser not implemented for xml tag: {root_tag}") raise ERSParsingError try: res = parser(el) except: raise ERSParsingError return res
[docs] def parse(el, data_domain: DataDomain): metadata, child, logs, data = parse_(el, data_domain) # OPS, DAT, COR and ERS elements with metadata and a child element to parse if metadata is not None and child is not None and logs is None and data is None: child_metadata, data_iter = parse(child, data_domain) return {**metadata, **child_metadata}, data_iter # LOG and SAL elements with FAR, LAN, PNO, SLI, TLI... children elif metadata is not None and child is None and logs is not None and data is None: return metadata, map(partial(parse_, data_domain=data_domain), logs) # DEL, RET elements with no child, no logs elif metadata is not None and child is None and logs is None and data is not None: return metadata, iter([data]) # RSP, QUE elements with only metadata elif metadata is not None and child is None and logs is None and data is None: return metadata, iter([]) else: raise ERSParsingError
[docs] def parse_xml_string(xml_string, data_domain: DataDomain): try: el = ET.fromstring(xml_string.strip("¿")) except ParseError: raise ERSParsingError res = parse(el, data_domain) return res
[docs] def batch_parse(xml_messages: List[str], data_domain: DataDomain) -> dict: """Parses a list of ERS messages and returns a dictionnary with the information extracted from the messages. Args: xml_messages (List[str]): list of ERS xml messages Returns: dict : dictionnary with 3 elemements: - reports pd.DataFrame: Dataframe with parsed data - raw_messages (pd.DataFrame): Dataframe with the original xml messages - batch_generated_errors (boolean): `True` if an error occurred during the treatment of one or more of the messages """ reports_list = [] raw_messages_list = [] batch_generated_errors = False raw_messages_columns = [ "operation_number", "xml_message", ] defaults = { DataDomain.LOGBOOK: { "operation_number": None, "operation_country": None, "operation_datetime_utc": None, "operation_type": None, "report_id": None, "referenced_report_id": None, "report_datetime_utc": None, "cfr": None, "ircs": None, "external_identification": None, "vessel_name": None, "flag_state": None, "imo": None, "log_type": None, "value": None, "integration_datetime_utc": None, }, DataDomain.SALES: { "operation_number": None, "operation_country": None, "operation_datetime_utc": None, "operation_type": None, "report_id": None, "referenced_report_id": None, "report_datetime_utc": None, "cfr": None, "ircs": None, "external_identification": None, "vessel_name": None, "flag_state": None, "imo": None, "sales_type": None, "products": None, "integration_datetime_utc": None, "sender_id": None, "sender_name": None, "provider_id": None, "provider_name": None, "buyer_id": None, "buyer_name": None, "recipient_id": None, "recipient_name": None, "carrier_id": None, "carrier_name": None, "buyer_id": None, "buyer_name": None, "sales_type": None, "sales_datetime_utc": None, "sales_country": None, "sales_port_code": None, "provider_name": None, "sales_contract_reference": None, "bcd_number": None, "takeover_organization_name": None, "storage_facility_name": None, "storage_facility_address": None, "transport_document_reference": None, "departure_datetime_utc": None, "sales_id": None, "trip_number": None, }, } reports_defaults = defaults[data_domain] for xml_message in xml_messages: try: metadata, data_iterator = parse_xml_string( xml_message, data_domain=data_domain ) now = datetime.utcnow() raw = { "operation_number": metadata.get("operation_number"), "xml_message": xml_message, } for data in data_iterator: reports_list.append( pd.Series( { **reports_defaults, **metadata, **data, "integration_datetime_utc": now, } ) ) raw_messages_list.append(pd.Series(raw)) except ERSParsingError: log_end = "..." if len(xml_message) > 40 else "" logging.error( "Parsing error - one ERS message will be ignored : " + xml_message[:40] + log_end ) batch_generated_errors = True except: logging.error("Unkonwn error with message " + xml_message) batch_generated_errors = True reports = pd.DataFrame(columns=pd.Index(reports_defaults)) raw_messages = pd.DataFrame(columns=pd.Index(raw_messages_columns)) if len(reports_list) > 0: reports = pd.concat(reports_list, axis=1).T if len(raw_messages_list) > 0: raw_messages = pd.concat(raw_messages_list, axis=1).T return { "reports": reports, "raw_messages": raw_messages, "batch_generated_errors": batch_generated_errors, }