mirror of
https://github.com/ae-utbm/sith.git
synced 2024-12-22 15:51:19 +00:00
667 lines
23 KiB
Python
667 lines
23 KiB
Python
#
|
|
# Copyright 2016,2017
|
|
# - Skia <skia@libskia.so>
|
|
# - Sli <antoine@bartuccio.fr>
|
|
#
|
|
# Ce fichier fait partie du site de l'Association des Étudiants de l'UTBM,
|
|
# http://ae.utbm.fr.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License a published by the Free Software
|
|
# Foundation; either version 3 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Sofware Foundation, Inc., 59 Temple
|
|
# Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
#
|
|
#
|
|
from __future__ import annotations
|
|
|
|
from typing import Iterable, Self
|
|
|
|
from django.conf import settings
|
|
from django.core import validators
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
|
from django.core.validators import RegexValidator, validate_email
|
|
from django.db import models, transaction
|
|
from django.db.models import Exists, F, OuterRef, Q
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.functional import cached_property
|
|
from django.utils.timezone import localdate
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from core.models import Group, Notification, Page, SithFile, User
|
|
|
|
# Create your models here.
|
|
|
|
|
|
# This function prevents generating migration upon settings change
|
|
def get_default_owner_group():
|
|
return settings.SITH_GROUP_ROOT_ID
|
|
|
|
|
|
class Club(models.Model):
|
|
"""The Club class, made as a tree to allow nice tidy organization."""
|
|
|
|
id = models.AutoField(primary_key=True, db_index=True)
|
|
name = models.CharField(_("name"), max_length=64)
|
|
parent = models.ForeignKey(
|
|
"Club", related_name="children", null=True, blank=True, on_delete=models.CASCADE
|
|
)
|
|
unix_name = models.CharField(
|
|
_("unix name"),
|
|
max_length=30,
|
|
unique=True,
|
|
validators=[
|
|
validators.RegexValidator(
|
|
r"^[a-z0-9][a-z0-9._-]*[a-z0-9]$",
|
|
_(
|
|
"Enter a valid unix name. This value may contain only "
|
|
"letters, numbers ./-/_ characters."
|
|
),
|
|
)
|
|
],
|
|
error_messages={"unique": _("A club with that unix name already exists.")},
|
|
)
|
|
logo = models.ImageField(
|
|
upload_to="club_logos", verbose_name=_("logo"), null=True, blank=True
|
|
)
|
|
is_active = models.BooleanField(_("is active"), default=True)
|
|
short_description = models.CharField(
|
|
_("short description"), max_length=1000, default="", blank=True, null=True
|
|
)
|
|
address = models.CharField(_("address"), max_length=254)
|
|
|
|
owner_group = models.ForeignKey(
|
|
Group,
|
|
related_name="owned_club",
|
|
default=get_default_owner_group,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
edit_groups = models.ManyToManyField(
|
|
Group, related_name="editable_club", blank=True
|
|
)
|
|
view_groups = models.ManyToManyField(
|
|
Group, related_name="viewable_club", blank=True
|
|
)
|
|
home = models.OneToOneField(
|
|
SithFile,
|
|
related_name="home_of_club",
|
|
verbose_name=_("home"),
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
)
|
|
page = models.OneToOneField(
|
|
Page, related_name="club", blank=True, null=True, on_delete=models.CASCADE
|
|
)
|
|
members_group = models.OneToOneField(
|
|
Group, related_name="club", on_delete=models.CASCADE
|
|
)
|
|
board_group = models.OneToOneField(
|
|
Group, related_name="club_board", on_delete=models.CASCADE
|
|
)
|
|
|
|
class Meta:
|
|
ordering = ["name", "unix_name"]
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
@transaction.atomic()
|
|
def save(self, *args, **kwargs):
|
|
creation = self._state.adding
|
|
if not creation and Club.objects.get(id=self.id).unix_name != self.unix_name:
|
|
self.home.name = self.unix_name
|
|
self.home.save()
|
|
if creation:
|
|
self.board_group = Group.objects.create(
|
|
name=f"{self.name} - Bureau", is_manually_manageable=False
|
|
)
|
|
self.members_group = Group.objects.create(
|
|
name=f"{self.name} - Membres", is_manually_manageable=False
|
|
)
|
|
super().save(*args, **kwargs)
|
|
if creation:
|
|
subscribers = Group.objects.filter(
|
|
name=settings.SITH_MAIN_MEMBERS_GROUP
|
|
).first()
|
|
self.make_home()
|
|
self.home.edit_groups.add(self.board_group)
|
|
self.home.view_groups.add(self.members_group, subscribers)
|
|
self.make_page()
|
|
cache.set(f"sith_club_{self.unix_name}", self)
|
|
|
|
def get_absolute_url(self):
|
|
return reverse("club:club_view", kwargs={"club_id": self.id})
|
|
|
|
@cached_property
|
|
def president(self) -> Membership | None:
|
|
"""Fetch the membership of the current president of this club."""
|
|
return self.members.filter(
|
|
role=settings.SITH_CLUB_ROLES_ID["President"], end_date=None
|
|
).first()
|
|
|
|
def check_loop(self):
|
|
"""Raise a validation error when a loop is found within the parent list."""
|
|
objs = []
|
|
cur = self
|
|
while cur.parent is not None:
|
|
if cur in objs:
|
|
raise ValidationError(_("You can not make loops in clubs"))
|
|
objs.append(cur)
|
|
cur = cur.parent
|
|
|
|
def clean(self):
|
|
self.check_loop()
|
|
|
|
def make_home(self) -> None:
|
|
if self.home:
|
|
return
|
|
home_root = SithFile.objects.filter(parent=None, name="clubs").first()
|
|
root = User.objects.filter(username="root").first()
|
|
if home_root and root:
|
|
home = SithFile(parent=home_root, name=self.unix_name, owner=root)
|
|
home.save()
|
|
self.home = home
|
|
self.save()
|
|
|
|
def make_page(self) -> None:
|
|
root = User.objects.filter(username="root").first()
|
|
if not self.page:
|
|
club_root = Page.objects.filter(name=settings.SITH_CLUB_ROOT_PAGE).first()
|
|
if root and club_root:
|
|
public = Group.objects.filter(id=settings.SITH_GROUP_PUBLIC_ID).first()
|
|
p = Page(name=self.unix_name)
|
|
p.parent = club_root
|
|
p.save(force_lock=True)
|
|
if public:
|
|
p.view_groups.add(public)
|
|
p.save(force_lock=True)
|
|
if self.parent and self.parent.page:
|
|
p.parent = self.parent.page
|
|
self.page = p
|
|
self.save()
|
|
elif self.page and self.page.name != self.unix_name:
|
|
self.page.unset_lock()
|
|
self.page.name = self.unix_name
|
|
self.page.save(force_lock=True)
|
|
elif (
|
|
self.page
|
|
and self.parent
|
|
and self.parent.page
|
|
and self.page.parent != self.parent.page
|
|
):
|
|
self.page.unset_lock()
|
|
self.page.parent = self.parent.page
|
|
self.page.save(force_lock=True)
|
|
|
|
def delete(self, *args, **kwargs) -> tuple[int, dict[str, int]]:
|
|
# Invalidate the cache of this club and of its memberships
|
|
for membership in self.members.ongoing().select_related("user"):
|
|
cache.delete(f"membership_{self.id}_{membership.user.id}")
|
|
cache.delete(f"sith_club_{self.unix_name}")
|
|
return super().delete(*args, **kwargs)
|
|
|
|
def get_display_name(self) -> str:
|
|
return self.name
|
|
|
|
def is_owned_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be super edited by the given user."""
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_board_member
|
|
|
|
def get_full_logo_url(self) -> str:
|
|
return f"https://{settings.SITH_URL}{self.logo.url}"
|
|
|
|
def can_be_edited_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be edited by the given user."""
|
|
return self.has_rights_in_club(user)
|
|
|
|
def can_be_viewed_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be seen by the given user."""
|
|
return user.was_subscribed
|
|
|
|
def get_membership_for(self, user: User) -> Membership | None:
|
|
"""Return the current membership the given user.
|
|
|
|
Note:
|
|
The result is cached.
|
|
"""
|
|
if user.is_anonymous:
|
|
return None
|
|
membership = cache.get(f"membership_{self.id}_{user.id}")
|
|
if membership == "not_member":
|
|
return None
|
|
if membership is None:
|
|
membership = self.members.filter(user=user, end_date=None).first()
|
|
if membership is None:
|
|
cache.set(f"membership_{self.id}_{user.id}", "not_member")
|
|
else:
|
|
cache.set(f"membership_{self.id}_{user.id}", membership)
|
|
return membership
|
|
|
|
def has_rights_in_club(self, user: User) -> bool:
|
|
m = self.get_membership_for(user)
|
|
return m is not None and m.role > settings.SITH_MAXIMUM_FREE_ROLE
|
|
|
|
|
|
class MembershipQuerySet(models.QuerySet):
|
|
def ongoing(self) -> Self:
|
|
"""Filter all memberships which are not finished yet."""
|
|
return self.filter(Q(end_date=None) | Q(end_date__gt=localdate()))
|
|
|
|
def board(self) -> Self:
|
|
"""Filter all memberships where the user is/was in the board.
|
|
|
|
Be aware that users who were in the board in the past
|
|
are included, even if there are no more members.
|
|
|
|
If you want to get the users who are currently in the board,
|
|
mind combining this with the :meth:`ongoing` queryset method
|
|
"""
|
|
return self.filter(role__gt=settings.SITH_MAXIMUM_FREE_ROLE)
|
|
|
|
def update(self, **kwargs) -> int:
|
|
"""Refresh the cache and edit group ownership.
|
|
|
|
Update the cache, when necessary, remove
|
|
users from club groups they are no more in
|
|
and add them in the club groups they should be in.
|
|
|
|
Be aware that this adds three db queries :
|
|
one to retrieve the updated memberships,
|
|
one to perform group removal and one to perform
|
|
group attribution.
|
|
"""
|
|
nb_rows = super().update(**kwargs)
|
|
if nb_rows == 0:
|
|
# if no row was affected, no need to refresh the cache
|
|
return 0
|
|
|
|
cache_memberships = {}
|
|
memberships = set(self.select_related("club"))
|
|
# delete all User-Group relations and recreate the necessary ones
|
|
# It's more concise to write and more reliable
|
|
Membership._remove_club_groups(memberships)
|
|
Membership._add_club_groups(memberships)
|
|
for member in memberships:
|
|
cache_key = f"membership_{member.club_id}_{member.user_id}"
|
|
if member.end_date is None:
|
|
cache_memberships[cache_key] = member
|
|
else:
|
|
cache_memberships[cache_key] = "not_member"
|
|
cache.set_many(cache_memberships)
|
|
return nb_rows
|
|
|
|
def delete(self) -> tuple[int, dict[str, int]]:
|
|
"""Work just like the default Django's delete() method,
|
|
but add a cache invalidation for the elements of the queryset
|
|
before the deletion,
|
|
and a removal of the user from the club groups.
|
|
|
|
Be aware that this adds some db queries :
|
|
|
|
- 1 to retrieve the deleted elements in order to perform
|
|
post-delete operations.
|
|
As we can't know if a delete will affect rows or not,
|
|
this query will always happen
|
|
- 1 query to remove the users from the club groups.
|
|
If the delete operation affected no row,
|
|
this query won't happen.
|
|
"""
|
|
memberships = set(self.all())
|
|
nb_rows, rows_counts = super().delete()
|
|
if nb_rows > 0:
|
|
Membership._remove_club_groups(memberships)
|
|
cache.set_many(
|
|
{
|
|
f"membership_{m.club_id}_{m.user_id}": "not_member"
|
|
for m in memberships
|
|
}
|
|
)
|
|
return nb_rows, rows_counts
|
|
|
|
|
|
class Membership(models.Model):
|
|
"""The Membership class makes the connection between User and Clubs.
|
|
|
|
Both Users and Clubs can have many Membership objects:
|
|
- a user can be a member of many clubs at a time
|
|
- a club can have many members at a time too
|
|
|
|
A User is currently member of all the Clubs where its Membership has an end_date set to null/None.
|
|
Otherwise, it's a past membership kept because it can be very useful to see who was in which Club in the past.
|
|
"""
|
|
|
|
user = models.ForeignKey(
|
|
User,
|
|
verbose_name=_("user"),
|
|
related_name="memberships",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
club = models.ForeignKey(
|
|
Club,
|
|
verbose_name=_("club"),
|
|
related_name="members",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
start_date = models.DateField(_("start date"), default=timezone.now)
|
|
end_date = models.DateField(_("end date"), null=True, blank=True)
|
|
role = models.IntegerField(
|
|
_("role"),
|
|
choices=sorted(settings.SITH_CLUB_ROLES.items()),
|
|
default=sorted(settings.SITH_CLUB_ROLES.items())[0][0],
|
|
)
|
|
description = models.CharField(
|
|
_("description"), max_length=128, null=False, blank=True
|
|
)
|
|
|
|
objects = MembershipQuerySet.as_manager()
|
|
|
|
class Meta:
|
|
constraints = [
|
|
models.CheckConstraint(
|
|
check=Q(end_date__gte=F("start_date")), name="end_after_start"
|
|
),
|
|
]
|
|
|
|
def __str__(self):
|
|
return (
|
|
f"{self.club.name} - {self.user.username} "
|
|
f"- {settings.SITH_CLUB_ROLES[self.role]} "
|
|
f"- {str(_('past member')) if self.end_date is not None else ''}"
|
|
)
|
|
|
|
def save(self, *args, **kwargs):
|
|
super().save(*args, **kwargs)
|
|
# a save may either be an update or a creation
|
|
# and may result in either an ongoing or an ended membership.
|
|
# It could also be a retrogradation from the board to being a simple member.
|
|
# To avoid problems, the user is removed from the club groups beforehand ;
|
|
# he will be added back if necessary
|
|
self._remove_club_groups([self])
|
|
if self.end_date is None:
|
|
self._add_club_groups([self])
|
|
cache.set(f"membership_{self.club_id}_{self.user_id}", self)
|
|
else:
|
|
cache.set(f"membership_{self.club_id}_{self.user_id}", "not_member")
|
|
|
|
def get_absolute_url(self):
|
|
return reverse("club:club_members", kwargs={"club_id": self.club_id})
|
|
|
|
def is_owned_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be super edited by the given user."""
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_root or user.is_board_member
|
|
|
|
def can_be_edited_by(self, user: User) -> bool:
|
|
"""Check if that object can be edited by the given user."""
|
|
if user.is_root or user.is_board_member:
|
|
return True
|
|
membership = self.club.get_membership_for(user)
|
|
return membership is not None and membership.role >= self.role
|
|
|
|
def delete(self, *args, **kwargs):
|
|
self._remove_club_groups([self])
|
|
super().delete(*args, **kwargs)
|
|
cache.delete(f"membership_{self.club_id}_{self.user_id}")
|
|
|
|
@staticmethod
|
|
def _remove_club_groups(
|
|
memberships: Iterable[Membership],
|
|
) -> tuple[int, dict[str, int]]:
|
|
"""Remove users of those memberships from the club groups.
|
|
|
|
For example, if a user is in the Troll club board,
|
|
he is in the board group and the members group of the Troll.
|
|
After calling this function, he will be in neither.
|
|
|
|
Returns:
|
|
The result of the deletion queryset.
|
|
|
|
Warnings:
|
|
If this function isn't used in combination
|
|
with an actual deletion of the memberships,
|
|
it will result in an inconsistent state,
|
|
where users will be in the clubs, without
|
|
having the associated rights.
|
|
"""
|
|
clubs = {m.club_id for m in memberships}
|
|
users = {m.user_id for m in memberships}
|
|
groups = Group.objects.filter(Q(club__in=clubs) | Q(club_board__in=clubs))
|
|
return User.groups.through.objects.filter(
|
|
Q(group__in=groups) & Q(user__in=users)
|
|
).delete()
|
|
|
|
@staticmethod
|
|
def _add_club_groups(
|
|
memberships: Iterable[Membership],
|
|
) -> list[User.groups.through]:
|
|
"""Add users of those memberships to the club groups.
|
|
|
|
For example, if a user just joined the Troll club board,
|
|
he will be added in both the members group and the board group
|
|
of the club.
|
|
|
|
Returns:
|
|
The created User-Group relations.
|
|
|
|
Warnings:
|
|
If this function isn't used in combination
|
|
with an actual update/creation of the memberships,
|
|
it will result in an inconsistent state,
|
|
where users will have the rights associated to the
|
|
club, without actually being part of it.
|
|
"""
|
|
# only active membership (i.e. `end_date=None`)
|
|
# grant the attribution of club groups.
|
|
memberships = [m for m in memberships if m.end_date is None]
|
|
if not memberships:
|
|
return []
|
|
|
|
if sum(1 for m in memberships if not hasattr(m, "club")) > 1:
|
|
# if more than one membership hasn't its `club` attribute set
|
|
# it's less expensive to reload the whole query with
|
|
# a select_related than perform a distinct query
|
|
# to fetch each club.
|
|
ids = {m.id for m in memberships}
|
|
memberships = list(
|
|
Membership.objects.filter(id__in=ids).select_related("club")
|
|
)
|
|
club_groups = []
|
|
for membership in memberships:
|
|
club_groups.append(
|
|
User.groups.through(
|
|
user_id=membership.user_id,
|
|
group_id=membership.club.members_group_id,
|
|
)
|
|
)
|
|
if membership.role > settings.SITH_MAXIMUM_FREE_ROLE:
|
|
club_groups.append(
|
|
User.groups.through(
|
|
user_id=membership.user_id,
|
|
group_id=membership.club.board_group_id,
|
|
)
|
|
)
|
|
return User.groups.through.objects.bulk_create(
|
|
club_groups, ignore_conflicts=True
|
|
)
|
|
|
|
|
|
class Mailing(models.Model):
|
|
"""A Mailing list for a club.
|
|
|
|
Warning:
|
|
Remember that mailing lists should be validated by UTBM.
|
|
"""
|
|
|
|
club = models.ForeignKey(
|
|
Club,
|
|
verbose_name=_("Club"),
|
|
related_name="mailings",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
email = models.CharField(
|
|
_("Email address"),
|
|
unique=True,
|
|
null=False,
|
|
blank=False,
|
|
max_length=256,
|
|
validators=[
|
|
RegexValidator(
|
|
validate_email.user_regex,
|
|
_("Enter a valid address. Only the root of the address is needed."),
|
|
)
|
|
],
|
|
)
|
|
is_moderated = models.BooleanField(_("is moderated"), default=False)
|
|
moderator = models.ForeignKey(
|
|
User,
|
|
related_name="moderated_mailings",
|
|
verbose_name=_("moderator"),
|
|
null=True,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
|
|
def __str__(self):
|
|
return "%s - %s" % (self.club, self.email_full)
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self.is_moderated:
|
|
unread_notif_subquery = Notification.objects.filter(
|
|
user=OuterRef("pk"), type="MAILING_MODERATION", viewed=False
|
|
)
|
|
for user in User.objects.filter(
|
|
~Exists(unread_notif_subquery),
|
|
groups__id__in=[settings.SITH_GROUP_COM_ADMIN_ID],
|
|
):
|
|
Notification(
|
|
user=user,
|
|
url=reverse("com:mailing_admin"),
|
|
type="MAILING_MODERATION",
|
|
).save(*args, **kwargs)
|
|
super().save(*args, **kwargs)
|
|
|
|
def clean(self):
|
|
if Mailing.objects.filter(email=self.email).exists():
|
|
raise ValidationError(_("This mailing list already exists."))
|
|
if self.can_moderate(self.moderator):
|
|
self.is_moderated = True
|
|
else:
|
|
self.moderator = None
|
|
super().clean()
|
|
|
|
@property
|
|
def email_full(self):
|
|
return self.email + "@" + settings.SITH_MAILING_DOMAIN
|
|
|
|
def can_moderate(self, user):
|
|
return user.is_root or user.is_com_admin
|
|
|
|
def is_owned_by(self, user):
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_root or user.is_com_admin
|
|
|
|
def can_view(self, user):
|
|
return self.club.has_rights_in_club(user)
|
|
|
|
def can_be_edited_by(self, user):
|
|
return self.club.has_rights_in_club(user)
|
|
|
|
def delete(self, *args, **kwargs):
|
|
self.subscriptions.all().delete()
|
|
super().delete()
|
|
|
|
def fetch_format(self):
|
|
destination = "".join(s.fetch_format() for s in self.subscriptions.all())
|
|
return f"{self.email}: {destination}"
|
|
|
|
|
|
class MailingSubscription(models.Model):
|
|
"""Link between user and mailing list."""
|
|
|
|
mailing = models.ForeignKey(
|
|
Mailing,
|
|
verbose_name=_("Mailing"),
|
|
related_name="subscriptions",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
user = models.ForeignKey(
|
|
User,
|
|
verbose_name=_("User"),
|
|
related_name="mailing_subscriptions",
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
email = models.EmailField(_("Email address"), blank=False, null=False)
|
|
|
|
class Meta:
|
|
unique_together = (("user", "email", "mailing"),)
|
|
|
|
def __str__(self):
|
|
return "(%s) - %s : %s" % (self.mailing, self.get_username, self.email)
|
|
|
|
def clean(self):
|
|
if not self.user and not self.email:
|
|
raise ValidationError(_("At least user or email is required"))
|
|
try:
|
|
if self.user and not self.email:
|
|
self.email = self.user.email
|
|
if MailingSubscription.objects.filter(
|
|
mailing=self.mailing, email=self.email
|
|
).exists():
|
|
raise ValidationError(
|
|
_("This email is already suscribed in this mailing")
|
|
)
|
|
except ObjectDoesNotExist:
|
|
pass
|
|
super().clean()
|
|
|
|
def is_owned_by(self, user):
|
|
if user.is_anonymous:
|
|
return False
|
|
return (
|
|
self.mailing.club.has_rights_in_club(user)
|
|
or user.is_root
|
|
or self.user.is_com_admin
|
|
)
|
|
|
|
def can_be_edited_by(self, user):
|
|
return self.user is not None and user.id == self.user.id
|
|
|
|
@cached_property
|
|
def get_email(self):
|
|
if self.user and not self.email:
|
|
return self.user.email
|
|
return self.email
|
|
|
|
@cached_property
|
|
def get_username(self):
|
|
if self.user:
|
|
return str(self.user)
|
|
return _("Unregistered user")
|
|
|
|
def fetch_format(self):
|
|
return self.get_email + " "
|