mirror of
https://github.com/mediacms-io/mediacms.git
synced 2025-07-25 15:03:31 +00:00
154 lines
5.6 KiB
Python
154 lines
5.6 KiB
Python
import base64
|
|
import logging
|
|
|
|
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
|
|
from allauth.socialaccount.models import SocialApp
|
|
from allauth.socialaccount.signals import social_account_updated
|
|
from django.core.files.base import ContentFile
|
|
from django.dispatch import receiver
|
|
|
|
from identity_providers.models import IdentityProviderUserLog
|
|
from rbac.models import RBACGroup, RBACMembership
|
|
|
|
|
|
class SAMLAccountAdapter(DefaultSocialAccountAdapter):
    """Social-account adapter that provisions local users from social logins."""

    def is_open_for_signup(self, request, socialaccount):
        """Always allow sign-up through a social account."""
        return True

    def pre_social_login(self, request, sociallogin):
        """Delegate to the default adapter without any extra processing."""
        return super().pre_social_login(request, sociallogin)

    def populate_user(self, request, sociallogin, data):
        """Fill the (not yet saved) user of ``sociallogin`` from ``data``.

        The username is taken from the social account uid; ``name``,
        ``first_name`` and ``last_name`` are copied when ``data`` carries a
        truthy value for them. ``data`` is also stored on ``sociallogin`` so
        that later steps can read it back as ``sociallogin.data``.
        """
        pending_user = sociallogin.user
        pending_user.username = sociallogin.account.uid
        for field in ("name", "first_name", "last_name"):
            value = data.get(field)
            if value:
                setattr(pending_user, field, value)
        sociallogin.data = data
        # The user was not loaded from the database here, so its id is None.
        return pending_user

    def save_user(self, request, sociallogin, form=None):
        """Save the new user, then run the post-login user actions on it."""
        created_user = super().save_user(request, sociallogin, form)
        # Only reached once the new user has been created.
        perform_user_actions(created_user, sociallogin.account)
        return created_user
|
|
|
|
|
|
@receiver(social_account_updated)
def social_account_updated(sender, request, sociallogin, **kwargs):
    """Signal receiver: re-run the user actions for an existing user.

    NOTE: this function has the same name as the imported signal. The
    decorator argument is evaluated before the ``def`` binds the name, so the
    receiver is connected to the signal, and afterwards the module-level name
    refers to this function.
    """
    # ``sociallogin.data`` is the mapped attribute dict that
    # SAMLAccountAdapter.populate_user stores on the sociallogin object.
    perform_user_actions(
        sociallogin.user,
        sociallogin.account,
        sociallogin.data,
    )
|
|
|
|
|
|
def perform_user_actions(user, social_account, common_fields=None):
    """Sync ``user`` with the identity provider data and run follow-up steps.

    Args:
        user: the user object to update.
        social_account: the social account; its ``extra_data`` (the plain
            provider response) and ``provider`` are read.
        common_fields: optional dict of data already mapped to user
            attribute names; when given, changed values are written to the
            user.

    Returns:
        The same ``user`` object.
    """
    if common_fields:
        # Copy over only values that the provider sent and that differ from
        # what is stored, then persist exactly those columns.
        changed = []
        for field in ("name", "first_name", "last_name", "email"):
            incoming = common_fields.get(field)
            if incoming and incoming != getattr(user, field):
                setattr(user, field, incoming)
                changed.append(field)
        if changed:
            user.save(update_fields=changed)

    provider_response = social_account.extra_data

    # There is no FK from the social account to the social app, so the app is
    # looked up through the provider id.
    social_app = SocialApp.objects.filter(provider_id=social_account.provider).first()
    saml_configuration = social_app.saml_configurations.first() if social_app else None

    add_user_logo(user, provider_response)
    handle_role_mapping(user, provider_response, social_app, saml_configuration)
    if saml_configuration and saml_configuration.save_saml_response_logs:
        handle_saml_logs_save(user, provider_response, social_app)

    return user
|
|
|
|
|
|
def add_user_logo(user, extra_data):
    """Best-effort: set the user's logo from a base64 ``jpegPhoto`` attribute.

    The logo is only written when ``extra_data`` has a truthy ``jpegPhoto``
    entry and the user's current logo name is the default
    (``userlogos/user.jpg``), empty or ``None``. The first element of
    ``jpegPhoto`` is base64-decoded and saved as ``user.jpg``.

    Any exception is logged and swallowed.

    Returns:
        Always ``True``.
    """
    try:
        encoded_photos = extra_data.get("jpegPhoto")
        if not encoded_photos:
            return True
        if user.logo.name not in ("userlogos/user.jpg", "", None):
            # A custom logo is already in place; leave it alone.
            return True
        decoded = base64.b64decode(encoded_photos[0])
        user.logo.save('user.jpg', ContentFile(decoded), save=True)
    except Exception as e:
        logging.error(e)
    return True
|
|
|
|
|
|
def handle_role_mapping(user, extra_data, social_app, saml_configuration):
    """Apply the provider's role and group attributes to ``user``.

    Steps, in order:
      1. Read the group ids from ``extra_data`` under the key named by
         ``saml_configuration.groups`` and match them to RBAC groups of
         ``social_app``.
      2. Read the role under the key named by ``saml_configuration.role``
         (default ``"student"``; first element if it is a list). If the app
         has a global role with that name, apply its mapping to the user. If
         it has a group role with that name mapping to member / contributor /
         manager, use that as the membership role.
      3. Create or update the user's membership in every matched group.
      4. If ``saml_configuration.remove_from_groups`` is set, remove the user
         from groups of this app that were not matched.

    Errors while resolving the role or creating a membership are logged and
    do not abort the function.

    Returns:
        ``False`` when there is no SAML configuration, otherwise ``True``.
    """
    if not saml_configuration:
        return False

    membership_roles = ('member', 'contributor', 'manager')
    role = "member"

    # The configuration names the attribute that carries the group ids.
    group_ids = extra_data.get(saml_configuration.groups, [])
    matched_groups = []
    if group_ids:
        matched_groups = RBACGroup.objects.filter(identity_provider=social_app, uid__in=group_ids)

    try:
        role = extra_data.get(saml_configuration.role, "student")
        if role and isinstance(role, list):
            role = role[0]

        # Global role: mapped straight onto the user.
        global_role = social_app.global_roles.filter(name=role).first()
        if global_role:
            user.set_role_from_mapping(global_role.map_to)

        # Group role: only accepted when it maps to a known membership role.
        group_role = social_app.group_roles.filter(name=role).first()
        if group_role and group_role.map_to in membership_roles:
            role = group_role.map_to
    except Exception as e:
        logging.error(e)

    # Anything that is not a valid membership role falls back to member.
    if role not in membership_roles:
        role = 'member'

    for rbac_group in matched_groups:
        existing = RBACMembership.objects.filter(user=user, rbac_group=rbac_group).first()
        if not existing:
            try:
                RBACMembership.objects.create(user=user, rbac_group=rbac_group, role=role)
            except Exception as e:
                logging.error(e)
            continue
        if role != existing.role:
            existing.role = role
            existing.save(update_fields=["role"])

    # Optionally drop the user from groups of this app that the provider
    # response no longer lists.
    if saml_configuration.remove_from_groups:
        for current_group in user.rbac_groups.filter(identity_provider=social_app):
            if current_group in matched_groups:
                continue
            current_group.members.remove(user)

    return True
|
|
|
|
|
|
def handle_saml_logs_save(user, extra_data, social_app):
    """Store the provider response for ``user`` as an identity provider log.

    The ``jpegPhoto`` attribute, if present, is left out of the stored log.
    The log is built from a shallow copy, so the caller's ``extra_data``
    (the social account's own ``extra_data`` dict) is not modified.

    Args:
        user: the user the log entry belongs to.
        extra_data: dict with the plain response from the provider.
        social_app: the identity provider the response came from.

    Returns:
        Always ``True``.
    """
    logs = dict(extra_data)
    # Do not save jpegPhoto, if it exists.
    logs.pop("jpegPhoto", None)
    IdentityProviderUserLog.objects.create(user=user, identity_provider=social_app, logs=logs)
    return True
|