pipeline.src.flows.distribute_pnos
==================================

.. py:module:: pipeline.src.flows.distribute_pnos


Functions
---------

.. autoapisummary::

   pipeline.src.flows.distribute_pnos.extract_species_names
   pipeline.src.flows.distribute_pnos.extract_fishing_gear_names
   pipeline.src.flows.distribute_pnos.extract_pnos_to_generate
   pipeline.src.flows.distribute_pnos.extract_facade_email_addresses
   pipeline.src.flows.distribute_pnos.extract_pno_extra_subscriptions
   pipeline.src.flows.distribute_pnos.make_prior_notification_attachments_query
   pipeline.src.flows.distribute_pnos.execute_prior_notification_attachments_query
   pipeline.src.flows.distribute_pnos.to_pnos_to_render
   pipeline.src.flows.distribute_pnos.pre_render_pno
   pipeline.src.flows.distribute_pnos.get_html_for_pdf_template
   pipeline.src.flows.distribute_pnos.get_email_body_template
   pipeline.src.flows.distribute_pnos.get_sms_template
   pipeline.src.flows.distribute_pnos.render_pno
   pipeline.src.flows.distribute_pnos.load_pno_pdf_documents
   pipeline.src.flows.distribute_pnos.attribute_addressees
   pipeline.src.flows.distribute_pnos.create_email
   pipeline.src.flows.distribute_pnos.create_sms
   pipeline.src.flows.distribute_pnos.send_pno_message
   pipeline.src.flows.distribute_pnos.load_prior_notification_sent_messages
   pipeline.src.flows.distribute_pnos.make_update_logbook_reports_statement
   pipeline.src.flows.distribute_pnos.make_manual_prior_notifications_statement
   pipeline.src.flows.distribute_pnos.distribute_pnos_flow
   pipeline.src.flows.distribute_pnos.is_prior_notification_zero


Module Contents
---------------

.. py:function:: extract_species_names() -> dict

   Returns a `dict` with species codes as keys and species names as values.


.. py:function:: extract_fishing_gear_names() -> dict

   Returns a `dict` with fishing gear codes as keys and fishing gear names as values.


.. py:function:: extract_pnos_to_generate(start_datetime_utc: datetime.datetime, end_datetime_utc: datetime.datetime) -> Tuple[pandas.DataFrame, bool]

.. py:function:: extract_facade_email_addresses() -> dict

.. py:function:: extract_pno_extra_subscriptions() -> pandas.DataFrame

.. py:function:: make_prior_notification_attachments_query(pnos: List[src.entities.pnos.RenderedPno], prior_notification_uploads_table: sqlalchemy.Table) -> sqlalchemy.Select | None

.. py:function:: execute_prior_notification_attachments_query(query: sqlalchemy.Select | None) -> dict

.. py:function:: to_pnos_to_render(pnos: pandas.DataFrame) -> List[src.entities.pnos.PnoToRender]

.. py:function:: pre_render_pno(pno: src.entities.pnos.PnoToRender, species_names: dict, fishing_gear_names: dict) -> src.entities.pnos.PreRenderedPno

.. py:function:: get_html_for_pdf_template() -> jinja2.Template

.. py:function:: get_email_body_template() -> jinja2.Template

.. py:function:: get_sms_template() -> jinja2.Template

.. py:function:: render_pno(pno: src.entities.pnos.PreRenderedPno, html_for_pdf_template: jinja2.Template, email_body_template: jinja2.Template, sms_template: jinja2.Template) -> src.entities.pnos.RenderedPno

.. py:function:: load_pno_pdf_documents(pno_pdf_documents: List[src.entities.pnos.RenderedPno]) -> Tuple[List[src.entities.pnos.RenderedPno], bool]

   Loads the input `pno_pdf_documents` into `prior_notification_pdf_documents` and
   returns the subset of the input that must be distributed, i.e. the list of
   `RenderedPno` on which `is_being_sent` is `True`.

   :param pno_pdf_documents: RenderedPnos to load
   :type pno_pdf_documents: List[RenderedPno]

   :returns: subset of the input having `is_being_sent` equal to `True`.
   :rtype: List[RenderedPno]


.. py:function:: attribute_addressees(pno_to_distribute: src.entities.pnos.RenderedPno, units_targeting_vessels: pandas.DataFrame, units_ports_and_segments_subscriptions: pandas.DataFrame, pno_extra_subscriptions: pandas.DataFrame, control_units: List[src.entities.control_units.ControlUnit]) -> src.entities.pnos.RenderedPno

   Returns a copy of the input `RenderedPno` with its `control_units` and
   `addtionnal_addressees` attributes updated, representing the control units and
   other addressees that should receive the PNO.

   The control units attributed to the PNO are:

   - control units who target the vessel
   - control units who subscribed to the port with the "receive all pnos" option
   - Plus:

     - control units who subscribed to the port, if the PNO is in verification scope
     - control units who subscribed to the port AND to a segment of the PNO, if the
       PNO is not in verification scope

   The additional addressees attributed to the PNO are those in
   `pno_extra_subscriptions` who subscribed to the port and type of the PNO.

   :param pno_to_distribute: RenderedPno
   :type pno_to_distribute: RenderedPno
   :param units_targeting_vessels: DataFrame with columns
                                   `control_unit_ids_targeting_vessel` and `vessel_id`
   :type units_targeting_vessels: pd.DataFrame
   :param units_ports_and_segments_subscriptions: DataFrame with columns
                                                  `control_unit_id`, `port_locode`,
                                                  `receive_all_pnos_from_port`, and
                                                  `unit_subscribed_segments`
   :type units_ports_and_segments_subscriptions: pd.DataFrame
   :param pno_extra_subscriptions: DataFrame with columns `pno_type_name`,
                                   `port_locode`, `recipient_name`,
                                   `recipient_organization`, `communication_means` and
                                   `recipient_email_address_or_number`.
   :type pno_extra_subscriptions: pd.DataFrame
   :param control_units: List of all active `ControlUnits`
   :type control_units: List[ControlUnit]

   :returns: copy of the input `pno_to_distribute` with addressees added
   :rtype: RenderedPno


.. py:function:: create_email(pno: src.entities.pnos.RenderedPno, uploaded_attachments: dict, facade_email_addresses: dict, test_mode: bool) -> src.entities.pnos.PnoToSend

.. py:function:: create_sms(pno: src.entities.pnos.RenderedPno, test_mode: bool) -> src.entities.pnos.PnoToSend

.. py:function:: send_pno_message(pno_to_send: src.entities.pnos.PnoToSend, is_integration: bool) -> List[src.entities.pnos.PriorNotificationSentMessage]

.. py:function:: load_prior_notification_sent_messages(prior_notification_sent_messages: List[src.entities.pnos.PriorNotificationSentMessage])

.. py:function:: make_update_logbook_reports_statement(pnos_to_update: List[src.entities.pnos.RenderedPno], sent_messages: List[src.entities.pnos.PriorNotificationSentMessage], start_datetime_utc: datetime.datetime, end_datetime_utc: datetime.datetime) -> sqlalchemy.Executable | None

   Creates a SQLAlchemy update statement to update the `isBeingSent` and `isSent`
   fields of PNOs with source `LOGBOOK` in table `logbook_reports`.

   :param pnos_to_update: PNOs to update
   :type pnos_to_update: List[RenderedPno]
   :param sent_messages: PNOs that were sent
   :type sent_messages: List[PriorNotificationSentMessage]
   :param start_datetime_utc: start date
   :type start_datetime_utc: datetime
   :param end_datetime_utc: end date
   :type end_datetime_utc: datetime

   :raises prefect.engine.signals.SKIP: If no PNO with source = 'LOGBOOK' is in the
       input list

   :returns: SQLAlchemy Update Statement
   :rtype: Executable


.. py:function:: make_manual_prior_notifications_statement(pnos_to_update: List[src.entities.pnos.RenderedPno], sent_messages: List[src.entities.pnos.PriorNotificationSentMessage]) -> sqlalchemy.Executable | None

   Creates a SQLAlchemy update statement to update the `isBeingSent` and `isSent`
   fields of PNOs with source `MANUAL` in table `manual_prior_notifications`.

   :param pnos_to_update: PNOs to update
   :type pnos_to_update: List[RenderedPno]
   :param sent_messages: PNOs that were sent
   :type sent_messages: List[PriorNotificationSentMessage]

   :raises prefect.engine.signals.SKIP: If no PNO with source = 'MANUAL' is in the
       input list

   :returns: SQLAlchemy Update Statement
   :rtype: Executable


.. py:function:: distribute_pnos_flow(test_mode: bool, is_integration: bool, start_hours_ago: int | float, end_hours_ago: int | float)

.. py:function:: is_prior_notification_zero(pno: src.entities.pnos.PnoToRender) -> bool
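
The control unit selection rule documented for `attribute_addressees` can be illustrated with a minimal, dependency-free sketch. It is not the module's implementation: `PortSubscription`, `select_control_unit_ids`, the `is_in_verification_scope` flag and the port and segment codes are hypothetical names and values chosen for illustration, and plain Python sets stand in for the DataFrames the real function receives. The handling of additional addressees from `pno_extra_subscriptions` is left out.

.. code-block:: python

   from dataclasses import dataclass
   from typing import FrozenSet, Iterable, Set


   @dataclass(frozen=True)
   class PortSubscription:
       """Hypothetical stand-in for one row of `units_ports_and_segments_subscriptions`."""

       control_unit_id: int
       port_locode: str
       receive_all_pnos_from_port: bool
       unit_subscribed_segments: FrozenSet[str]


   def select_control_unit_ids(
       port_locode: str,
       pno_segments: Set[str],
       is_in_verification_scope: bool,
       unit_ids_targeting_vessel: Set[int],
       subscriptions: Iterable[PortSubscription],
   ) -> Set[int]:
       """Return the ids of the control units selected by the documented rule."""
       # Units that target the vessel always receive the PNO.
       selected = set(unit_ids_targeting_vessel)
       for subscription in subscriptions:
           if subscription.port_locode != port_locode:
               continue
           if subscription.receive_all_pnos_from_port:
               # "Receive all pnos" subscribers get every PNO of the port.
               selected.add(subscription.control_unit_id)
           elif is_in_verification_scope:
               # In verification scope, a port subscription is sufficient.
               selected.add(subscription.control_unit_id)
           elif subscription.unit_subscribed_segments & pno_segments:
               # Otherwise the unit must also subscribe to a segment of the PNO.
               selected.add(subscription.control_unit_id)
       return selected


   # Illustrative data only: the port and segment codes are made up.
   subscriptions = [
       PortSubscription(1, "FRBES", True, frozenset()),
       PortSubscription(2, "FRBES", False, frozenset({"SWW01"})),
       PortSubscription(3, "FRBES", False, frozenset({"NWW05"})),
       PortSubscription(4, "FRLEH", True, frozenset()),
   ]
   selected = select_control_unit_ids("FRBES", {"SWW01"}, False, {7}, subscriptions)
   print(sorted(selected))  # [1, 2, 7]

With the same data and a PNO in verification scope, unit 3 would also be selected, because a port subscription alone is then enough.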