mirror of
https://github.com/ae-utbm/sith.git
synced 2024-12-23 00:01:16 +00:00
589 lines
20 KiB
Python
589 lines
20 KiB
Python
#
|
|
# Copyright 2016,2017
|
|
# - Skia <skia@libskia.so>
|
|
# - Sli <antoine@bartuccio.fr>
|
|
#
|
|
# Ce fichier fait partie du site de l'Association des Étudiants de l'UTBM,
|
|
# http://ae.utbm.fr.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License a published by the Free Software
|
|
# Foundation; either version 3 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Sofware Foundation, Inc., 59 Temple
|
|
# Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
#
|
|
#
|
|
from __future__ import annotations
|
|
|
|
from typing import Self
|
|
|
|
from django.conf import settings
|
|
from django.core import validators
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
|
from django.core.validators import RegexValidator, validate_email
|
|
from django.db import models, transaction
|
|
from django.db.models import Exists, OuterRef, Q
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.functional import cached_property
|
|
from django.utils.timezone import localdate
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from core.models import Group, Notification, Page, SithFile, User
|
|
|
|
# Create your models here.
|
|
|
|
|
|
# This function prevents generating migration upon settings change
|
|
def get_default_owner_group():
|
|
return settings.SITH_GROUP_ROOT_ID
|
|
|
|
|
|
class Club(models.Model):
|
|
"""The Club class, made as a tree to allow nice tidy organization."""
|
|
|
|
id = models.AutoField(primary_key=True, db_index=True)
|
|
name = models.CharField(_("name"), max_length=64)
|
|
parent = models.ForeignKey(
|
|
"Club", related_name="children", null=True, blank=True, on_delete=models.CASCADE
|
|
)
|
|
unix_name = models.CharField(
|
|
_("unix name"),
|
|
max_length=30,
|
|
unique=True,
|
|
validators=[
|
|
validators.RegexValidator(
|
|
r"^[a-z0-9][a-z0-9._-]*[a-z0-9]$",
|
|
_(
|
|
"Enter a valid unix name. This value may contain only "
|
|
"letters, numbers ./-/_ characters."
|
|
),
|
|
)
|
|
],
|
|
error_messages={"unique": _("A club with that unix name already exists.")},
|
|
)
|
|
logo = models.ImageField(
|
|
upload_to="club_logos", verbose_name=_("logo"), null=True, blank=True
|
|
)
|
|
is_active = models.BooleanField(_("is active"), default=True)
|
|
short_description = models.CharField(
|
|
_("short description"), max_length=1000, default="", blank=True, null=True
|
|
)
|
|
address = models.CharField(_("address"), max_length=254)
|
|
|
|
owner_group = models.ForeignKey(
|
|
Group,
|
|
related_name="owned_club",
|
|
default=get_default_owner_group,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
edit_groups = models.ManyToManyField(
|
|
Group, related_name="editable_club", blank=True
|
|
)
|
|
view_groups = models.ManyToManyField(
|
|
Group, related_name="viewable_club", blank=True
|
|
)
|
|
home = models.OneToOneField(
|
|
SithFile,
|
|
related_name="home_of_club",
|
|
verbose_name=_("home"),
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
)
|
|
page = models.OneToOneField(
|
|
Page, related_name="club", blank=True, null=True, on_delete=models.CASCADE
|
|
)
|
|
members_group = models.OneToOneField(
|
|
Group, related_name="club", on_delete=models.CASCADE
|
|
)
|
|
board_group = models.OneToOneField(
|
|
Group, related_name="club_board", on_delete=models.CASCADE
|
|
)
|
|
|
|
class Meta:
|
|
ordering = ["name", "unix_name"]
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
@transaction.atomic()
|
|
def save(self, *args, **kwargs):
|
|
creation = self._state.adding
|
|
if not creation and Club.objects.get(id=self.id).unix_name != self.unix_name:
|
|
self.home.name = self.unix_name
|
|
self.home.save()
|
|
if creation:
|
|
self.board_group = Group.objects.create(
|
|
name=f"{self.name} - Bureau", is_manually_manageable=False
|
|
)
|
|
self.members_group = Group.objects.create(
|
|
name=f"{self.name} - Membres", is_manually_manageable=False
|
|
)
|
|
super().save(*args, **kwargs)
|
|
if creation:
|
|
subscribers = Group.objects.filter(
|
|
name=settings.SITH_MAIN_MEMBERS_GROUP
|
|
).first()
|
|
self.make_home()
|
|
self.home.edit_groups.add(self.board_group)
|
|
self.home.view_groups.add(self.members_group, subscribers)
|
|
self.make_page()
|
|
cache.set(f"sith_club_{self.unix_name}", self)
|
|
|
|
def get_absolute_url(self):
|
|
return reverse("club:club_view", kwargs={"club_id": self.id})
|
|
|
|
@cached_property
|
|
def president(self) -> Membership | None:
|
|
"""Fetch the membership of the current president of this club."""
|
|
return self.members.filter(
|
|
role=settings.SITH_CLUB_ROLES_ID["President"], end_date=None
|
|
).first()
|
|
|
|
def check_loop(self):
|
|
"""Raise a validation error when a loop is found within the parent list."""
|
|
objs = []
|
|
cur = self
|
|
while cur.parent is not None:
|
|
if cur in objs:
|
|
raise ValidationError(_("You can not make loops in clubs"))
|
|
objs.append(cur)
|
|
cur = cur.parent
|
|
|
|
def clean(self):
|
|
self.check_loop()
|
|
|
|
def make_home(self) -> None:
|
|
if self.home:
|
|
return
|
|
home_root = SithFile.objects.filter(parent=None, name="clubs").first()
|
|
root = User.objects.filter(username="root").first()
|
|
if home_root and root:
|
|
home = SithFile(parent=home_root, name=self.unix_name, owner=root)
|
|
home.save()
|
|
self.home = home
|
|
self.save()
|
|
|
|
def make_page(self) -> None:
|
|
root = User.objects.filter(username="root").first()
|
|
if not self.page:
|
|
club_root = Page.objects.filter(name=settings.SITH_CLUB_ROOT_PAGE).first()
|
|
if root and club_root:
|
|
public = Group.objects.filter(id=settings.SITH_GROUP_PUBLIC_ID).first()
|
|
p = Page(name=self.unix_name)
|
|
p.parent = club_root
|
|
p.save(force_lock=True)
|
|
if public:
|
|
p.view_groups.add(public)
|
|
p.save(force_lock=True)
|
|
if self.parent and self.parent.page:
|
|
p.parent = self.parent.page
|
|
self.page = p
|
|
self.save()
|
|
elif self.page and self.page.name != self.unix_name:
|
|
self.page.unset_lock()
|
|
self.page.name = self.unix_name
|
|
self.page.save(force_lock=True)
|
|
elif (
|
|
self.page
|
|
and self.parent
|
|
and self.parent.page
|
|
and self.page.parent != self.parent.page
|
|
):
|
|
self.page.unset_lock()
|
|
self.page.parent = self.parent.page
|
|
self.page.save(force_lock=True)
|
|
|
|
def delete(self, *args, **kwargs) -> tuple[int, dict[str, int]]:
|
|
# Invalidate the cache of this club and of its memberships
|
|
for membership in self.members.ongoing().select_related("user"):
|
|
cache.delete(f"membership_{self.id}_{membership.user.id}")
|
|
cache.delete(f"sith_club_{self.unix_name}")
|
|
return super().delete(*args, **kwargs)
|
|
|
|
def get_display_name(self) -> str:
|
|
return self.name
|
|
|
|
def is_owned_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be super edited by the given user."""
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_board_member
|
|
|
|
def get_full_logo_url(self) -> str:
|
|
return f"https://{settings.SITH_URL}{self.logo.url}"
|
|
|
|
def can_be_edited_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be edited by the given user."""
|
|
return self.has_rights_in_club(user)
|
|
|
|
def can_be_viewed_by(self, user: User) -> bool:
|
|
"""Method to see if that object can be seen by the given user."""
|
|
return user.was_subscribed
|
|
|
|
def get_membership_for(self, user: User) -> Membership | None:
|
|
"""Return the current membership the given user.
|
|
|
|
Note:
|
|
The result is cached.
|
|
"""
|
|
if user.is_anonymous:
|
|
return None
|
|
membership = cache.get(f"membership_{self.id}_{user.id}")
|
|
if membership == "not_member":
|
|
return None
|
|
if membership is None:
|
|
membership = self.members.filter(user=user, end_date=None).first()
|
|
if membership is None:
|
|
cache.set(f"membership_{self.id}_{user.id}", "not_member")
|
|
else:
|
|
cache.set(f"membership_{self.id}_{user.id}", membership)
|
|
return membership
|
|
|
|
def has_rights_in_club(self, user: User) -> bool:
|
|
m = self.get_membership_for(user)
|
|
return m is not None and m.role > settings.SITH_MAXIMUM_FREE_ROLE
|
|
|
|
|
|
class MembershipQuerySet(models.QuerySet):
|
|
@staticmethod
|
|
def _remove_from_club_groups(users: list[int], clubs: list[int]):
|
|
groups = Group.objects.filter(Q(club__in=clubs) | Q(club_board__in=clubs))
|
|
return User.groups.through.objects.filter(
|
|
Q(group__in=groups) & Q(user__in=users)
|
|
).delete()
|
|
|
|
def ongoing(self) -> Self:
|
|
"""Filter all memberships which are not finished yet."""
|
|
return self.filter(Q(end_date=None) | Q(end_date__gt=localdate()))
|
|
|
|
def board(self) -> Self:
|
|
"""Filter all memberships where the user is/was in the board.
|
|
|
|
Be aware that users who were in the board in the past
|
|
are included, even if there are no more members.
|
|
|
|
If you want to get the users who are currently in the board,
|
|
mind combining this with the :meth:`ongoing` queryset method
|
|
"""
|
|
return self.filter(role__gt=settings.SITH_MAXIMUM_FREE_ROLE)
|
|
|
|
def update(self, **kwargs) -> int:
|
|
"""Refresh the cache for the elements of the queryset.
|
|
|
|
Besides that, does the same job as a regular update method.
|
|
|
|
Be aware that this adds a db query to retrieve the updated objects
|
|
"""
|
|
nb_rows = super().update(**kwargs)
|
|
if nb_rows == 0:
|
|
# if no row was affected, no need to refresh the cache
|
|
return 0
|
|
|
|
cache_memberships = {}
|
|
to_remove = {"users": [], "clubs": []}
|
|
for membership in self.all():
|
|
cache_key = f"membership_{membership.club_id}_{membership.user_id}"
|
|
if membership.end_date is not None and membership.end_date <= localdate():
|
|
cache_memberships[cache_key] = "not_member"
|
|
to_remove["users"].append(membership.user_id)
|
|
to_remove["clubs"].append(membership.club_id)
|
|
else:
|
|
cache_memberships[cache_key] = membership
|
|
cache.set_many(cache_memberships)
|
|
self._remove_from_club_groups(to_remove["users"], to_remove["clubs"])
|
|
return nb_rows
|
|
|
|
def delete(self) -> tuple[int, dict[str, int]]:
|
|
"""Work just like the default Django's delete() method,
|
|
but add a cache invalidation for the elements of the queryset
|
|
before the deletion,
|
|
and a removal of the user from the club groups.
|
|
|
|
Be aware that this adds a db query to retrieve the deleted element
|
|
and another to remove users from the groups.
|
|
As queries take place before the deletion operation,
|
|
they will be performed even if the deletion fails.
|
|
"""
|
|
ids = list(self.values_list("club_id", "user_id"))
|
|
nb_rows, rows_counts = super().delete()
|
|
if nb_rows > 0:
|
|
user_ids = [i[0] for i in ids]
|
|
club_ids = [i[1] for i in ids]
|
|
self._remove_from_club_groups(user_ids, club_ids)
|
|
cache.set_many(
|
|
{
|
|
f"membership_{club_id}_{user_id}": "not_member"
|
|
for club_id, user_id in ids
|
|
}
|
|
)
|
|
return nb_rows, rows_counts
|
|
|
|
|
|
class Membership(models.Model):
|
|
"""The Membership class makes the connection between User and Clubs.
|
|
|
|
Both Users and Clubs can have many Membership objects:
|
|
- a user can be a member of many clubs at a time
|
|
- a club can have many members at a time too
|
|
|
|
A User is currently member of all the Clubs where its Membership has an end_date set to null/None.
|
|
Otherwise, it's a past membership kept because it can be very useful to see who was in which Club in the past.
|
|
"""
|
|
|
|
user = models.ForeignKey(
|
|
User,
|
|
verbose_name=_("user"),
|
|
related_name="memberships",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
club = models.ForeignKey(
|
|
Club,
|
|
verbose_name=_("club"),
|
|
related_name="members",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
start_date = models.DateField(_("start date"), default=timezone.now)
|
|
end_date = models.DateField(_("end date"), null=True, blank=True)
|
|
role = models.IntegerField(
|
|
_("role"),
|
|
choices=sorted(settings.SITH_CLUB_ROLES.items()),
|
|
default=sorted(settings.SITH_CLUB_ROLES.items())[0][0],
|
|
)
|
|
description = models.CharField(
|
|
_("description"), max_length=128, null=False, blank=True
|
|
)
|
|
|
|
objects = MembershipQuerySet.as_manager()
|
|
|
|
def __str__(self):
|
|
return (
|
|
f"{self.club.name} - {self.user.username} "
|
|
f"- {settings.SITH_CLUB_ROLES[self.role]} "
|
|
f"- {str(_('past member')) if self.end_date is not None else ''}"
|
|
)
|
|
|
|
def save(self, *args, **kwargs):
|
|
# adding must be set before calling super().save()
|
|
# because the call will switch _state.adding from True to False
|
|
adding = self._state.adding
|
|
super().save(*args, **kwargs)
|
|
if adding:
|
|
groups = [
|
|
User.groups.through(
|
|
user_id=self.user_id, group_id=self.club.members_group_id
|
|
)
|
|
]
|
|
if self.role > settings.SITH_MAXIMUM_FREE_ROLE:
|
|
groups.append(
|
|
User.groups.through(
|
|
user_id=self.user_id, group_id=self.club.board_group_id
|
|
)
|
|
)
|
|
User.groups.through.objects.bulk_create(groups)
|
|
if self.end_date is None:
|
|
cache.set(f"membership_{self.club_id}_{self.user_id}", self)
|
|
else:
|
|
cache.set(f"membership_{self.club_id}_{self.user_id}", "not_member")
|
|
|
|
def get_absolute_url(self):
|
|
return reverse("club:club_members", kwargs={"club_id": self.club_id})
|
|
|
|
@property
|
|
def is_ongoing(self):
|
|
return self.end_date is None or self.end_date
|
|
|
|
def is_owned_by(self, user):
|
|
"""Method to see if that object can be super edited by the given user."""
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_board_member
|
|
|
|
def can_be_edited_by(self, user: User) -> bool:
|
|
"""Check if that object can be edited by the given user."""
|
|
if user.is_root or user.is_board_member:
|
|
return True
|
|
membership = self.club.get_membership_for(user)
|
|
return membership is not None and membership.role >= self.role
|
|
|
|
def delete(self, *args, **kwargs):
|
|
super().delete(*args, **kwargs)
|
|
cache.delete(f"membership_{self.club_id}_{self.user_id}")
|
|
|
|
|
|
class Mailing(models.Model):
|
|
"""A Mailing list for a club.
|
|
|
|
Warning:
|
|
Remember that mailing lists should be validated by UTBM.
|
|
"""
|
|
|
|
club = models.ForeignKey(
|
|
Club,
|
|
verbose_name=_("Club"),
|
|
related_name="mailings",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
email = models.CharField(
|
|
_("Email address"),
|
|
unique=True,
|
|
null=False,
|
|
blank=False,
|
|
max_length=256,
|
|
validators=[
|
|
RegexValidator(
|
|
validate_email.user_regex,
|
|
_("Enter a valid address. Only the root of the address is needed."),
|
|
)
|
|
],
|
|
)
|
|
is_moderated = models.BooleanField(_("is moderated"), default=False)
|
|
moderator = models.ForeignKey(
|
|
User,
|
|
related_name="moderated_mailings",
|
|
verbose_name=_("moderator"),
|
|
null=True,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
|
|
def __str__(self):
|
|
return "%s - %s" % (self.club, self.email_full)
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self.is_moderated:
|
|
unread_notif_subquery = Notification.objects.filter(
|
|
user=OuterRef("pk"), type="MAILING_MODERATION", viewed=False
|
|
)
|
|
for user in User.objects.filter(
|
|
~Exists(unread_notif_subquery),
|
|
groups__id__in=[settings.SITH_GROUP_COM_ADMIN_ID],
|
|
):
|
|
Notification(
|
|
user=user,
|
|
url=reverse("com:mailing_admin"),
|
|
type="MAILING_MODERATION",
|
|
).save(*args, **kwargs)
|
|
super().save(*args, **kwargs)
|
|
|
|
def clean(self):
|
|
if Mailing.objects.filter(email=self.email).exists():
|
|
raise ValidationError(_("This mailing list already exists."))
|
|
if self.can_moderate(self.moderator):
|
|
self.is_moderated = True
|
|
else:
|
|
self.moderator = None
|
|
super().clean()
|
|
|
|
@property
|
|
def email_full(self):
|
|
return self.email + "@" + settings.SITH_MAILING_DOMAIN
|
|
|
|
def can_moderate(self, user):
|
|
return user.is_root or user.is_com_admin
|
|
|
|
def is_owned_by(self, user):
|
|
if user.is_anonymous:
|
|
return False
|
|
return user.is_root or user.is_com_admin
|
|
|
|
def can_view(self, user):
|
|
return self.club.has_rights_in_club(user)
|
|
|
|
def can_be_edited_by(self, user):
|
|
return self.club.has_rights_in_club(user)
|
|
|
|
def delete(self, *args, **kwargs):
|
|
self.subscriptions.all().delete()
|
|
super().delete()
|
|
|
|
def fetch_format(self):
|
|
destination = "".join(s.fetch_format() for s in self.subscriptions.all())
|
|
return f"{self.email}: {destination}"
|
|
|
|
|
|
class MailingSubscription(models.Model):
|
|
"""Link between user and mailing list."""
|
|
|
|
mailing = models.ForeignKey(
|
|
Mailing,
|
|
verbose_name=_("Mailing"),
|
|
related_name="subscriptions",
|
|
null=False,
|
|
blank=False,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
user = models.ForeignKey(
|
|
User,
|
|
verbose_name=_("User"),
|
|
related_name="mailing_subscriptions",
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE,
|
|
)
|
|
email = models.EmailField(_("Email address"), blank=False, null=False)
|
|
|
|
class Meta:
|
|
unique_together = (("user", "email", "mailing"),)
|
|
|
|
def __str__(self):
|
|
return "(%s) - %s : %s" % (self.mailing, self.get_username, self.email)
|
|
|
|
def clean(self):
|
|
if not self.user and not self.email:
|
|
raise ValidationError(_("At least user or email is required"))
|
|
try:
|
|
if self.user and not self.email:
|
|
self.email = self.user.email
|
|
if MailingSubscription.objects.filter(
|
|
mailing=self.mailing, email=self.email
|
|
).exists():
|
|
raise ValidationError(
|
|
_("This email is already suscribed in this mailing")
|
|
)
|
|
except ObjectDoesNotExist:
|
|
pass
|
|
super().clean()
|
|
|
|
def is_owned_by(self, user):
|
|
if user.is_anonymous:
|
|
return False
|
|
return (
|
|
self.mailing.club.has_rights_in_club(user)
|
|
or user.is_root
|
|
or self.user.is_com_admin
|
|
)
|
|
|
|
def can_be_edited_by(self, user):
|
|
return self.user is not None and user.id == self.user.id
|
|
|
|
@cached_property
|
|
def get_email(self):
|
|
if self.user and not self.email:
|
|
return self.user.email
|
|
return self.email
|
|
|
|
@cached_property
|
|
def get_username(self):
|
|
if self.user:
|
|
return str(self.user)
|
|
return _("Unregistered user")
|
|
|
|
def fetch_format(self):
|
|
return self.get_email + " "
|