shiftregister/shiftregister/fallback/models.py

208 lines
8.3 KiB
Python

import math
import secrets
from base64 import urlsafe_b64encode
from datetime import datetime, time
from random import random
import sentry_sdk
from django.db.models import Count, Exists, ExpressionWrapper, Max, OuterRef, Sum
from django.db.models.fields import DateTimeField
from django.db.models.lookups import LessThan
from django.utils import timezone
from shiftregister.importer.models import *
def generate_id():
    """Return a random integer in ``[0, 2**24)``.

    Three bytes are drawn from the ``secrets`` module and interpreted as a
    big-endian unsigned integer.
    """
    raw = secrets.token_bytes(3)
    return int.from_bytes(raw, "big")
class TeamMember(models.Model):
    """A team member who can be given fallback shifts.

    Fallback shifts are linked through the ``FallbackAssignment`` model.
    """

    # Random primary key produced by generate_id(); url() encodes it as
    # exactly 3 big-endian bytes.
    id = models.IntegerField(default=generate_id, editable=False, primary_key=True)
    name = models.CharField(max_length=100)
    comment = models.CharField(max_length=100, default="")
    fallback_shifts = models.ManyToManyField(Shift, through="FallbackAssignment")

    def url(self):
        """Return the absolute URL of this member's fallback shift page.

        The id is passed to the ``my_fallback_shifts`` route as the URL-safe
        base64 encoding of its 3-byte big-endian representation.
        """
        encoded_id = urlsafe_b64encode(self.id.to_bytes(3, byteorder="big")).decode(
            "utf-8"
        )
        return "https://helfen.kntkt.de" + reverse(
            "my_fallback_shifts",
            kwargs={"team_member_id": encoded_id},
        )

    def assign_random_shifts(self):
        """Assign an initial set of fallback shifts to this member.

        Does nothing if the member already has fallback shifts. Otherwise the
        events needing fallback are split into up to three buckets and shifts
        are assigned from each bucket in turn:

        * day shifts (start hour between 11 and 20),
        * night shifts (start hour >= 21 or <= 10),
        * everything starting at or after 20:00 on the date of the latest
          event that needs fallback and starts at hour 20 or later.

        If no such evening event exists, the third bucket is skipped.
        """
        # Check the guard first so members that are already set up cost a
        # single query and never hit the event lookup below.
        if self.fallback_shifts.count() != 0:
            return

        needs_fallback = Q(deleted=False, calendar__needs_fallback=True)
        is_night_shift = Q(start_at__hour__gte=21) | Q(start_at__hour__lte=10)

        # first() returns None instead of raising DoesNotExist when there is
        # no evening event at all.
        latest_evening_event = (
            Event.objects.filter(needs_fallback, start_at__hour__gte=20)
            .order_by("-start_at")
            .first()
        )

        if latest_evening_event is None:
            bucket_selectors = [
                ~is_night_shift & needs_fallback,
                is_night_shift & needs_fallback,
            ]
        else:
            current_tz = timezone.get_current_timezone()
            local_start = latest_evening_event.start_at.astimezone(current_tz)
            # 20:00 local time on the date of the latest evening event.
            # make_aware() attaches the zone correctly for every tzinfo
            # implementation, unlike passing tzinfo straight to combine().
            last_night = timezone.make_aware(
                datetime.combine(local_start.date(), time(hour=20)),
                current_tz,
            )
            is_last_night = Q(start_at__gte=last_night)
            bucket_selectors = [
                ~is_night_shift & ~is_last_night & needs_fallback,
                is_night_shift & ~is_last_night & needs_fallback,
                is_last_night & needs_fallback,
            ]

        for bucket_selector in bucket_selectors:
            self._assign_from_bucket(bucket_selector)

    @staticmethod
    def _not_overlapping_q(shift):
        """Return a Q matching rows whose time span does not overlap ``shift``.

        The target queryset must expose ``start_at`` and ``end_at``.
        """
        shift_end = shift.start_at + shift.duration
        return Q(start_at__gte=shift_end) | Q(end_at__lte=shift.start_at)

    def _assign_from_bucket(self, bucket_selector):
        """Give this member their share of the shifts in one bucket.

        The number of shifts is the smaller of two per-member averages over
        the bucket's total helper slots: one based on all team members scaled
        by the ``helper__fallback_quota`` preference, one based on the members
        that already have fallback shifts plus this one. The fractional part
        is turned into one extra shift with matching probability.

        Shifts are first taken from events that still have free slots. If
        that is not enough, assignments are taken over from other members,
        preferring those with the most shifts in this bucket.

        Args:
            bucket_selector: ``Q`` expression on ``Event`` selecting the bucket.
        """
        bucket = Event.objects.filter(bucket_selector).annotate(
            fallback_count=Count("fallbackassignment"),
            # Events with required_helpers == 0 use their room's value instead.
            real_required_helpers=Case(
                When(required_helpers=0, then=F("room__required_helpers")),
                default=F("required_helpers"),
            ),
        )
        free_bucket = bucket.filter(fallback_count__lt=F("real_required_helpers"))

        # Sum() aggregates to None on an empty queryset; treat that as zero
        # slots instead of crashing on the divisions below.
        total_slot_count = (
            bucket.aggregate(sum=Sum("real_required_helpers"))["sum"] or 0
        )
        if not total_slot_count:
            # Nothing to distribute in this bucket.
            return

        quota = global_preferences["helper__fallback_quota"]
        number_of_team_members = TeamMember.objects.count()
        max_shifts_per_member = total_slot_count / max(
            number_of_team_members * quota, 1
        )
        # +1 accounts for the member currently being set up.
        active_team_members = (
            TeamMember.objects.filter(~Q(fallback_shifts=None)).count() + 1
        )
        shifts_per_member = total_slot_count / active_team_members

        extra_chance, whole_shifts = math.modf(
            min(max_shifts_per_member, shifts_per_member)
        )
        shift_count = int(whole_shifts)
        # Randomized rounding: the fractional part is the chance of one more.
        if extra_chance > random():
            shift_count += 1

        # Time spans already taken by this member; every new shift must
        # satisfy all of these conditions.
        blocked_times = [
            self._not_overlapping_q(shift) for shift in self.fallback_shifts.all()
        ]

        # Easy part: take shifts that still have free slots.
        # The queryset is lazy, so every .filter() below issues a fresh query.
        free_with_end = free_bucket.annotate(
            end_at=ExpressionWrapper(
                F("start_at") + F("duration"),
                output_field=models.DateTimeField(),
            )
        )
        assigned_shift_count = 0
        for _ in range(shift_count):
            shift = free_with_end.filter(*blocked_times).order_by("?").first()
            if shift is None:
                break
            self.fallback_shifts.add(shift)
            assigned_shift_count += 1
            blocked_times.append(self._not_overlapping_q(shift))

        # Free slots alone may not be enough: a member cannot take several
        # slots of the same shift, so shifts with multiple open slots can
        # leave this member short.
        shifts_needed = shift_count - assigned_shift_count
        if shifts_needed <= 0:
            return

        # This path is rare, so readability wins over efficiency: for each
        # missing shift, walk the other members starting with whoever holds
        # the most shifts in this bucket and take over one of their
        # assignments at random, skipping shifts this member already has.
        #
        # Subqueries on ids are used so the Event-based selector can be
        # reused without rewriting its field prefixes.
        candidate_shift_ids = Event.objects.filter(bucket_selector).values(
            "shift_ptr_id"
        )
        own_shift_ids = FallbackAssignment.objects.filter(
            team_member_id=self.pk
        ).values("shift_id")
        assignments_with_times = FallbackAssignment.objects.annotate(
            start_at=F("shift__start_at"),
            end_at=ExpressionWrapper(
                F("shift__start_at") + F("shift__duration"),
                output_field=models.DateTimeField(),
            ),
        )

        while shifts_needed > 0:
            # Built inside the loop on purpose: the counts change after every
            # takeover, so the ranking has to be re-evaluated each time.
            sorted_members = (
                TeamMember.objects.annotate(
                    relevant_fallback_count=Count(
                        "fallback_shifts",
                        distinct=True,
                        filter=Q(fallback_shifts__id__in=candidate_shift_ids),
                    ),
                    overall_fallback_count=Count("fallback_shifts"),
                )
                .exclude(pk=self.pk)
                .order_by("-relevant_fallback_count", "-overall_fallback_count", "?")
            )

            assignment = None
            for member in sorted_members:
                assignment = (
                    assignments_with_times.filter(
                        team_member_id=member.id,
                        shift_id__in=candidate_shift_ids,
                    )
                    .exclude(shift_id__in=own_shift_ids)
                    .filter(*blocked_times)
                    .order_by("?")
                    .first()
                )
                if assignment is not None:
                    break

            if assignment is None:
                print("could not find any matching assignments to take away")
                sentry_sdk.capture_message(
                    "could not find any matching assignments to take away"
                )
                return

            shifts_needed -= 1
            blocked_times.append(self._not_overlapping_q(assignment.shift))
            # Give the shift to this member, then remove it from the donor.
            self.fallback_shifts.add(assignment.shift)
            assignment.delete()

    def __str__(self):
        """Return the name, followed by ``: <comment>`` when a comment is set."""
        if self.comment:
            return f"{self.name}: {self.comment}"
        return f"{self.name}"
class FallbackAssignment(models.Model):
    """Through model linking a ``TeamMember`` to one of their fallback shifts."""

    shift = models.ForeignKey(Shift, on_delete=models.CASCADE)
    team_member = models.ForeignKey(TeamMember, on_delete=models.CASCADE)
    # NOTE(review): nothing visible in this file reads or writes this flag;
    # confirm its meaning against the callers.
    was_full = models.BooleanField(default=False)

    def __str__(self):
        """Return the shift's string form followed by the member's name."""
        return "{} {}".format(self.shift, self.team_member.name)