Source code for pipeline.src.parsers.flux.sales_parsers

import logging
import xml.etree.ElementTree as ET
from typing import List, Optional, Tuple
from xml.etree.ElementTree import ParseError

from src.parsers.utils import try_float

[docs] NS_FLUX_SALES = { "": "urn:un:unece:uncefact:data:standard:FLUXSalesReportMessage:3", "rsm": "urn:un:unece:uncefact:data:standard:FLUXSalesReportMessage:3", "ram": "urn:un:unece:uncefact:data:standard:ReusableAggregateBusinessInformationEntity:20", "udt": "urn:un:unece:uncefact:data:standard:UnqualifiedDataType:20", }
[docs] def _find(el: ET.Element, path: str) -> Optional[ET.Element]: return el.find(path, NS_FLUX_SALES)
[docs] def _findall(el: ET.Element, path: str) -> List[ET.Element]: return el.findall(path, NS_FLUX_SALES)
[docs] def _get(el: ET.Element, path: str) -> Optional[str]: found = _find(el, path) return found.text if found is not None else None
[docs] def _make_datetime(date_str: Optional[str]): from src.parsers.flux.utils import make_datetime return make_datetime(date_str)
[docs] def parse_product(product: ET.Element, currency: str) -> dict: return { "species": _get(product, './ram:SpeciesCode[@listID="FAO_SPECIES"]'), "weight": try_float(_get(product, './ram:WeightMeasure[@unitCode="KGM"]')), "usage": _get(product, './ram:UsageCode[@listID="PROD_USAGE"]'), "freshness": _get( product, './ram:AppliedAAPProcess/ram:TypeCode[@listID="FISH_FRESHNESS"]' ), "preservationState": _get( product, './ram:AppliedAAPProcess/ram:TypeCode[@listID="FISH_PRESERVATION"]', ), "presentation": _get( product, './ram:AppliedAAPProcess/ram:TypeCode[@listID="FISH_PRESENTATION"]', ), "totalPrice": try_float( _get(product, "./ram:TotalSalesPrice/ram:ChargeAmount") ), "currency": currency, "sizeCategory": _get( product, './ram:SpecifiedSizeDistribution/ram:CategoryCode[@listID="FISH_SIZE_CATEGORY"]', ), "sizeClass": _get( product, './ram:SpecifiedSizeDistribution/ram:ClassCode[@listID="FISH_SIZE_CLASS"]', ), "faoZone": _get( product, './ram:OriginFLUXLocation/ram:ID[@schemeID="FAO_AREA"]' ), }
[docs] def parse_sales_document( doc: ET.Element, ) -> Tuple[dict, dict, Optional[str], Optional[str]]: """Parse IncludedSalesDocument. Returns: (value_dict, vessel_dict, trip_number, sales_datetime_utc_str) """ currency = (_get(doc, './ram:CurrencyCode[@listID="TERRITORY_CURR"]'),) products = [ parse_product(p, currency=currency) for p in _findall(doc, "./ram:SpecifiedSalesBatch/ram:SpecifiedAAPProduct") ] parties = {} for party in _findall(doc, "./ram:SpecifiedSalesParty"): role = _get(party, './ram:RoleCode[@listID="FLUX_SALES_PARTY_ROLE"]') if role is None: role = _get(party, './ram:TypeCode[@listID="FLUX_SALES_PARTY_ROLE"]') if role: parties[role.lower()] = { "name": _get(party, "./ram:Name"), "id": _get(party, "./ram:ID"), } sales_data = { "sales_id": _get(doc, './ram:ID[@schemeID="EU_SALES_ID"]'), "sales_datetime_utc": _get( doc, "./ram:SpecifiedSalesEvent/ram:OccurrenceDateTime/udt:DateTime" ), "sales_port_code": _get( doc, './ram:SpecifiedFLUXLocation/ram:ID[@schemeID="LOCATION"]' ), "landing_port_code": _get( doc, './ram:SpecifiedFishingActivity/ram:RelatedFLUXLocation/ram:ID[@schemeID="LOCATION"]', ), "departure_datetime_utc": _get( doc, "./ram:SpecifiedFishingActivity/ram:SpecifiedDelimitedPeriod/ram:StartDateTime/udt:DateTime", ), "landing_datetime_utc": _get( doc, "./ram:SpecifiedFishingActivity/ram:SpecifiedDelimitedPeriod/ram:EndDateTime/udt:DateTime", ), "products": products, } party_types = ["sender", "provider", "buyer", "recipient", "carrier"] for party_type in party_types: if party_type in parties: sales_data[f"{party_type}_id"] = parties[party_type].get("id") sales_data[f"{party_type}_name"] = parties[party_type].get("name") vessel = _find( doc, "./ram:SpecifiedFishingActivity/ram:RelatedVesselTransportMeans" ) if vessel is not None: vessel_data = { "cfr": _get(vessel, './ram:ID[@schemeID="CFR"]'), "ircs": _get(vessel, './ram:ID[@schemeID="IRCS"]'), "external_identification": _get(vessel, './ram:ID[@schemeID="EXT_MARK"]'), "vessel_name": _get(vessel, "./ram:Name"), "flag_state": _get( vessel, './ram:RegistrationVesselCountry/ram:ID[@schemeID="TERRITORY"]', ), "imo": _get(vessel, './ram:ID[@schemeID="UVI"]'), } else: vessel_data = { "cfr": None, "ircs": None, "external_identification": None, "vessel_name": None, "flag_state": None, "imo": None, } trip_number = _get( doc, './ram:SpecifiedFishingActivity/ram:SpecifiedFishingTrip/ram:ID[@schemeID="EU_TRIP_ID"]', ) return sales_data, vessel_data, trip_number
[docs] def parse_sales_report_message_string( message_string: str, ) -> Tuple[str, List[dict]]: from src.parsers.flux.flux import FLUXParsingError, get_operation_type try: root = ET.fromstring(message_string) except ParseError as e: raise FLUXParsingError("Could not parse xml string: ", e) flux_doc = _find(root, "FLUXReportDocument") if flux_doc is None: raise FLUXParsingError( "Could not find FLUXReportDocument in FLUXSalesReportMessage" ) operation_number = _get(flux_doc, './ram:ID[@schemeID="UUID"]') operation_datetime_utc = _make_datetime( _get(flux_doc, "./ram:CreationDateTime/udt:DateTime") ) operation_data = { "operation_number": operation_number, "operation_datetime_utc": operation_datetime_utc, } report_data = { "operation_type": get_operation_type(flux_doc), "report_id": operation_number, "referenced_report_id": _get(flux_doc, './ram:ReferencedID[@schemeID="UUID"]'), "report_datetime_utc": operation_datetime_utc, } sales_reports = _findall(root, "SalesReport") if not sales_reports: return operation_number, [{**operation_data, **report_data}] message_data = [] for sales_report in sales_reports: sales_type = _get(sales_report, './ram:ItemTypeCode[@listID="FLUX_SALES_TYPE"]') doc = _find(sales_report, "./ram:IncludedSalesDocument") if doc is not None: try: ( sales_data, vessel_data, trip_number, ) = parse_sales_document(doc) except Exception: logging.error( "Could not parse sales document. This report will be skipped." ) continue else: sales_data = {} vessel_data = {} trip_number = None message_data.append( { **operation_data, **report_data, "sales_type": sales_type, "trip_number": trip_number, **sales_data, **vessel_data, } ) return operation_number, message_data