64 lines
2.0 KiB
Python
64 lines
2.0 KiB
Python
from django.conf import settings
|
|
from django.contrib.auth.models import Permission
|
|
from django.db import transaction
|
|
from django.db.models import CharField
|
|
from django.db.models import Value as V
|
|
from django.db.models.functions import Concat
|
|
from mozilla_django_oidc.auth import (
|
|
OIDCAuthenticationBackend as BaseOIDCAuthenticationBackend,
|
|
)
|
|
|
|
from .models import OIDCUser
|
|
|
|
|
|
def get_permissions(claims):
    """Return the permissions named by this client's roles in the OIDC claims.

    The roles are read from
    ``claims["resource_access"][settings.OIDC_RP_CLIENT_ID]["roles"]`` and are
    matched against permissions rendered as ``"<app_label>.<codename>"``.

    Args:
        claims: Mapping of claims received from the OIDC provider.

    Returns:
        A ``Permission`` queryset. It is empty when the claims carry no
        ``resource_access`` entry for this client, or when that entry has no
        roles.
    """
    resource_access = claims.get("resource_access")
    if resource_access is None or settings.OIDC_RP_CLIENT_ID not in resource_access:
        return Permission.objects.none()

    # The client entry may be null or lack a "roles" key; treat both as
    # "no roles" instead of failing with TypeError/KeyError.
    client_access = resource_access[settings.OIDC_RP_CLIENT_ID] or {}
    role_names = client_access.get("roles") or []
    if not role_names:
        return Permission.objects.none()

    # Build "<app_label>.<codename>" in the database so the role names can be
    # compared against it with a single IN filter.
    return Permission.objects.annotate(
        fullname=Concat(
            "content_type__app_label", V("."), "codename", output_field=CharField()
        )
    ).filter(fullname__in=role_names)
|
|
|
|
|
|
class OIDCAuthenticationBackend(BaseOIDCAuthenticationBackend):
    """OIDC backend that links local users to provider identities via ``OIDCUser``.

    Users are looked up by the ``sub`` claim through ``OIDCUser`` rows, and
    their direct permissions are replaced with the result of
    ``get_permissions(claims)`` whenever a user is created or updated.
    """

    @transaction.atomic
    def create_user(self, claims):
        """Create a local user and its ``OIDCUser`` link from *claims*.

        Runs in a transaction so the user, its permissions and the
        ``OIDCUser`` row are written together or not at all.

        Args:
            claims: Mapping of claims received from the OIDC provider.

        Returns:
            The newly created user.
        """
        user = self.UserModel.objects.create_user(
            claims.get("preferred_username"), claims.get("email")
        )
        # ``or ""`` rather than a ``.get`` default: a claim that is present
        # but null would otherwise assign None to the name fields.
        user.first_name = claims.get("given_name") or ""
        user.last_name = claims.get("family_name") or ""
        user.user_permissions.set(get_permissions(claims))
        user.save()

        OIDCUser.objects.create(uuid=claims.get("sub"), user=user)

        return user

    @transaction.atomic
    def update_user(self, user, claims):
        """Refresh an existing user's email and permissions from *claims*.

        Runs in a transaction, like ``create_user``, so the permission
        replacement and the save are committed together.

        Args:
            user: The existing local user.
            claims: Mapping of claims received from the OIDC provider.

        Returns:
            The same user instance, after saving.
        """
        # Keep the stored email when the claims do not provide one.
        user.email = claims.get("email") or user.email
        user.user_permissions.set(get_permissions(claims))
        user.save()

        return user

    def filter_users_by_claims(self, claims):
        """Return the users linked to the ``sub`` claim.

        Args:
            claims: Mapping of claims received from the OIDC provider.

        Returns:
            A one-element list holding the linked user, or an empty queryset
            when ``sub`` is missing/empty or no ``OIDCUser`` row matches it.
        """
        uuid = claims.get("sub")
        if not uuid:
            return self.UserModel.objects.none()

        try:
            oidc_user = OIDCUser.objects.get(uuid=uuid)
        except OIDCUser.DoesNotExist:
            return self.UserModel.objects.none()

        return [oidc_user.user]

    def verify_claims(self, claims):
        """Return whether the claims contain a ``resource_access`` entry for this client.

        Args:
            claims: Mapping of claims received from the OIDC provider.

        Returns:
            ``True`` when ``resource_access`` is present and contains
            ``settings.OIDC_RP_CLIENT_ID``; ``False`` otherwise.
        """
        resource_access = claims.get("resource_access")
        return (
            resource_access is not None
            and settings.OIDC_RP_CLIENT_ID in resource_access
        )
|