pipeline.src.flows.notify_beacon_malfunctions

Functions

extract_malfunctions_to_notify()

to_malfunctions_to_notify_list(...)

get_templates(→ dict)

get_sms_templates(→ dict)

render(→ Union[str, bytes])

render_sms(→ str)

create_email(...)

create_sms(...)

create_fax(...)

send_beacon_malfunction_message(...)

Sends input email using the contents of From header as sender and To, Cc and Bcc headers as recipients.

load_notifications(notifications)

make_reset_requested_notifications_statement(...)

notify_beacon_malfunctions_flow(test_mode, is_integration)

Module Contents

pipeline.src.flows.notify_beacon_malfunctions.extract_malfunctions_to_notify()
pipeline.src.flows.notify_beacon_malfunctions.to_malfunctions_to_notify_list(malfunctions_to_notify: pandas.DataFrame, test_mode: bool) → List[src.entities.beacon_malfunctions.BeaconMalfunctionToNotify]
pipeline.src.flows.notify_beacon_malfunctions.get_templates() → dict
pipeline.src.flows.notify_beacon_malfunctions.get_sms_templates() → dict
pipeline.src.flows.notify_beacon_malfunctions.render(m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify, templates: dict, output_format: str = 'html') → str | bytes
pipeline.src.flows.notify_beacon_malfunctions.render_sms(m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify, templates: dict) → str
pipeline.src.flows.notify_beacon_malfunctions.create_email(html: str, pdf: bytes, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) → src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend
pipeline.src.flows.notify_beacon_malfunctions.create_sms(text: str | None, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) → src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend
pipeline.src.flows.notify_beacon_malfunctions.create_fax(pdf: bytes, m: src.entities.beacon_malfunctions.BeaconMalfunctionToNotify) → src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend
pipeline.src.flows.notify_beacon_malfunctions.send_beacon_malfunction_message(msg_to_send: src.entities.beacon_malfunctions.BeaconMalfunctionMessageToSend, is_integration: bool) → List[src.entities.beacon_malfunctions.BeaconMalfunctionNotification]

Sends input email using the contents of From header as sender and To, Cc and Bcc headers as recipients.

Parameters:
  • msg_to_send (BeaconMalfunctionMessageToSend) – message to send

  • is_integration (bool) – if False, the message is not actually sent

Returns:

Dict of errors returned by the server for each recipient that was refused, with the following form: {"three@three.org": (550, "User unknown")}

Return type:

dict
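
The behaviour described above resembles that of the standard library's smtplib.SMTP.send_message, which by default takes the sender from the From header (or Sender, if present), the recipients from the To, Cc and Bcc headers, and returns a dict of refused recipients. The following is a minimal sketch under that assumption, not this module's actual implementation. The names collect_recipients, send_sketch and smtp_client are hypothetical, and returning an empty dict when is_integration is False is an assumption as well.

```python
from email.message import EmailMessage
from email.utils import getaddresses
from typing import Dict, List, Tuple


def collect_recipients(msg: EmailMessage) -> List[str]:
    """Gather the addresses found in the To, Cc and Bcc headers."""
    header_values = []
    for name in ("To", "Cc", "Bcc"):
        header_values.extend(msg.get_all(name, []))
    # getaddresses returns (display_name, address) pairs.
    return [addr for _, addr in getaddresses(header_values) if addr]


def send_sketch(
    msg: EmailMessage, is_integration: bool, smtp_client
) -> Dict[str, Tuple[int, str]]:
    """Send msg through smtp_client unless is_integration is False.

    smtp_client is assumed to expose send_message(msg) with the same
    contract as smtplib.SMTP.send_message: it returns a dict mapping
    each refused recipient to an (SMTP code, server message) tuple.
    """
    if not is_integration:
        # Assumption: nothing is sent, so there are no refusals to report.
        return {}
    return smtp_client.send_message(msg)
```

With a real server, smtp_client would typically be an smtplib.SMTP instance opened in a with block. Passing the client in as an argument keeps the sketch testable without network access.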

pipeline.src.flows.notify_beacon_malfunctions.load_notifications(notifications: List[src.entities.beacon_malfunctions.BeaconMalfunctionNotification])
pipeline.src.flows.notify_beacon_malfunctions.make_reset_requested_notifications_statement(beacon_malfunctions_table: sqlalchemy.Table, notified_malfunctions: List[src.entities.beacon_malfunctions.BeaconMalfunctionToNotify]) → sqlalchemy.sql.dml.Update
pipeline.src.flows.notify_beacon_malfunctions.notify_beacon_malfunctions_flow(test_mode: bool, is_integration: bool)