mirror of
https://github.com/mediacms-io/mediacms.git
synced 2025-07-25 15:03:31 +00:00
97 lines
3.8 KiB
Python
97 lines
3.8 KiB
Python
from allauth.socialaccount.models import SocialApp
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.db.models.signals import m2m_changed
|
|
from django.dispatch import receiver
|
|
from django.utils.crypto import get_random_string
|
|
|
|
|
|
def generate_uid():
|
|
return get_random_string(length=10)
|
|
|
|
|
|
class RBACGroup(models.Model):
|
|
uid = models.CharField(max_length=255, default=generate_uid, help_text='Unique identifier for the RBAC group (unique per identity provider)')
|
|
name = models.CharField(max_length=100, help_text='MediaCMS Group name')
|
|
description = models.TextField(blank=True)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
# access to members through the membership model
|
|
members = models.ManyToManyField("users.User", through='RBACMembership', through_fields=('rbac_group', 'user'), related_name='rbac_groups')
|
|
|
|
categories = models.ManyToManyField('files.Category', related_name='rbac_groups', blank=True, help_text='Categories this RBAC group has access to')
|
|
|
|
identity_provider = models.ForeignKey(SocialApp, on_delete=models.SET_NULL, null=True, blank=True, related_name='rbac_groups', verbose_name='IDP Config Name')
|
|
|
|
def __str__(self):
|
|
name = f"{self.name}"
|
|
if self.identity_provider:
|
|
name = f"{name} for {self.identity_provider}"
|
|
return name
|
|
|
|
class Meta:
|
|
verbose_name = 'RBAC Group'
|
|
verbose_name_plural = 'RBAC Groups'
|
|
unique_together = [['uid', 'identity_provider'], ['name', 'identity_provider']]
|
|
|
|
|
|
class RBACRole(models.TextChoices):
|
|
MEMBER = 'member', 'Member'
|
|
CONTRIBUTOR = 'contributor', 'Contributor'
|
|
MANAGER = 'manager', 'Manager'
|
|
|
|
|
|
class RBACMembership(models.Model):
|
|
user = models.ForeignKey("users.User", on_delete=models.CASCADE, related_name='rbac_memberships')
|
|
rbac_group = models.ForeignKey(RBACGroup, on_delete=models.CASCADE, related_name='memberships')
|
|
role = models.CharField(max_length=20, choices=RBACRole.choices, default=RBACRole.MEMBER)
|
|
joined_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
class Meta:
|
|
unique_together = ['user', 'rbac_group', 'role']
|
|
verbose_name = 'RBAC Membership'
|
|
verbose_name_plural = 'RBAC Memberships'
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
return True
|
|
|
|
def save(self, *args, **kwargs):
|
|
self.full_clean()
|
|
super().save(*args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return f'{self.user.username} - {self.rbac_group.name} ({self.role})'
|
|
|
|
|
|
@receiver(m2m_changed, sender=RBACGroup.categories.through)
|
|
def handle_rbac_group_categories_change(sender, instance, action, pk_set, **kwargs):
|
|
"""
|
|
Signal handler for when categories are added to or removed from an RBACGroup.
|
|
"""
|
|
if not getattr(settings, 'USE_IDENTITY_PROVIDERS', False):
|
|
return
|
|
|
|
from files.models import Category
|
|
from identity_providers.models import IdentityProviderCategoryMapping
|
|
|
|
if action == 'post_add':
|
|
if not instance.identity_provider:
|
|
return
|
|
# the following apply only if identity_provider is there
|
|
for category_id in pk_set:
|
|
category = Category.objects.get(pk=category_id)
|
|
|
|
mapping_exists = IdentityProviderCategoryMapping.objects.filter(identity_provider=instance.identity_provider, name=instance.uid, map_to=category).exists()
|
|
|
|
if not mapping_exists:
|
|
IdentityProviderCategoryMapping.objects.create(identity_provider=instance.identity_provider, name=instance.uid, map_to=category)
|
|
|
|
elif action == 'post_remove':
|
|
for category_id in pk_set:
|
|
category = Category.objects.get(pk=category_id)
|
|
|
|
IdentityProviderCategoryMapping.objects.filter(identity_provider=instance.identity_provider, name=instance.uid, map_to=category).delete()
|