replace MetaGroups by proper group management

This commit is contained in:
imperosol
2024-11-30 20:30:17 +01:00
parent cce7ecbe73
commit 6400b2c2c2
16 changed files with 410 additions and 355 deletions

View File

@ -0,0 +1,79 @@
# Generated by Django 4.2.16 on 2024-11-20 17:08
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
from django.db.migrations.state import StateApps
def migrate_meta_groups(apps: StateApps, schema_editor):
Group = apps.get_model("core", "Group")
Club = apps.get_model("club", "Club")
meta_groups = Group.objects.filter(is_meta=True)
clubs = list(Club.objects.all())
for club in clubs:
club.board_group = meta_groups.get_or_create(
name=club.unix_name + settings.SITH_BOARD_SUFFIX
)[0]
club.members_group = meta_groups.get_or_create(
name=club.unix_name + settings.SITH_MEMBER_SUFFIX
)[0]
Club.objects.bulk_update(clubs, fields=["board_group", "members_group"])
# steps of the migration :
# - Create a nullable field for the board group and the member group
# - Edit those new fields to make them point to currently existing meta groups
# - When this data migration is done, make the fields non-nullable
class Migration(migrations.Migration):
dependencies = [
("core", "0040_alter_user_options_user_user_permissions_and_more"),
("club", "0011_auto_20180426_2013"),
]
operations = [
migrations.AddField(
model_name="club",
name="board_group",
field=models.OneToOneField(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="club_board",
to="core.group",
),
),
migrations.AddField(
model_name="club",
name="members_group",
field=models.OneToOneField(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="club",
to="core.group",
),
),
migrations.RunPython(
migrate_meta_groups, reverse_code=migrations.RunPython.noop, elidable=True
),
migrations.AlterField(
model_name="club",
name="board_group",
field=models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="club_board",
to="core.group",
),
),
migrations.AlterField(
model_name="club",
name="members_group",
field=models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="club",
to="core.group",
),
),
]

View File

@ -38,7 +38,7 @@ from django.utils.functional import cached_property
from django.utils.timezone import localdate
from django.utils.translation import gettext_lazy as _
from core.models import Group, MetaGroup, Notification, Page, SithFile, User
from core.models import Group, Notification, Page, SithFile, User
# Create your models here.
@ -103,6 +103,12 @@ class Club(models.Model):
page = models.OneToOneField(
Page, related_name="club", blank=True, null=True, on_delete=models.CASCADE
)
members_group = models.OneToOneField(
Group, related_name="club", on_delete=models.CASCADE
)
board_group = models.OneToOneField(
Group, related_name="club_board", on_delete=models.CASCADE
)
class Meta:
ordering = ["name", "unix_name"]
@ -112,23 +118,25 @@ class Club(models.Model):
@transaction.atomic()
def save(self, *args, **kwargs):
old = Club.objects.filter(id=self.id).first()
creation = old is None
if not creation and old.unix_name != self.unix_name:
self._change_unixname(self.unix_name)
creation = self._state.adding
if not creation and Club.objects.get(id=self.id).unix_name != self.unix_name:
self.home.name = self.unix_name
self.home.save()
if creation:
self.board_group = Group.objects.create(
name=f"{self.name} - Bureau", is_manually_manageable=False
)
self.members_group = Group.objects.create(
name=f"{self.name} - Membres", is_manually_manageable=False
)
super().save(*args, **kwargs)
if creation:
board = MetaGroup(name=self.unix_name + settings.SITH_BOARD_SUFFIX)
board.save()
member = MetaGroup(name=self.unix_name + settings.SITH_MEMBER_SUFFIX)
member.save()
subscribers = Group.objects.filter(
name=settings.SITH_MAIN_MEMBERS_GROUP
).first()
self.make_home()
self.home.edit_groups.set([board])
self.home.view_groups.set([member, subscribers])
self.home.save()
self.home.edit_groups.add(self.board_group)
self.home.view_groups.add(self.members_group, subscribers)
self.make_page()
cache.set(f"sith_club_{self.unix_name}", self)
@ -136,7 +144,8 @@ class Club(models.Model):
return reverse("club:club_view", kwargs={"club_id": self.id})
@cached_property
def president(self):
def president(self) -> Membership | None:
"""Fetch the membership of the current president of this club."""
return self.members.filter(
role=settings.SITH_CLUB_ROLES_ID["President"], end_date=None
).first()
@ -154,36 +163,18 @@ class Club(models.Model):
def clean(self):
self.check_loop()
def _change_unixname(self, old_name, new_name):
c = Club.objects.filter(unix_name=new_name).first()
if c is None:
# Update all the groups names
Group.objects.filter(name=old_name).update(name=new_name)
Group.objects.filter(name=old_name + settings.SITH_BOARD_SUFFIX).update(
name=new_name + settings.SITH_BOARD_SUFFIX
)
Group.objects.filter(name=old_name + settings.SITH_MEMBER_SUFFIX).update(
name=new_name + settings.SITH_MEMBER_SUFFIX
)
def make_home(self) -> None:
if self.home:
return
home_root = SithFile.objects.filter(parent=None, name="clubs").first()
root = User.objects.filter(username="root").first()
if home_root and root:
home = SithFile(parent=home_root, name=self.unix_name, owner=root)
home.save()
self.home = home
self.save()
if self.home:
self.home.name = new_name
self.home.save()
else:
raise ValidationError(_("A club with that unix_name already exists"))
def make_home(self):
if not self.home:
home_root = SithFile.objects.filter(parent=None, name="clubs").first()
root = User.objects.filter(username="root").first()
if home_root and root:
home = SithFile(parent=home_root, name=self.unix_name, owner=root)
home.save()
self.home = home
self.save()
def make_page(self):
def make_page(self) -> None:
root = User.objects.filter(username="root").first()
if not self.page:
club_root = Page.objects.filter(name=settings.SITH_CLUB_ROOT_PAGE).first()
@ -213,35 +204,32 @@ class Club(models.Model):
self.page.parent = self.parent.page
self.page.save(force_lock=True)
def delete(self, *args, **kwargs):
def delete(self, *args, **kwargs) -> tuple[int, dict[str, int]]:
# Invalidate the cache of this club and of its memberships
for membership in self.members.ongoing().select_related("user"):
cache.delete(f"membership_{self.id}_{membership.user.id}")
cache.delete(f"sith_club_{self.unix_name}")
super().delete(*args, **kwargs)
return super().delete(*args, **kwargs)
def get_display_name(self):
def get_display_name(self) -> str:
return self.name
def is_owned_by(self, user):
def is_owned_by(self, user: User) -> bool:
"""Method to see if that object can be super edited by the given user."""
if user.is_anonymous:
return False
return user.is_board_member
def get_full_logo_url(self):
return "https://%s%s" % (settings.SITH_URL, self.logo.url)
def get_full_logo_url(self) -> str:
return f"https://{settings.SITH_URL}{self.logo.url}"
def can_be_edited_by(self, user):
def can_be_edited_by(self, user: User) -> bool:
"""Method to see if that object can be edited by the given user."""
return self.has_rights_in_club(user)
def can_be_viewed_by(self, user):
def can_be_viewed_by(self, user: User) -> bool:
"""Method to see if that object can be seen by the given user."""
sub = User.objects.filter(pk=user.pk).first()
if sub is None:
return False
return sub.was_subscribed
return user.was_subscribed
def get_membership_for(self, user: User) -> Membership | None:
"""Return the current membership the given user.
@ -262,12 +250,19 @@ class Club(models.Model):
cache.set(f"membership_{self.id}_{user.id}", membership)
return membership
def has_rights_in_club(self, user):
def has_rights_in_club(self, user: User) -> bool:
m = self.get_membership_for(user)
return m is not None and m.role > settings.SITH_MAXIMUM_FREE_ROLE
class MembershipQuerySet(models.QuerySet):
@staticmethod
def _remove_from_club_groups(users: list[int], clubs: list[int]):
groups = Group.objects.filter(Q(club__in=clubs) | Q(club_board__in=clubs))
return User.groups.through.objects.filter(
Q(group__in=groups) & Q(user__in=users)
).delete()
def ongoing(self) -> Self:
"""Filter all memberships which are not finished yet."""
return self.filter(Q(end_date=None) | Q(end_date__gt=localdate()))
@ -283,7 +278,7 @@ class MembershipQuerySet(models.QuerySet):
"""
return self.filter(role__gt=settings.SITH_MAXIMUM_FREE_ROLE)
def update(self, **kwargs):
def update(self, **kwargs) -> int:
"""Refresh the cache for the elements of the queryset.
Besides that, does the same job as a regular update method.
@ -291,34 +286,48 @@ class MembershipQuerySet(models.QuerySet):
Be aware that this adds a db query to retrieve the updated objects
"""
nb_rows = super().update(**kwargs)
if nb_rows > 0:
# if at least a row was affected, refresh the cache
for membership in self.all():
if membership.end_date is not None:
cache.set(
f"membership_{membership.club_id}_{membership.user_id}",
"not_member",
)
else:
cache.set(
f"membership_{membership.club_id}_{membership.user_id}",
membership,
)
if nb_rows == 0:
# if no row was affected, no need to refresh the cache
return 0
def delete(self):
cache_memberships = {}
to_remove = {"users": [], "clubs": []}
for membership in self.all():
cache_key = f"membership_{membership.club_id}_{membership.user_id}"
if membership.end_date is not None and membership.end_date <= localdate():
cache_memberships[cache_key] = "not_member"
to_remove["users"].append(membership.user_id)
to_remove["clubs"].append(membership.club_id)
else:
cache_memberships[cache_key] = membership
cache.set_many(cache_memberships)
self._remove_from_club_groups(to_remove["users"], to_remove["clubs"])
return nb_rows
def delete(self) -> tuple[int, dict[str, int]]:
"""Work just like the default Django's delete() method,
but add a cache invalidation for the elements of the queryset
before the deletion.
before the deletion,
and a removal of the user from the club groups.
Be aware that this adds a db query to retrieve the deleted element.
As this first query take place before the deletion operation,
it will be performed even if the deletion fails.
Be aware that this adds a db query to retrieve the deleted element
and another to remove users from the groups.
As queries take place before the deletion operation,
they will be performed even if the deletion fails.
"""
ids = list(self.values_list("club_id", "user_id"))
nb_rows, _ = super().delete()
nb_rows, rows_counts = super().delete()
if nb_rows > 0:
for club_id, user_id in ids:
cache.set(f"membership_{club_id}_{user_id}", "not_member")
user_ids = [i[0] for i in ids]
club_ids = [i[1] for i in ids]
self._remove_from_club_groups(user_ids, club_ids)
cache.set_many(
{
f"membership_{club_id}_{user_id}": "not_member"
for club_id, user_id in ids
}
)
return nb_rows, rows_counts
class Membership(models.Model):
@ -369,7 +378,23 @@ class Membership(models.Model):
)
def save(self, *args, **kwargs):
# adding must be set before calling super().save()
# because the call will switch _state.adding from True to False
adding = self._state.adding
super().save(*args, **kwargs)
if adding:
groups = [
User.groups.through(
user_id=self.user_id, group_id=self.club.members_group_id
)
]
if self.role > settings.SITH_MAXIMUM_FREE_ROLE:
groups.append(
User.groups.through(
user_id=self.user_id, group_id=self.club.board_group_id
)
)
User.groups.through.objects.bulk_create(groups)
if self.end_date is None:
cache.set(f"membership_{self.club_id}_{self.user_id}", self)
else:
@ -378,6 +403,10 @@ class Membership(models.Model):
def get_absolute_url(self):
return reverse("club:club_members", kwargs={"club_id": self.club_id})
@property
def is_ongoing(self):
return self.end_date is None or self.end_date
def is_owned_by(self, user):
"""Method to see if that object can be super edited by the given user."""
if user.is_anonymous:

View File

@ -21,6 +21,7 @@ from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import localdate, localtime, now
from django.utils.translation import gettext as _
from model_bakery import baker
from club.forms import MailingForm
from club.models import Club, Mailing, Membership
@ -192,10 +193,8 @@ class TestClubModel(TestClub):
assert membership.end_date is None
assert membership.role == role
assert membership.club.get_membership_for(user) == membership
member_group = self.club.unix_name + settings.SITH_MEMBER_SUFFIX
board_group = self.club.unix_name + settings.SITH_BOARD_SUFFIX
assert user.is_in_group(name=member_group)
assert user.is_in_group(name=board_group)
assert user.is_in_group(pk=self.club.members_group_id)
assert user.is_in_group(pk=self.club.board_group_id)
def assert_membership_ended_today(self, user: User):
"""Assert that the given user have a membership which ended today."""
@ -474,37 +473,23 @@ class TestClubModel(TestClub):
assert self.club.members.count() == nb_memberships
assert membership == new_mem
def test_delete_remove_from_meta_group(self):
"""Test that when a club is deleted, all its members are removed from the
associated metagroup.
"""
memberships = self.club.members.select_related("user")
users = [membership.user for membership in memberships]
meta_group = self.club.unix_name + settings.SITH_MEMBER_SUFFIX
def test_remove_from_club_group(self):
"""Test that when a membership ends, the user is removed from club groups."""
user = baker.make(User)
baker.make(Membership, user=user, club=self.club, end_date=None, role=3)
assert user.groups.contains(self.club.members_group)
assert user.groups.contains(self.club.board_group)
user.memberships.update(end_date=localdate())
assert not user.groups.contains(self.club.members_group)
assert not user.groups.contains(self.club.board_group)
self.club.delete()
for user in users:
assert not user.is_in_group(name=meta_group)
def test_add_to_meta_group(self):
"""Test that when a membership begins, the user is added to the meta group."""
group_members = self.club.unix_name + settings.SITH_MEMBER_SUFFIX
board_members = self.club.unix_name + settings.SITH_BOARD_SUFFIX
assert not self.subscriber.is_in_group(name=group_members)
assert not self.subscriber.is_in_group(name=board_members)
def test_add_to_club_group(self):
"""Test that when a membership begins, the user is added to the club group."""
assert not self.subscriber.groups.contains(self.club.members_group)
assert not self.subscriber.groups.contains(self.club.board_group)
Membership.objects.create(club=self.club, user=self.subscriber, role=3)
assert self.subscriber.is_in_group(name=group_members)
assert self.subscriber.is_in_group(name=board_members)
def test_remove_from_meta_group(self):
"""Test that when a membership ends, the user is removed from meta group."""
group_members = self.club.unix_name + settings.SITH_MEMBER_SUFFIX
board_members = self.club.unix_name + settings.SITH_BOARD_SUFFIX
assert self.comptable.is_in_group(name=group_members)
assert self.comptable.is_in_group(name=board_members)
self.comptable.memberships.update(end_date=localtime(now()))
assert not self.comptable.is_in_group(name=group_members)
assert not self.comptable.is_in_group(name=board_members)
assert self.subscriber.groups.contains(self.club.members_group)
assert self.subscriber.groups.contains(self.club.board_group)
def test_club_owner(self):
"""Test that a club is owned only by board members of the main club."""

View File

@ -71,14 +71,13 @@ class ClubTabsMixin(TabedViewMixin):
return self.object.get_display_name()
def get_list_of_tabs(self):
tab_list = []
tab_list.append(
tab_list = [
{
"url": reverse("club:club_view", kwargs={"club_id": self.object.id}),
"slug": "infos",
"name": _("Infos"),
}
)
]
if self.request.user.can_view(self.object):
tab_list.append(
{