# # Copyright 2016,2017 # - Skia # - Sli # # Ce fichier fait partie du site de l'Association des Étudiants de l'UTBM, # http://ae.utbm.fr. # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License a published by the Free Software # Foundation; either version 3 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Sofware Foundation, Inc., 59 Temple # Place - Suite 330, Boston, MA 02111-1307, USA. # # from __future__ import annotations from typing import Iterable, Self from django.conf import settings from django.core import validators from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import RegexValidator, validate_email from django.db import models, transaction from django.db.models import Exists, F, OuterRef, Q from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.timezone import localdate from django.utils.translation import gettext_lazy as _ from core.models import Group, Notification, Page, SithFile, User # Create your models here. # This function prevents generating migration upon settings change def get_default_owner_group(): return settings.SITH_GROUP_ROOT_ID class Club(models.Model): """The Club class, made as a tree to allow nice tidy organization.""" id = models.AutoField(primary_key=True, db_index=True) name = models.CharField(_("name"), max_length=64) parent = models.ForeignKey( "Club", related_name="children", null=True, blank=True, on_delete=models.CASCADE ) unix_name = models.CharField( _("unix name"), max_length=30, unique=True, validators=[ validators.RegexValidator( r"^[a-z0-9][a-z0-9._-]*[a-z0-9]$", _( "Enter a valid unix name. This value may contain only " "letters, numbers ./-/_ characters." ), ) ], error_messages={"unique": _("A club with that unix name already exists.")}, ) logo = models.ImageField( upload_to="club_logos", verbose_name=_("logo"), null=True, blank=True ) is_active = models.BooleanField(_("is active"), default=True) short_description = models.CharField( _("short description"), max_length=1000, default="", blank=True, null=True ) address = models.CharField(_("address"), max_length=254) owner_group = models.ForeignKey( Group, related_name="owned_club", default=get_default_owner_group, on_delete=models.CASCADE, ) edit_groups = models.ManyToManyField( Group, related_name="editable_club", blank=True ) view_groups = models.ManyToManyField( Group, related_name="viewable_club", blank=True ) home = models.OneToOneField( SithFile, related_name="home_of_club", verbose_name=_("home"), null=True, blank=True, on_delete=models.SET_NULL, ) page = models.OneToOneField( Page, related_name="club", blank=True, null=True, on_delete=models.CASCADE ) members_group = models.OneToOneField( Group, related_name="club", on_delete=models.CASCADE ) board_group = models.OneToOneField( Group, related_name="club_board", on_delete=models.CASCADE ) class Meta: ordering = ["name", "unix_name"] def __str__(self): return self.name @transaction.atomic() def save(self, *args, **kwargs): creation = self._state.adding if not creation and Club.objects.get(id=self.id).unix_name != self.unix_name: self.home.name = self.unix_name self.home.save() if creation: self.board_group = Group.objects.create( name=f"{self.name} - Bureau", is_manually_manageable=False ) self.members_group = Group.objects.create( name=f"{self.name} - Membres", is_manually_manageable=False ) super().save(*args, **kwargs) if creation: subscribers = Group.objects.filter( name=settings.SITH_MAIN_MEMBERS_GROUP ).first() self.make_home() self.home.edit_groups.add(self.board_group) self.home.view_groups.add(self.members_group, subscribers) self.make_page() cache.set(f"sith_club_{self.unix_name}", self) def get_absolute_url(self): return reverse("club:club_view", kwargs={"club_id": self.id}) @cached_property def president(self) -> Membership | None: """Fetch the membership of the current president of this club.""" return self.members.filter( role=settings.SITH_CLUB_ROLES_ID["President"], end_date=None ).first() def check_loop(self): """Raise a validation error when a loop is found within the parent list.""" objs = [] cur = self while cur.parent is not None: if cur in objs: raise ValidationError(_("You can not make loops in clubs")) objs.append(cur) cur = cur.parent def clean(self): self.check_loop() def make_home(self) -> None: if self.home: return home_root = SithFile.objects.filter(parent=None, name="clubs").first() root = User.objects.filter(username="root").first() if home_root and root: home = SithFile(parent=home_root, name=self.unix_name, owner=root) home.save() self.home = home self.save() def make_page(self) -> None: root = User.objects.filter(username="root").first() if not self.page: club_root = Page.objects.filter(name=settings.SITH_CLUB_ROOT_PAGE).first() if root and club_root: public = Group.objects.filter(id=settings.SITH_GROUP_PUBLIC_ID).first() p = Page(name=self.unix_name) p.parent = club_root p.save(force_lock=True) if public: p.view_groups.add(public) p.save(force_lock=True) if self.parent and self.parent.page: p.parent = self.parent.page self.page = p self.save() elif self.page and self.page.name != self.unix_name: self.page.unset_lock() self.page.name = self.unix_name self.page.save(force_lock=True) elif ( self.page and self.parent and self.parent.page and self.page.parent != self.parent.page ): self.page.unset_lock() self.page.parent = self.parent.page self.page.save(force_lock=True) def delete(self, *args, **kwargs) -> tuple[int, dict[str, int]]: # Invalidate the cache of this club and of its memberships for membership in self.members.ongoing().select_related("user"): cache.delete(f"membership_{self.id}_{membership.user.id}") cache.delete(f"sith_club_{self.unix_name}") return super().delete(*args, **kwargs) def get_display_name(self) -> str: return self.name def is_owned_by(self, user: User) -> bool: """Method to see if that object can be super edited by the given user.""" if user.is_anonymous: return False return user.is_board_member def get_full_logo_url(self) -> str: return f"https://{settings.SITH_URL}{self.logo.url}" def can_be_edited_by(self, user: User) -> bool: """Method to see if that object can be edited by the given user.""" return self.has_rights_in_club(user) def can_be_viewed_by(self, user: User) -> bool: """Method to see if that object can be seen by the given user.""" return user.was_subscribed def get_membership_for(self, user: User) -> Membership | None: """Return the current membership the given user. Note: The result is cached. """ if user.is_anonymous: return None membership = cache.get(f"membership_{self.id}_{user.id}") if membership == "not_member": return None if membership is None: membership = self.members.filter(user=user, end_date=None).first() if membership is None: cache.set(f"membership_{self.id}_{user.id}", "not_member") else: cache.set(f"membership_{self.id}_{user.id}", membership) return membership def has_rights_in_club(self, user: User) -> bool: m = self.get_membership_for(user) return m is not None and m.role > settings.SITH_MAXIMUM_FREE_ROLE class MembershipQuerySet(models.QuerySet): def ongoing(self) -> Self: """Filter all memberships which are not finished yet.""" return self.filter(Q(end_date=None) | Q(end_date__gt=localdate())) def board(self) -> Self: """Filter all memberships where the user is/was in the board. Be aware that users who were in the board in the past are included, even if there are no more members. If you want to get the users who are currently in the board, mind combining this with the :meth:`ongoing` queryset method """ return self.filter(role__gt=settings.SITH_MAXIMUM_FREE_ROLE) def update(self, **kwargs) -> int: """Refresh the cache and edit group ownership. Update the cache, when necessary, remove users from club groups they are no more in and add them in the club groups they should be in. Be aware that this adds three db queries : one to retrieve the updated memberships, one to perform group removal and one to perform group attribution. """ nb_rows = super().update(**kwargs) if nb_rows == 0: # if no row was affected, no need to refresh the cache return 0 cache_memberships = {} memberships = set(self.select_related("club")) # delete all User-Group relations and recreate the necessary ones # It's more concise to write and more reliable Membership._remove_club_groups(memberships) Membership._add_club_groups(memberships) for member in memberships: cache_key = f"membership_{member.club_id}_{member.user_id}" if member.end_date is None: cache_memberships[cache_key] = member else: cache_memberships[cache_key] = "not_member" cache.set_many(cache_memberships) return nb_rows def delete(self) -> tuple[int, dict[str, int]]: """Work just like the default Django's delete() method, but add a cache invalidation for the elements of the queryset before the deletion, and a removal of the user from the club groups. Be aware that this adds some db queries : - 1 to retrieve the deleted elements in order to perform post-delete operations. As we can't know if a delete will affect rows or not, this query will always happen - 1 query to remove the users from the club groups. If the delete operation affected no row, this query won't happen. """ memberships = set(self.all()) nb_rows, rows_counts = super().delete() if nb_rows > 0: Membership._remove_club_groups(memberships) cache.set_many( { f"membership_{m.club_id}_{m.user_id}": "not_member" for m in memberships } ) return nb_rows, rows_counts class Membership(models.Model): """The Membership class makes the connection between User and Clubs. Both Users and Clubs can have many Membership objects: - a user can be a member of many clubs at a time - a club can have many members at a time too A User is currently member of all the Clubs where its Membership has an end_date set to null/None. Otherwise, it's a past membership kept because it can be very useful to see who was in which Club in the past. """ user = models.ForeignKey( User, verbose_name=_("user"), related_name="memberships", null=False, blank=False, on_delete=models.CASCADE, ) club = models.ForeignKey( Club, verbose_name=_("club"), related_name="members", null=False, blank=False, on_delete=models.CASCADE, ) start_date = models.DateField(_("start date"), default=timezone.now) end_date = models.DateField(_("end date"), null=True, blank=True) role = models.IntegerField( _("role"), choices=sorted(settings.SITH_CLUB_ROLES.items()), default=sorted(settings.SITH_CLUB_ROLES.items())[0][0], ) description = models.CharField( _("description"), max_length=128, null=False, blank=True ) objects = MembershipQuerySet.as_manager() class Meta: constraints = [ models.CheckConstraint( check=Q(end_date__gte=F("start_date")), name="end_after_start" ), ] def __str__(self): return ( f"{self.club.name} - {self.user.username} " f"- {settings.SITH_CLUB_ROLES[self.role]} " f"- {str(_('past member')) if self.end_date is not None else ''}" ) def save(self, *args, **kwargs): super().save(*args, **kwargs) # a save may either be an update or a creation # and may result in either an ongoing or an ended membership. # It could also be a retrogradation from the board to being a simple member. # To avoid problems, the user is removed from the club groups beforehand ; # he will be added back if necessary self._remove_club_groups([self]) if self.end_date is None: self._add_club_groups([self]) cache.set(f"membership_{self.club_id}_{self.user_id}", self) else: cache.set(f"membership_{self.club_id}_{self.user_id}", "not_member") def get_absolute_url(self): return reverse("club:club_members", kwargs={"club_id": self.club_id}) def is_owned_by(self, user: User) -> bool: """Method to see if that object can be super edited by the given user.""" if user.is_anonymous: return False return user.is_root or user.is_board_member def can_be_edited_by(self, user: User) -> bool: """Check if that object can be edited by the given user.""" if user.is_root or user.is_board_member: return True membership = self.club.get_membership_for(user) return membership is not None and membership.role >= self.role def delete(self, *args, **kwargs): self._remove_club_groups([self]) super().delete(*args, **kwargs) cache.delete(f"membership_{self.club_id}_{self.user_id}") @staticmethod def _remove_club_groups( memberships: Iterable[Membership], ) -> tuple[int, dict[str, int]]: """Remove users of those memberships from the club groups. For example, if a user is in the Troll club board, he is in the board group and the members group of the Troll. After calling this function, he will be in neither. Returns: The result of the deletion queryset. Warnings: If this function isn't used in combination with an actual deletion of the memberships, it will result in an inconsistent state, where users will be in the clubs, without having the associated rights. """ clubs = {m.club_id for m in memberships} users = {m.user_id for m in memberships} groups = Group.objects.filter(Q(club__in=clubs) | Q(club_board__in=clubs)) return User.groups.through.objects.filter( Q(group__in=groups) & Q(user__in=users) ).delete() @staticmethod def _add_club_groups( memberships: Iterable[Membership], ) -> list[User.groups.through]: """Add users of those memberships to the club groups. For example, if a user just joined the Troll club board, he will be added in both the members group and the board group of the club. Returns: The created User-Group relations. Warnings: If this function isn't used in combination with an actual update/creation of the memberships, it will result in an inconsistent state, where users will have the rights associated to the club, without actually being part of it. """ # only active membership (i.e. `end_date=None`) # grant the attribution of club groups. memberships = [m for m in memberships if m.end_date is None] if not memberships: return [] if sum(1 for m in memberships if not hasattr(m, "club")) > 1: # if more than one membership hasn't its `club` attribute set # it's less expensive to reload the whole query with # a select_related than perform a distinct query # to fetch each club. ids = {m.id for m in memberships} memberships = list( Membership.objects.filter(id__in=ids).select_related("club") ) club_groups = [] for membership in memberships: club_groups.append( User.groups.through( user_id=membership.user_id, group_id=membership.club.members_group_id, ) ) if membership.role > settings.SITH_MAXIMUM_FREE_ROLE: club_groups.append( User.groups.through( user_id=membership.user_id, group_id=membership.club.board_group_id, ) ) return User.groups.through.objects.bulk_create( club_groups, ignore_conflicts=True ) class Mailing(models.Model): """A Mailing list for a club. Warning: Remember that mailing lists should be validated by UTBM. """ club = models.ForeignKey( Club, verbose_name=_("Club"), related_name="mailings", null=False, blank=False, on_delete=models.CASCADE, ) email = models.CharField( _("Email address"), unique=True, null=False, blank=False, max_length=256, validators=[ RegexValidator( validate_email.user_regex, _("Enter a valid address. Only the root of the address is needed."), ) ], ) is_moderated = models.BooleanField(_("is moderated"), default=False) moderator = models.ForeignKey( User, related_name="moderated_mailings", verbose_name=_("moderator"), null=True, on_delete=models.CASCADE, ) def __str__(self): return "%s - %s" % (self.club, self.email_full) def save(self, *args, **kwargs): if not self.is_moderated: unread_notif_subquery = Notification.objects.filter( user=OuterRef("pk"), type="MAILING_MODERATION", viewed=False ) for user in User.objects.filter( ~Exists(unread_notif_subquery), groups__id__in=[settings.SITH_GROUP_COM_ADMIN_ID], ): Notification( user=user, url=reverse("com:mailing_admin"), type="MAILING_MODERATION", ).save(*args, **kwargs) super().save(*args, **kwargs) def clean(self): if Mailing.objects.filter(email=self.email).exists(): raise ValidationError(_("This mailing list already exists.")) if self.can_moderate(self.moderator): self.is_moderated = True else: self.moderator = None super().clean() @property def email_full(self): return self.email + "@" + settings.SITH_MAILING_DOMAIN def can_moderate(self, user): return user.is_root or user.is_com_admin def is_owned_by(self, user): if user.is_anonymous: return False return user.is_root or user.is_com_admin def can_view(self, user): return self.club.has_rights_in_club(user) def can_be_edited_by(self, user): return self.club.has_rights_in_club(user) def delete(self, *args, **kwargs): self.subscriptions.all().delete() super().delete() def fetch_format(self): destination = "".join(s.fetch_format() for s in self.subscriptions.all()) return f"{self.email}: {destination}" class MailingSubscription(models.Model): """Link between user and mailing list.""" mailing = models.ForeignKey( Mailing, verbose_name=_("Mailing"), related_name="subscriptions", null=False, blank=False, on_delete=models.CASCADE, ) user = models.ForeignKey( User, verbose_name=_("User"), related_name="mailing_subscriptions", null=True, blank=True, on_delete=models.CASCADE, ) email = models.EmailField(_("Email address"), blank=False, null=False) class Meta: unique_together = (("user", "email", "mailing"),) def __str__(self): return "(%s) - %s : %s" % (self.mailing, self.get_username, self.email) def clean(self): if not self.user and not self.email: raise ValidationError(_("At least user or email is required")) try: if self.user and not self.email: self.email = self.user.email if MailingSubscription.objects.filter( mailing=self.mailing, email=self.email ).exists(): raise ValidationError( _("This email is already suscribed in this mailing") ) except ObjectDoesNotExist: pass super().clean() def is_owned_by(self, user): if user.is_anonymous: return False return ( self.mailing.club.has_rights_in_club(user) or user.is_root or self.user.is_com_admin ) def can_be_edited_by(self, user): return self.user is not None and user.id == self.user.id @cached_property def get_email(self): if self.user and not self.email: return self.user.email return self.email @cached_property def get_username(self): if self.user: return str(self.user) return _("Unregistered user") def fetch_format(self): return self.get_email + " "