# # Copyright 2016,2017,2018 # - Skia # - Sli # # Ce fichier fait partie du site de l'Association des Étudiants de l'UTBM, # http://ae.utbm.fr. # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License a published by the Free Software # Foundation; either version 3 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Sofware Foundation, Inc., 59 Temple # Place - Suite 330, Boston, MA 02111-1307, USA. # # from __future__ import annotations import importlib import logging import os import string import unicodedata from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING, Any, Optional, Self from django.conf import settings from django.contrib.auth.models import AbstractBaseUser, UserManager from django.contrib.auth.models import ( AnonymousUser as AuthAnonymousUser, ) from django.contrib.auth.models import ( Group as AuthGroup, ) from django.contrib.auth.models import ( GroupManager as AuthGroupManager, ) from django.contrib.staticfiles.storage import staticfiles_storage from django.core import validators from django.core.cache import cache from django.core.exceptions import PermissionDenied, ValidationError from django.core.mail import send_mail from django.db import models, transaction from django.db.models import Exists, OuterRef, Q from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.html import escape from django.utils.timezone import localdate, now from django.utils.translation import gettext_lazy as _ from phonenumber_field.modelfields import PhoneNumberField if TYPE_CHECKING: from pydantic import NonNegativeInt from club.models import Club class RealGroupManager(AuthGroupManager): def get_queryset(self): return super().get_queryset().filter(is_meta=False) class MetaGroupManager(AuthGroupManager): def get_queryset(self): return super().get_queryset().filter(is_meta=True) class Group(AuthGroup): """Implement both RealGroups and Meta groups. Groups are sorted by their is_meta property """ #: If False, this is a RealGroup is_meta = models.BooleanField( _("meta group status"), default=False, help_text=_("Whether a group is a meta group or not"), ) #: Description of the group description = models.CharField(_("description"), max_length=60) class Meta: ordering = ["name"] def get_absolute_url(self) -> str: return reverse("core:group_list") def save(self, *args, **kwargs) -> None: super().save(*args, **kwargs) cache.set(f"sith_group_{self.id}", self) cache.set(f"sith_group_{self.name.replace(' ', '_')}", self) def delete(self, *args, **kwargs) -> None: super().delete(*args, **kwargs) cache.delete(f"sith_group_{self.id}") cache.delete(f"sith_group_{self.name.replace(' ', '_')}") class MetaGroup(Group): """MetaGroups are dynamically created groups. Generally used with clubs where creating a club creates two groups: * club-SITH_BOARD_SUFFIX * club-SITH_MEMBER_SUFFIX """ #: Assign a manager in a way that MetaGroup.objects only return groups with is_meta=False objects = MetaGroupManager() class Meta: proxy = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.is_meta = True @cached_property def associated_club(self) -> Club | None: """Return the group associated with this meta group. The result of this function is cached Returns: The associated club if it exists, else None """ from club.models import Club if self.name.endswith(settings.SITH_BOARD_SUFFIX): # replace this with str.removesuffix as soon as Python # is upgraded to 3.10 club_name = self.name[: -len(settings.SITH_BOARD_SUFFIX)] elif self.name.endswith(settings.SITH_MEMBER_SUFFIX): club_name = self.name[: -len(settings.SITH_MEMBER_SUFFIX)] else: return None club = cache.get(f"sith_club_{club_name}") if club is None: club = Club.objects.filter(unix_name=club_name).first() cache.set(f"sith_club_{club_name}", club) return club class RealGroup(Group): """RealGroups are created by the developer. Most of the time they match a number in settings to be easily used for permissions. """ #: Assign a manager in a way that MetaGroup.objects only return groups with is_meta=True objects = RealGroupManager() class Meta: proxy = True def validate_promo(value: int) -> None: start_year = settings.SITH_SCHOOL_START_YEAR delta = (localdate() + timedelta(days=180)).year - start_year if value < 0 or delta < value: raise ValidationError( _("%(value)s is not a valid promo (between 0 and %(end)s)"), params={"value": value, "end": delta}, ) def get_group(*, pk: int | None = None, name: str | None = None) -> Group | None: """Search for a group by its primary key or its name. Either one of the two must be set. The result is cached for the default duration (should be 5 minutes). Args: pk: The primary key of the group name: The name of the group Returns: The group if it exists, else None Raises: ValueError: If no group matches the criteria """ if pk is None and name is None: raise ValueError("Either pk or name must be set") # replace space characters to hide warnings with memcached backend pk_or_name: str | int = pk if pk is not None else name.replace(" ", "_") group = cache.get(f"sith_group_{pk_or_name}") if group == "not_found": # Using None as a cache value is a little bit tricky, # so we use a special string to represent None return None elif group is not None: return group # if this point is reached, the group is not in cache if pk is not None: group = Group.objects.filter(pk=pk).first() else: group = Group.objects.filter(name=name).first() if group is not None: cache.set(f"sith_group_{group.id}", group) cache.set(f"sith_group_{group.name.replace(' ', '_')}", group) else: cache.set(f"sith_group_{pk_or_name}", "not_found") return group class UserQuerySet(models.QuerySet): def filter_inactive(self) -> Self: from counter.models import Refilling, Selling from subscription.models import Subscription threshold = now() - settings.SITH_ACCOUNT_INACTIVITY_DELTA subscriptions = Subscription.objects.filter( member_id=OuterRef("pk"), subscription_end__gt=localdate(threshold) ) refills = Refilling.objects.filter( customer__user_id=OuterRef("pk"), date__gt=threshold ) purchases = Selling.objects.filter( customer__user_id=OuterRef("pk"), date__gt=threshold ) return self.exclude( Q(Exists(subscriptions)) | Q(Exists(refills)) | Q(Exists(purchases)) ) class CustomUserManager(UserManager.from_queryset(UserQuerySet)): # see https://docs.djangoproject.com/fr/stable/topics/migrations/#model-managers pass class User(AbstractBaseUser): """Defines the base user class, useable in every app. This is almost the same as the auth module AbstractUser since it inherits from it, but some fields are required, and the username is generated automatically with the name of the user (see generate_username()). Added field: nick_name, date_of_birth Required fields: email, first_name, last_name, date_of_birth """ username = models.CharField( _("username"), max_length=254, unique=True, help_text=_( "Required. 254 characters or fewer. Letters, digits and ./+/-/_ only." ), validators=[ validators.RegexValidator( r"^[\w.+-]+$", _( "Enter a valid username. This value may contain only " "letters, numbers " "and ./+/-/_ characters." ), ) ], error_messages={"unique": _("A user with that username already exists.")}, ) first_name = models.CharField(_("first name"), max_length=64) last_name = models.CharField(_("last name"), max_length=64) email = models.EmailField(_("email address"), unique=True) date_of_birth = models.DateField(_("date of birth"), blank=True, null=True) nick_name = models.CharField(_("nick name"), max_length=64, null=True, blank=True) is_staff = models.BooleanField( _("staff status"), default=False, help_text=_("Designates whether the user can log into this admin site."), ) is_active = models.BooleanField( _("active"), default=True, help_text=_( "Designates whether this user should be treated as active. " "Unselect this instead of deleting accounts." ), ) date_joined = models.DateField(_("date joined"), auto_now_add=True) last_update = models.DateTimeField(_("last update"), auto_now=True) is_superuser = models.BooleanField( _("superuser"), default=False, help_text=_("Designates whether this user is a superuser. "), ) groups = models.ManyToManyField(RealGroup, related_name="users", blank=True) home = models.OneToOneField( "SithFile", related_name="home_of", verbose_name=_("home"), null=True, blank=True, on_delete=models.SET_NULL, ) profile_pict = models.OneToOneField( "SithFile", related_name="profile_of", verbose_name=_("profile"), null=True, blank=True, on_delete=models.SET_NULL, ) avatar_pict = models.OneToOneField( "SithFile", related_name="avatar_of", verbose_name=_("avatar"), null=True, blank=True, on_delete=models.SET_NULL, ) scrub_pict = models.OneToOneField( "SithFile", related_name="scrub_of", verbose_name=_("scrub"), null=True, blank=True, on_delete=models.SET_NULL, ) sex = models.CharField( _("sex"), max_length=10, null=True, blank=True, choices=[("MAN", _("Man")), ("WOMAN", _("Woman")), ("OTHER", _("Other"))], ) pronouns = models.CharField(_("pronouns"), max_length=64, blank=True, default="") tshirt_size = models.CharField( _("tshirt size"), max_length=5, choices=[ ("-", _("-")), ("XS", _("XS")), ("S", _("S")), ("M", _("M")), ("L", _("L")), ("XL", _("XL")), ("XXL", _("XXL")), ("XXXL", _("XXXL")), ], default="-", ) role = models.CharField( _("role"), max_length=15, choices=[ ("STUDENT", _("Student")), ("ADMINISTRATIVE", _("Administrative agent")), ("TEACHER", _("Teacher")), ("AGENT", _("Agent")), ("DOCTOR", _("Doctor")), ("FORMER STUDENT", _("Former student")), ("SERVICE", _("Service")), ], blank=True, default="", ) department = models.CharField( _("department"), max_length=15, choices=settings.SITH_PROFILE_DEPARTMENTS, default="NA", blank=True, ) dpt_option = models.CharField( _("dpt option"), max_length=32, blank=True, default="" ) semester = models.CharField(_("semester"), max_length=5, blank=True, default="") quote = models.CharField(_("quote"), max_length=256, blank=True, default="") school = models.CharField(_("school"), max_length=80, blank=True, default="") promo = models.IntegerField( _("promo"), validators=[validate_promo], null=True, blank=True ) forum_signature = models.TextField( _("forum signature"), max_length=256, blank=True, default="" ) second_email = models.EmailField(_("second email address"), null=True, blank=True) phone = PhoneNumberField(_("phone"), null=True, blank=True) parent_phone = PhoneNumberField(_("parent phone"), null=True, blank=True) address = models.CharField(_("address"), max_length=128, blank=True, default="") parent_address = models.CharField( _("parent address"), max_length=128, blank=True, default="" ) is_subscriber_viewable = models.BooleanField( _("is subscriber viewable"), default=True ) godfathers = models.ManyToManyField("User", related_name="godchildren", blank=True) objects = CustomUserManager() USERNAME_FIELD = "username" def __str__(self): return self.get_display_name() def save(self, *args, **kwargs): with transaction.atomic(): if self.id: old = User.objects.filter(id=self.id).first() if old and old.username != self.username: self._change_username(self.username) super().save(*args, **kwargs) def get_absolute_url(self) -> str: return reverse("core:user_profile", kwargs={"user_id": self.pk}) def promo_has_logo(self) -> bool: return Path( settings.BASE_DIR / f"core/static/core/img/promo_{self.promo}.png" ).exists() def has_module_perms(self, package_name: str) -> bool: return self.is_active def has_perm(self, perm: str, obj: Any = None) -> bool: return self.is_active and self.is_superuser @cached_property def was_subscribed(self) -> bool: return self.subscriptions.exists() @cached_property def is_subscribed(self) -> bool: s = self.subscriptions.filter( subscription_start__lte=timezone.now(), subscription_end__gte=timezone.now() ) return s.exists() @cached_property def account_balance(self): if hasattr(self, "customer"): return self.customer.amount else: return 0 def is_in_group(self, *, pk: int | None = None, name: str | None = None) -> bool: """Check if this user is in the given group. Either a group id or a group name must be provided. If both are passed, only the id will be considered. The group will be fetched using the given parameter. If no group is found, return False. If a group is found, check if this user is in the latter. Returns: True if the user is the group, else False """ if pk is not None: group: Optional[Group] = get_group(pk=pk) elif name is not None: group: Optional[Group] = get_group(name=name) else: raise ValueError("You must either provide the id or the name of the group") if group is None: return False if group.id == settings.SITH_GROUP_PUBLIC_ID: return True if group.id == settings.SITH_GROUP_SUBSCRIBERS_ID: return self.is_subscribed if group.id == settings.SITH_GROUP_OLD_SUBSCRIBERS_ID: return self.was_subscribed if group.id == settings.SITH_GROUP_ROOT_ID: return self.is_root if group.is_meta: # check if this group is associated with a club group.__class__ = MetaGroup club = group.associated_club if club is None: return False membership = club.get_membership_for(self) if membership is None: return False if group.name.endswith(settings.SITH_MEMBER_SUFFIX): return True return membership.role > settings.SITH_MAXIMUM_FREE_ROLE return group in self.cached_groups @property def cached_groups(self) -> list[Group]: """Get the list of groups this user is in. The result is cached for the default duration (should be 5 minutes) Returns: A list of all the groups this user is in. """ groups = cache.get(f"user_{self.id}_groups") if groups is None: groups = list(self.groups.all()) cache.set(f"user_{self.id}_groups", groups) return groups @cached_property def is_root(self) -> bool: if self.is_superuser: return True root_id = settings.SITH_GROUP_ROOT_ID return any(g.id == root_id for g in self.cached_groups) @cached_property def is_board_member(self): main_club = settings.SITH_MAIN_CLUB["unix_name"] return self.is_in_group(name=main_club + settings.SITH_BOARD_SUFFIX) @cached_property def can_read_subscription_history(self): if self.is_root or self.is_board_member: return True from club.models import Club for club in Club.objects.filter( id__in=settings.SITH_CAN_READ_SUBSCRIPTION_HISTORY ): if club in self.clubs_with_rights: return True return False @cached_property def can_create_subscription(self) -> bool: return self.is_root or ( self.memberships.board() .ongoing() .filter(club_id__in=settings.SITH_CAN_CREATE_SUBSCRIPTIONS) .exists() ) @cached_property def is_launderette_manager(self): from club.models import Club return ( Club.objects.filter( unix_name=settings.SITH_LAUNDERETTE_MANAGER["unix_name"] ) .first() .get_membership_for(self) ) @cached_property def is_banned_alcohol(self): return self.is_in_group(pk=settings.SITH_GROUP_BANNED_ALCOHOL_ID) @cached_property def is_banned_counter(self): return self.is_in_group(pk=settings.SITH_GROUP_BANNED_COUNTER_ID) @cached_property def age(self) -> int: """Return the age this user has the day the method is called. If the user has not filled his age, return 0. """ if self.date_of_birth is None: return 0 today = timezone.now() age = today.year - self.date_of_birth.year # remove a year if this year's birthday is yet to come age -= (today.month, today.day) < ( self.date_of_birth.month, self.date_of_birth.day, ) return age def make_home(self): if self.home is None: home_root = SithFile.objects.filter(parent=None, name="users").first() if home_root is not None: home = SithFile(parent=home_root, name=self.username, owner=self) home.save() self.home = home self.save() def _change_username(self, new_name): u = User.objects.filter(username=new_name).first() if u is None: if self.home: self.home.name = new_name self.home.save() else: raise ValidationError(_("A user with that username already exists")) def get_profile(self): return { "last_name": self.last_name, "first_name": self.first_name, "nick_name": self.nick_name, "date_of_birth": self.date_of_birth, } def get_full_name(self): """Returns the first_name plus the last_name, with a space in between.""" full_name = "%s %s" % (self.first_name, self.last_name) return full_name.strip() def get_short_name(self): """Returns the short name for the user.""" if self.nick_name: return self.nick_name return self.first_name + " " + self.last_name def get_display_name(self) -> str: """Returns the display name of the user. A nickname if possible, otherwise, the full name. """ if self.nick_name: return "%s (%s)" % (self.get_full_name(), self.nick_name) return self.get_full_name() def get_age(self): """Returns the age.""" today = timezone.now() born = self.date_of_birth return ( today.year - born.year - ((today.month, today.day) < (born.month, born.day)) ) def get_family( self, godfathers_depth: NonNegativeInt = 4, godchildren_depth: NonNegativeInt = 4, ) -> set[User.godfathers.through]: """Get the family of the user, with the given depth. Args: godfathers_depth: The number of generations of godfathers to fetch godchildren_depth: The number of generations of godchildren to fetch Returns: A list of family relationships in this user's family """ res = [] for depth, key, reverse_key in [ (godfathers_depth, "from_user_id", "to_user_id"), (godchildren_depth, "to_user_id", "from_user_id"), ]: if depth == 0: continue links = list(User.godfathers.through.objects.filter(**{key: self.id})) res.extend(links) for _ in range(1, depth): # noqa: F402 we don't care about gettext here ids = [getattr(c, reverse_key) for c in links] links = list( User.godfathers.through.objects.filter( **{f"{key}__in": ids} ).exclude(id__in=[r.id for r in res]) ) if not links: break res.extend(links) return set(res) def email_user(self, subject, message, from_email=None, **kwargs): """Sends an email to this User.""" if from_email is None: from_email = settings.DEFAULT_FROM_EMAIL send_mail(subject, message, from_email, [self.email], **kwargs) def generate_username(self) -> str: """Generates a unique username based on the first and last names. For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists. Returns: The generated username. """ def remove_accents(data): return "".join( x for x in unicodedata.normalize("NFKD", data) if unicodedata.category(x)[0] == "L" ).lower() user_name = ( remove_accents(self.first_name[0] + self.last_name) .encode("ascii", "ignore") .decode("utf-8") ) # load all usernames which could conflict with the new one. # we need to actually load them, instead of performing a count, # because we cannot be sure that two usernames refer to the # actual same word (eg. tmore and tmoreau) possible_conflicts: list[str] = list( User.objects.filter(username__startswith=user_name).values_list( "username", flat=True ) ) nb_conflicts = sum( 1 for name in possible_conflicts if name.rstrip(string.digits) == user_name ) if nb_conflicts > 0: user_name += str(nb_conflicts) # exemple => exemple1 self.username = user_name return user_name def is_owner(self, obj): """Determine if the object is owned by the user.""" if hasattr(obj, "is_owned_by") and obj.is_owned_by(self): return True if hasattr(obj, "owner_group") and self.is_in_group(pk=obj.owner_group.id): return True return self.is_root def can_edit(self, obj): """Determine if the object can be edited by the user.""" if hasattr(obj, "can_be_edited_by") and obj.can_be_edited_by(self): return True if hasattr(obj, "edit_groups"): for pk in obj.edit_groups.values_list("pk", flat=True): if self.is_in_group(pk=pk): return True if isinstance(obj, User) and obj == self: return True return self.is_owner(obj) def can_view(self, obj): """Determine if the object can be viewed by the user.""" if hasattr(obj, "can_be_viewed_by") and obj.can_be_viewed_by(self): return True if hasattr(obj, "view_groups"): for pk in obj.view_groups.values_list("pk", flat=True): if self.is_in_group(pk=pk): return True return self.can_edit(obj) def can_be_edited_by(self, user): return user.is_root or user.is_board_member def can_be_viewed_by(self, user): return (user.was_subscribed and self.is_subscriber_viewable) or user.is_root def get_mini_item(self): return """