shiftregister/shiftregister/importer/importer.py

114 lines
3.8 KiB
Python

from datetime import timezone
import requests
from django.conf import settings
from django.db import transaction
from icalendar import Calendar
from .models import Event, Room, Shift
# Seconds to wait for the calendar server (connect / between reads) before
# giving up; without a timeout a stalled server would block the importer forever.
_REQUEST_TIMEOUT_SECONDS = 30


def _parse_room_and_helpers(summary, location):
    """Derive ``(room_name, required_helpers)`` from an event's text fields.

    With a location, the location is the room name and the summary is either
    ``"<addition> <count>"`` (the addition is appended to the room name in
    parentheses), a bare ``"<count>"``, or free text (count falls back to 0).
    Without a location, the summary is ``"<room> <count>"`` or, if its last
    word is not an integer, the whole summary is the room name with count 0.
    """
    if location:
        try:
            # Edge case: location is defined, but the summary contains an
            # addition to the location and a count.
            addition, count = summary.rsplit(maxsplit=1)
            count = int(count)
        except ValueError:
            pass
        else:
            return f"{location} ({addition})", count
        try:
            return location, int(summary)
        except ValueError:
            return location, 0

    try:
        room, count = summary.rsplit(maxsplit=1)
        return room, int(count)
    except ValueError:
        return summary, 0


def import_calendar(calendar) -> bool:
    """Fetch the iCalendar feed at ``calendar.url`` and sync it into the database.

    Every ``VEVENT`` with a non-empty summary becomes (or updates) an ``Event``
    belonging to ``calendar``, matched by its UID.  Rooms referenced by the feed
    are created on demand.  Events of this calendar whose UID is no longer in
    the feed are flagged ``deleted = True``.  All database writes happen inside
    a single transaction.

    Returns:
        ``True`` when the import completed, ``False`` when the download failed,
        the response is not ``text/calendar``, an event lacks a UID or room, or
        any error occurred while parsing/saving.

    Raises:
        When ``settings.DEBUG`` is set, exceptions from downloading, parsing or
        saving are re-raised instead of being turned into ``False``.
    """
    try:
        response = requests.get(calendar.url, timeout=_REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
    except Exception:
        if settings.DEBUG:
            raise
        return False

    # A response without a Content-Type header is treated like a wrong one.
    content_type = response.headers.get("content-type", "")
    if not content_type.startswith("text/calendar"):
        return False

    try:
        cal = Calendar.from_ical(response.text)
        rooms = {}  # room name -> Room instance (None until resolved below)
        events = {}  # uid -> (room name, Event field values or saved Event)
        for event in cal.walk("vevent"):
            uid = event.decoded("uid").decode()
            summary = (event.decoded("summary", None) or b"").decode()
            description = (event.decoded("description", None) or b"").decode()
            start = event.decoded("dtstart").astimezone(timezone.utc)
            end = event.decoded("dtend").astimezone(timezone.utc)
            location = event.decoded("location", None)
            if not summary:
                continue
            if location is not None:
                location = location.decode()

            room, required_helpers = _parse_room_and_helpers(summary, location)
            if not uid or not room:
                return False

            rooms[room] = None
            events[uid] = (
                room,
                {
                    "start_at": start,
                    "duration": end - start,
                    "required_helpers": required_helpers,
                    "description": description,
                    "uuid": uid,
                    "calendar": calendar,
                },
            )

        with transaction.atomic():
            # Resolve room names to existing rows, then create the missing ones.
            for existing_room in Room.objects.filter(name__in=rooms):
                rooms[existing_room.name] = existing_room
            for name, room_obj in rooms.items():
                if room_obj is None:
                    # required_helpers=0 ensures a shift in a new room is not
                    # displayed unless the correct number of required helpers
                    # is set or the shift itself specifies it.
                    rooms[name] = Room(name=name, required_helpers=0)
                    rooms[name].save()

            # Update events that already exist for this calendar.
            for existing in Event.objects.filter(calendar=calendar, uuid__in=events):
                key = str(existing.uuid)
                room_name, fields = events[key]
                existing.room = rooms[room_name]
                existing.start_at = fields["start_at"]
                existing.duration = fields["duration"]
                existing.required_helpers = fields["required_helpers"]
                existing.description = fields["description"]
                existing.save()
                # Replace the field dict with the saved instance so the
                # creation pass below skips it.
                events[key] = (room_name, existing)

            # Create events that were not found in the database.
            for room_name, payload in events.values():
                if not isinstance(payload, Event):
                    Event(room=rooms[room_name], **payload).save()

            # Flag events that disappeared from the feed as deleted.
            stale_events = Event.objects.filter(calendar=calendar).exclude(
                uuid__in=events
            )
            for stale in stale_events:
                stale.deleted = True
                stale.save()
    except Exception:
        if settings.DEBUG:
            raise
        return False
    return True