Source code for pipeline.src.entities.pnos

from dataclasses import dataclass, field
from datetime import datetime
from email.message import EmailMessage
from enum import Enum
from typing import List, Optional

import pandas as pd

from src.entities.communication_means import CommunicationMeans
from src.entities.control_units import ControlUnit
from src.entities.fleet_segments import FishingGear, FleetSegment
from src.entities.missions import Infraction


@dataclass(kw_only=True)
[docs] class PnoCatch:
[docs] species_code: str
[docs] species_name: str
[docs] species_name_code: str = field(init=False)
[docs] weight: float
[docs] number_of_fish: int
[docs] fao_area: str
[docs] statistical_rectangle: str
[docs] def __post_init__(self): self.species_name_code = f"{self.species_name or '-'} ({self.species_code})"
[docs] class PnoSource(Enum):
[docs] MANUAL = "MANUAL"
[docs] LOGBOOK = "LOGBOOK"
@dataclass(kw_only=True)
[docs] class PnoToRender:
[docs] id: int
[docs] operation_datetime_utc: datetime
[docs] report_id: str
[docs] report_datetime_utc: datetime
[docs] vessel_id: str
[docs] cfr: str
[docs] ircs: str
[docs] external_identification: str
[docs] vessel_name: str
[docs] flag_state: str
[docs] purpose: str
[docs] catch_onboard: List[dict]
[docs] port_locode: str
[docs] port_name: str
[docs] facade: str
[docs] predicted_arrival_datetime_utc: datetime
[docs] predicted_landing_datetime_utc: datetime
[docs] trip_gears: List[dict]
[docs] trip_segments: List[dict]
[docs] pno_types: List[dict]
[docs] note: str
[docs] vessel_length: float
[docs] mmsi: str
[docs] risk_factor: float
[docs] last_control_datetime_utc: datetime
[docs] last_control_infractions: List[dict]
[docs] is_verified: bool
[docs] is_being_sent: bool
[docs] source: PnoSource
[docs] is_correction: bool
[docs] previous_notification_date_utc: datetime | None
[docs] def __post_init__(self): datetime_attrs = [ "operation_datetime_utc", "report_datetime_utc", "predicted_arrival_datetime_utc", "predicted_landing_datetime_utc", "last_control_datetime_utc", "previous_notification_date_utc", ] for att in datetime_attrs: if isinstance(getattr(self, att), pd.Timedelta): setattr(self, att, getattr(self, att).to_pydatetime()) # float and datetime nulls are represented as np.nan and pd.NaT in pandas, which we normalize to None nullables_to_correct = [ "operation_datetime_utc", "report_datetime_utc", "predicted_arrival_datetime_utc", "predicted_landing_datetime_utc", "vessel_length", "risk_factor", "last_control_datetime_utc", "previous_notification_date_utc", ] for att in nullables_to_correct: if pd.isna(getattr(self, att)): setattr(self, att, None) if not isinstance(self.source, PnoSource): assert isinstance(self.source, str) self.source = PnoSource(self.source)
@dataclass(kw_only=True)
[docs] class PreRenderedPno:
[docs] id: int
[docs] operation_datetime_utc: datetime
[docs] report_id: str
[docs] report_datetime_utc: datetime
[docs] vessel_id: str
[docs] cfr: str
[docs] ircs: str
[docs] external_identification: str
[docs] vessel_name: str
[docs] flag_state: str
[docs] purpose: str
[docs] catch_onboard: pd.DataFrame
[docs] bft_summary: str
[docs] port_locode: str
[docs] port_name: str
[docs] facade: str
[docs] predicted_arrival_datetime_utc: datetime
[docs] predicted_landing_datetime_utc: datetime
[docs] trip_gears: List[FishingGear]
[docs] trip_segments: List[FleetSegment]
[docs] pno_types: List[str]
[docs] note: str
[docs] vessel_length: float
[docs] mmsi: str
[docs] risk_factor: float
[docs] last_control_datetime_utc: datetime
[docs] last_control_infractions: List[Infraction]
[docs] is_verified: bool
[docs] is_being_sent: bool
[docs] is_landing: bool
[docs] source: PnoSource
[docs] purpose_suffix: str
[docs] is_zero: bool
[docs] is_correction: bool
[docs] previous_notification_date_utc: datetime | None
@staticmethod
[docs] def assert_equal(left: object, right: object): if not isinstance(left, PreRenderedPno): raise AssertionError("`left` is not a `PreRenderedPno`") if not isinstance(right, PreRenderedPno): raise AssertionError("`right` is not a `PreRenderedPno`") if not (left.catch_onboard is None and right.catch_onboard is None): try: pd.testing.assert_frame_equal(left.catch_onboard, right.catch_onboard) except AssertionError as e: raise AssertionError( ( "`left` and `right` are not equal. Their `catch_onboard` " f"attributes are different : {str(e)}" ) ) attributes_to_check = [k for k in left.__dict__.keys() if k != "catch_onboard"] for attr in attributes_to_check: if getattr(left, attr) != getattr(right, attr): if not (pd.isna(getattr(left, attr)) and pd.isna(getattr(right, attr))): raise AssertionError( ( f"`self` and `other` are not equal. Their `{attr}` " "attributes are different : " f"{getattr(left, attr)} != {getattr(right, attr)}" ) )
[docs] class ReturnToPortPurpose(Enum):
[docs] SHE = "SHE"
[docs] OTH = "OTH"
[docs] LAN = "LAN"
[docs] REF = "REF"
[docs] REP = "REP"
[docs] RES = "RES"
[docs] ECY = "ECY"
[docs] TRA = "TRA"
[docs] SCR = "SCR"
[docs] GRD = "GRD"
[docs] ACS = "ACS"
[docs] def label(self): labels = { "SHE": "Mise à l’abri", "OTH": "Autre", "LAN": "Débarquement", "REF": "Ravitaillement", "REP": "Réparation", "RES": "Repos", "ECY": "Urgence", "TRA": "Transbordement", "SCR": "Retour pour Recherche Scientifique", "GRD": "Immobilisation et convocation par les autorités", "ACS": "Accès aux services", } return labels[self.name]
@dataclass
[docs] class PnoAddressee:
[docs] name: str
[docs] organization: str
[docs] communication_means: CommunicationMeans
[docs] email_address_or_number: str
@dataclass
[docs] class RenderedPno:
[docs] report_id: str
[docs] vessel_id: int
[docs] cfr: str
[docs] vessel_name: str
[docs] is_verified: bool
[docs] is_being_sent: bool
[docs] trip_segments: list
[docs] pno_types: List[str]
[docs] port_locode: str
[docs] facade: str
[docs] source: PnoSource
[docs] purpose_suffix: str
[docs] html_for_pdf: str | None = None
[docs] pdf_document: bytes | None = None
[docs] generation_datetime_utc: datetime | None = None
[docs] html_email_body: str | None = None
[docs] sms_content: str | None = None
[docs] control_units: List[ControlUnit] | None = None
[docs] additional_addressees: List[PnoAddressee] = None
[docs] is_zero: bool = False
[docs] is_correction: bool = False
[docs] previous_notification_date_utc: datetime | None = None
[docs] def get_addressees( self, communication_means: CommunicationMeans ) -> List[PnoAddressee]: addressees = [] if self.control_units: if communication_means == CommunicationMeans.EMAIL: addressees += [ PnoAddressee( name=control_unit.control_unit_name, organization=control_unit.administration, communication_means=CommunicationMeans.EMAIL, email_address_or_number=email, ) for control_unit in self.control_units for email in control_unit.emails ] elif communication_means == CommunicationMeans.SMS: addressees += [ PnoAddressee( name=control_unit.control_unit_name, organization=control_unit.administration, communication_means=CommunicationMeans.SMS, email_address_or_number=phone_number, ) for control_unit in self.control_units for phone_number in control_unit.phone_numbers ] else: raise ValueError( f"Unexpected communication_means {communication_means}" ) if self.additional_addressees: addressees += [ PnoAddressee( name=add.name, organization=add.organization, communication_means=communication_means, email_address_or_number=add.email_address_or_number, ) for add in self.additional_addressees if add.communication_means is communication_means ] return addressees
@dataclass
[docs] class PnoToSend:
[docs] pno: RenderedPno
[docs] message: EmailMessage
[docs] communication_means: CommunicationMeans
@dataclass
[docs] class PriorNotificationSentMessage:
[docs] prior_notification_report_id: str
[docs] prior_notification_source: PnoSource
[docs] date_time_utc: datetime
[docs] communication_means: CommunicationMeans
[docs] recipient_address_or_number: str
[docs] success: bool
[docs] recipient_name: str
[docs] recipient_organization: str
[docs] error_message: Optional[str] = None