56 lines
1.4 KiB
Python
56 lines
1.4 KiB
Python
from datetime import timezone
|
|
from django.conf import settings
|
|
import requests
|
|
|
|
|
|
# Root URL of the sipgate REST API (v2); endpoint paths such as "/history"
# are appended to it when building request URLs.
BASE_URL = "https://api.sipgate.com/v2"
|
|
|
|
|
|
class SMS:
    """A single SMS entry built from one item of the history response.

    Attributes:
        content: value of the item's "smsContent" key.
        created_at: value of the item's "created" key, stored unchanged.
        id: value of the item's "id" key.
        sender: value of the item's "source" key.
    """

    def __init__(self, item):
        """Copy the relevant fields out of *item*.

        Args:
            item: mapping that provides the keys "smsContent", "created",
                "id" and "source".

        Raises:
            KeyError: if one of those keys is missing from *item*.
        """
        # (attribute name, key in the item), in assignment order.
        attribute_sources = (
            ("content", "smsContent"),
            ("created_at", "created"),
            ("id", "id"),
            ("sender", "source"),
        )
        for attribute, key in attribute_sources:
            setattr(self, attribute, item[key])
|
|
|
|
|
|
def list_incoming_sms(from_date=None):
    """Fetch incoming SMS entries from the sipgate history endpoint.

    The history is requested page by page (10 entries per request) until the
    offset reaches the "totalCount" reported by the API.

    Args:
        from_date: optional ``datetime``; when given, it is converted to UTC
            and sent as the "from" query parameter. A naive datetime is
            interpreted as local time by ``datetime.astimezone``.

    Returns:
        A list of ``SMS`` objects, one per item returned by the API, in the
        order the API returned them.

    Raises:
        RuntimeError: if ``settings.SIPGATE_INCOMING_TOKEN_ID`` or
            ``settings.SIPGATE_INCOMING_TOKEN`` is not set.
        requests.HTTPError: if a request returns an error status.
        requests.Timeout: if the API does not answer within the timeout.
    """
    if not settings.SIPGATE_INCOMING_TOKEN_ID:
        raise RuntimeError("required setting SIPGATE_INCOMING_TOKEN_ID is not set")

    if not settings.SIPGATE_INCOMING_TOKEN:
        raise RuntimeError("required setting SIPGATE_INCOMING_TOKEN is not set")

    limit = 10
    params = {
        "directions": "INCOMING",
        "limit": limit,
        "types": "SMS",
    }

    if from_date is not None:
        # Drop the tzinfo after converting to UTC so isoformat() does not emit
        # "+00:00"; otherwise appending "Z" would yield "...+00:00Z".
        utc_naive = from_date.astimezone(timezone.utc).replace(tzinfo=None)
        params["from"] = utc_naive.isoformat(timespec="seconds") + "Z"

    auth = requests.auth.HTTPBasicAuth(
        settings.SIPGATE_INCOMING_TOKEN_ID, settings.SIPGATE_INCOMING_TOKEN
    )

    items = []
    offset = 0
    while True:
        response = requests.get(
            f"{BASE_URL}/history",
            auth=auth,
            params=params | {"offset": offset},
            # Without a timeout a stalled connection would block forever.
            timeout=30,
        )
        response.raise_for_status()

        data = response.json()
        items += data["items"]

        # Always fetch the first page; continue only while the API reports
        # more entries than have been requested so far.
        offset += limit
        if offset >= data["totalCount"]:
            break

    return [SMS(item) for item in items]
|