shiftregister/shiftregister/messaging/backends/sipgate.py

94 lines
2.6 KiB
Python

from datetime import datetime, timezone
import requests
from django.conf import settings
from requests.auth import HTTPBasicAuth
from ..message import Message, MessageType
from .abc import Receiver as BaseReceiver
from .abc import Sender as BaseSender
# Names exported by this backend module.
__all__ = ("Receiver", "Sender")
# Root of the sipgate REST API; the endpoint paths used below ("/history",
# "/sessions/sms") are appended to it.
BASE_URL = "https://api.sipgate.com/v2"
class Receiver(BaseReceiver):
    """Poll the sipgate ``/history`` endpoint for incoming SMS.

    The instance remembers the creation time of the newest message it has
    yielded (``from_dt``) and sends it as the ``from`` query parameter on
    later polls.
    """

    def __init__(self):
        """Build the HTTP basic-auth credentials and reset the poll cursor.

        Credentials are read from ``settings.SMS_SETTINGS`` with the
        ``settings.SIPGATE_INCOMING_*`` values as fallbacks.
        """
        self.auth = HTTPBasicAuth(
            settings.SMS_SETTINGS.get(
                "sipgate_incoming_token_id", settings.SIPGATE_INCOMING_TOKEN_ID
            ),
            settings.SMS_SETTINGS.get(
                "sipgate_incoming_token", settings.SIPGATE_INCOMING_TOKEN
            ),
        )
        # Creation time of the newest message seen so far; None until the
        # first message has been yielded.
        self.from_dt = None

    def fetch(self):
        """Yield every incoming SMS reported by the history endpoint.

        This is a generator: requests are issued lazily, page by page, and
        ``self.from_dt`` is advanced as items are consumed.

        Yields:
            Message: one ``MessageType.INBOUND`` message per history item.

        Raises:
            requests.HTTPError: if a page request returns an error status.
        """
        limit = 10
        params = {
            "directions": "INCOMING",
            "limit": limit,
            "types": "SMS",
        }
        if self.from_dt is not None:
            # Fixed: the original referenced the undefined bare name
            # ``from_dt`` here, raising NameError on every poll after the
            # first message had been recorded.
            params["from"] = (
                self.from_dt.astimezone(timezone.utc)
                .isoformat(timespec="seconds")
                .replace("+00:00", "Z")
            )
        offset = 0
        # Placeholder so the first request is always made; replaced by the
        # server-reported total after each page.
        total = 10
        while offset < total:
            r = requests.get(
                f"{BASE_URL}/history",
                auth=self.auth,
                params=params | {"offset": offset},
            )
            r.raise_for_status()
            data = r.json()
            for item in data["items"]:
                # NOTE(review): if the API returns a trailing "Z", this
                # parse requires Python 3.11+ - confirm the deployed version.
                created_at = datetime.fromisoformat(item["created"])
                # Fixed: ``max(None, created_at)`` raised TypeError for the
                # first message ever seen; handle the unset cursor explicitly.
                if self.from_dt is None or created_at > self.from_dt:
                    self.from_dt = created_at
                yield Message(
                    item["id"],
                    sender=item["source"],
                    text=item["smsContent"],
                    type=MessageType.INBOUND,
                    created_at=created_at,
                )
            offset += limit
            total = data["totalCount"]

    # NOTE(review): kept as a class-level attribute; presumably this marks
    # that the backend has no push/webhook handler - confirm against the
    # base class in .abc.
    handle = None
class Sender(BaseSender):
    """Deliver outgoing SMS by POSTing to the sipgate ``/sessions/sms`` endpoint."""

    def __init__(self):
        """Build the HTTP basic-auth credentials used for every request.

        Values come from ``settings.SMS_SETTINGS`` and fall back to
        ``settings.SIPGATE_TOKEN_ID`` / ``settings.SIPGATE_TOKEN``.
        """
        token_id = settings.SMS_SETTINGS.get(
            "sipgate_token_id", settings.SIPGATE_TOKEN_ID
        )
        token = settings.SMS_SETTINGS.get("sipgate_token", settings.SIPGATE_TOKEN)
        self.auth = HTTPBasicAuth(token_id, token)

    def send(self, messages):
        """POST each message in *messages* to the SMS session endpoint.

        Messages are sent one request at a time, in iteration order. The
        first response with an error status raises and stops the loop, so
        any remaining messages are not sent.

        Args:
            messages: iterable of objects exposing ``recipient`` and ``text``.

        Raises:
            requests.HTTPError: if a request returns an error status.
        """
        endpoint = f"{BASE_URL}/sessions/sms"
        for outgoing in messages:
            # The extension is looked up per message, exactly as before, so
            # an empty iterable never touches the settings.
            payload = {
                "smsId": settings.SMS_SETTINGS.get(
                    "sipgate_sms_extension", settings.SIPGATE_SMS_EXTENSION
                ),
                "recipient": outgoing.recipient,
                "message": outgoing.text,
            }
            response = requests.post(endpoint, auth=self.auth, json=payload)
            response.raise_for_status()