shiftregister/shiftregister/importer/importer.py

125 lines
4.2 KiB
Python

from datetime import timezone
import requests
from django.conf import settings
from django.db import transaction
from dynamic_preferences.registries import global_preferences_registry
from icalendar import Calendar
from .models import Event, Room, Shift
# Module-level handle on the dynamic_preferences global manager;
# import_calendar() reads the "import__not_before" cut-off from it.
global_preferences = global_preferences_registry.manager()
# Upper bound (seconds) for fetching a calendar feed, so a stalled server
# cannot block the import indefinitely.
_REQUEST_TIMEOUT = 30


def _parse_room_and_helpers(summary, location):
    """Derive the room name and helper count of one calendar entry.

    Args:
        summary: Non-empty SUMMARY text of the entry.
        location: Stripped LOCATION text, or ``None``/``""`` when absent.

    Returns:
        A ``(room, required_helpers)`` tuple.  A trailing integer in the
        summary is taken as the helper count; if there is none the count
        is 0.
    """
    if location:
        room = location
        try:
            try:
                # Edge case: the location is set, but the summary carries
                # an addition to the location followed by a count.
                prefix, count = summary.rsplit(maxsplit=1)
                prefix = prefix.strip()
                required_helpers = int(count)
                if room != prefix:
                    room = f"{prefix} ({room})"
            except ValueError:
                # Single-word summary, or the last word is not a number:
                # the whole summary may still be just the count.
                required_helpers = int(summary)
        except ValueError:
            # No count at all; the summary only qualifies the location.
            required_helpers = 0
            if room != summary:
                room = f"{summary} ({room})"
        return room, required_helpers

    # Without a location the summary itself names the room, optionally
    # followed by the helper count.
    try:
        room, count = summary.rsplit(maxsplit=1)
        required_helpers = int(count)
    except ValueError:
        room = summary
        required_helpers = 0
    return room, required_helpers


def _collect_events(cal, calendar, not_before):
    """Walk the VEVENTs of ``cal`` and gather what has to be stored.

    Args:
        cal: Parsed icalendar component to walk.
        calendar: Calendar object the events belong to; stored on each entry.
        not_before: Cut-off compared against each event's UTC end date
            (presumably a ``datetime.date`` -- confirm against the
            preference definition).

    Returns:
        ``(rooms, events)`` where ``rooms`` maps every referenced room name
        to ``None`` and ``events`` maps each UID to ``(room_name, fields)``,
        or ``None`` if an entry has an empty UID or room name.
    """
    rooms = {}
    events = {}
    for vevent in cal.walk("vevent"):
        uid = vevent.decoded("uid").decode()
        summary = (vevent.decoded("summary", None) or b"").decode()
        description = (vevent.decoded("description", None) or b"").decode()
        # NOTE(review): all-day events decode to ``date`` objects, which have
        # no astimezone(); such an entry makes the whole import fail --
        # confirm that this is intended.
        start = vevent.decoded("dtstart").astimezone(timezone.utc)
        end = vevent.decoded("dtend").astimezone(timezone.utc)
        location = vevent.decoded("location", None)

        if not summary or end.date() < not_before:
            continue

        if location is not None:
            location = location.decode().strip()
        room, required_helpers = _parse_room_and_helpers(summary, location)

        if not uid or not room:
            return None

        rooms[room] = None
        events[uid] = (
            room,
            {
                "start_at": start,
                "duration": end - start,
                "required_helpers": required_helpers,
                "description": description,
                "uuid": uid,
                "calendar": calendar,
            },
        )
    return rooms, events


def _sync_events(calendar, rooms, events):
    """Write the collected rooms and events to the database atomically.

    Existing rooms are reused and missing ones created; events already
    stored for ``calendar`` are updated, new ones inserted, and stored
    events whose UID is not in ``events`` are flagged as deleted.
    """
    with transaction.atomic():
        for existing_room in Room.objects.filter(name__in=rooms):
            rooms[existing_room.name] = existing_room
        for name, room_obj in rooms.items():
            if room_obj is None:
                # required_helpers=0 ensures a shift in a new room is not
                # displayed unless the correct number of required helpers
                # is set or the shift itself specifies it.
                rooms[name] = Room(name=name, required_helpers=0)
                rooms[name].save()

        updated = set()
        for stored in Event.objects.filter(calendar=calendar, uuid__in=events):
            uid = str(stored.uuid)
            room, fields = events[uid]
            stored.room = rooms[room]
            stored.start_at = fields["start_at"]
            stored.duration = fields["duration"]
            stored.required_helpers = fields["required_helpers"]
            stored.description = fields["description"]
            stored.save()
            updated.add(uid)

        for uid, (room, fields) in events.items():
            if uid not in updated:
                Event(room=rooms[room], **fields).save()

        # NOTE(review): entries skipped while collecting (empty summary or
        # ending before the cut-off) are not in ``events`` either, so their
        # stored rows are flagged deleted here as well -- confirm intended.
        stale = Event.objects.filter(calendar=calendar).exclude(uuid__in=events)
        for stored in stale:
            stored.deleted = True
            stored.save()


def import_calendar(calendar):
    """Fetch the iCalendar feed of ``calendar`` and sync it into the database.

    Args:
        calendar: Object providing the feed address as ``calendar.url``;
            also used to scope the ``Event`` rows that are synced.

    Returns:
        ``True`` when the feed was imported.  ``False`` when the download
        fails, the response is not ``text/calendar``, an entry has an empty
        UID or room name, or parsing/saving raises.

    Raises:
        Exception: With ``settings.DEBUG`` enabled, errors from the download
            or from parsing/saving are re-raised instead of being turned
            into ``False``.
    """
    try:
        response = requests.get(calendar.url, timeout=_REQUEST_TIMEOUT)
        response.raise_for_status()
    except Exception:
        if settings.DEBUG:
            raise
        return False

    # A response without a Content-Type header is treated like any other
    # non-calendar response instead of raising KeyError.
    content_type = response.headers.get("content-type", "")
    if not content_type.lower().startswith("text/calendar"):
        return False

    not_before = global_preferences["import__not_before"]
    try:
        collected = _collect_events(
            Calendar.from_ical(response.text), calendar, not_before
        )
        if collected is None:
            return False
        rooms, events = collected
        _sync_events(calendar, rooms, events)
    except Exception:
        if settings.DEBUG:
            raise
        return False
    return True