feat(messaging): add seven.io backend

This commit is contained in:
Luca 2025-03-11 01:45:19 +01:00
parent b8dcb02ef1
commit db3926316c
1 changed files with 108 additions and 0 deletions

View File

@ -0,0 +1,108 @@
from datetime import datetime, timezone
import requests
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.timezone import now
from ..exceptions import OutboundMessageError
from ..message import Message, MessageType
from .abc import Receiver as BaseReceiver
from .abc import Sender as BaseSender
__all__ = ("Receiver", "Sender")
BASE_URL = "https://gateway.seven.io/api"
RESPONSE_CODES = {
"100": "Sent",
"101": "Message could not be sent",
"201": "Invalid sender",
"202": "Invalid recipient",
"301": "Missing 'to' parameter",
"305": "Invalid 'text' parameter",
"401": "Parameter 'text' too long",
"402": "Message has already been sent",
"403": "Exceeded daily limit for recipient",
"500": "Insufficient balance",
"600": "Error while sending message",
"900": "Authentication failed",
"901": "Signature verification failed",
"902": "Access denied",
"903": "Request IP is not in allow list",
}
class Receiver(BaseReceiver):
fetch = None
def handle(self, data, webhook_event, webhook_timestamp, **kwargs):
if webhook_event != "sms_mo":
return
yield Message(
data["id"],
recipient=data["system"],
sender=data["sender"],
text=data["text"],
type=MessageType.INBOUND,
created_at=datetime.fromtimestamp(data["time"], timezone.utc),
)
class Sender(BaseSender):
def __init__(self):
for setting in ("sevenio_api_key", "sevenio_sender"):
try:
settings.SMS_SETTINGS[setting]
except KeyError:
raise ImproperlyConfigured(
f"'{setting}' must be set in SMS_SETTINGS for seven.io backend"
)
sender = settings.SMS_SETTINGS["sevenio_sender"]
sender_is_alnum = sender.isalnum()
sender_is_num = sender.isnumeric()
if (
not sender_is_alnum
or len(sender) > 16
or not sender_is_num
and len(sender) > 11
):
raise ImproperlyConfigured("invalid 'sevenio_sender' in SMS_SETTINGS")
self.client = requests.Session()
self.client.headers["Accept"] = "application/json"
self.client.headers["X-Api-Key"] = settings.SMS_SETTINGS["sevenio_api_key"]
self.sender = sender
def send(self, messages):
for message in messages:
data = {
"from": self.sender,
"label": message.key,
"text": message.text,
"to": message.recipient,
}
r = self.client.post(f"{BASE_URL}/sms", data=data)
r.raise_for_status()
resp = r.json()
code = resp["success"]
if code != "100":
raise OutboundMessageError(
RESPONSE_CODES.get(code, f"Unknown error: {code}")
)
for m in resp["messages"]:
if m["success"]:
yield Message(
m["label"],
recipient=m["recipient"],
sender=m["sender"],
text=m["text"],
type=MessageType.OUTBOUND,
created_at=now(),
)