pipeline.src.flows.notify_beacon_malfunctions ============================================= .. py:module:: pipeline.src.flows.notify_beacon_malfunctions Functions --------- .. autoapisummary:: pipeline.src.flows.notify_beacon_malfunctions.extract_malfunctions_to_notify pipeline.src.flows.notify_beacon_malfunctions.to_malfunctions_to_notify_list pipeline.src.flows.notify_beacon_malfunctions.get_templates pipeline.src.flows.notify_beacon_malfunctions.get_sms_templates pipeline.src.flows.notify_beacon_malfunctions.render pipeline.src.flows.notify_beacon_malfunctions.render_sms pipeline.src.flows.notify_beacon_malfunctions.create_email pipeline.src.flows.notify_beacon_malfunctions.create_sms pipeline.src.flows.notify_beacon_malfunctions.create_fax pipeline.src.flows.notify_beacon_malfunctions.send_beacon_malfunction_message pipeline.src.flows.notify_beacon_malfunctions.load_notifications pipeline.src.flows.notify_beacon_malfunctions.make_reset_requested_notifications_statement pipeline.src.flows.notify_beacon_malfunctions.notify_beacon_malfunctions_flow Module Contents --------------- .. py:function:: extract_malfunctions_to_notify() .. py:function:: to_malfunctions_to_notify_list(malfunctions_to_notify: pandas.DataFrame, test_mode: bool) -> List[src.entities.beacon_malfunctions.BeaconMalfunctionToNotify] .. py:function:: get_templates() -> dict .. py:function:: get_sms_templates() -> dict .. py:function:: render(m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify, templates: dict, output_format: str = 'html') -> Union[str, bytes] .. py:function:: render_sms(m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify, templates: dict) -> str .. py:function:: create_email(html: str, pdf: bytes, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) -> src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend .. py:function:: create_sms(text: str | None, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) -> src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend .. py:function:: create_fax(pdf: bytes, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) -> src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend .. py:function:: send_beacon_malfunction_message(msg_to_send: src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend, is_integration: bool) -> List[src.entities.beacon_malfunctions.BeaconMalfunctionNotification] Sends input email using the contents of `From` header as sender and `To`, `Cc` and `Bcc` headers as recipients. :param msg: email message to send :type msg: EmailMessage :param is_integration: if ``False``, the message is not actually sent :type is_integration: bool :returns: Dict of errors returned by the server for each recipient that was refused, with the following form {"three@three.org" : ( 550 ,"User unknown" )} :rtype: dict .. py:function:: load_notifications(notifications: List[src.entities.beacon_malfunctions.BeaconMalfunctionNotification]) .. py:function:: make_reset_requested_notifications_statement(beacon_malfunctions_table: sqlalchemy.Table, notified_malfunctions: List[src.entities.beacon_malfunctions.BeaconMalfunctionToNotify]) -> sqlalchemy.sql.dml.Update .. py:function:: notify_beacon_malfunctions_flow(test_mode: bool, is_integration: bool)