Aller au contenu

Models

User

Bases: AbstractUser

Defines the base user class, useable in every app.

This is almost the same as the auth module AbstractUser since it inherits from it, but some fields are required, and the username is generated automatically with the name of the user (see generate_username()).

Added fields: nick_name, date_of_birth. Required fields: email, first_name, last_name, date_of_birth.

is_in_group(*, pk=None, name=None)

Check if this user is in the given group. Either a group id or a group name must be provided. If both are passed, only the id will be considered.

The group will be fetched using the given parameter. If no group is found, return False. If a group is found, check if this user is in the latter.

Returns:

Type Description
bool

True if the user is in the group, else False

Source code in core/models.py
def is_in_group(self, *, pk: int | None = None, name: str | None = None) -> bool:
    """Check if this user is in the given group.
    Either a group id or a group name must be provided.
    If both are passed, only the id will be considered.

    The group will be fetched using the given parameter.
    If no group is found, return False.
    If a group is found, check if this user is in the latter.

    Returns:
         True if the user is the group, else False
    """
    if not pk and not name:
        raise ValueError("You must either provide the id or the name of the group")
    group_id: int | None = (
        pk or Group.objects.filter(name=name).values_list("id", flat=True).first()
    )
    if group_id is None:
        return False
    return group_id in self.all_groups

all_groups()

Get the list of groups this user is in.

Source code in core/models.py
@cached_property
def all_groups(self) -> dict[int, Group]:
    """Map the id of every group this user belongs to onto the group itself.

    On top of the groups explicitly linked to the user, the subscribers
    group is included when the user is subscribed and the root group is
    included when the user is a superuser.
    """
    implicit_group_ids = []
    if self.is_subscribed:
        implicit_group_ids.append(settings.SITH_GROUP_SUBSCRIBERS_ID)
    if self.is_superuser:
        implicit_group_ids.append(settings.SITH_GROUP_ROOT_ID)
    user_groups = self.groups.all()
    if implicit_group_ids:
        # Counter-intuitive as it may be, a UNION is much faster here than
        # an OR (on average 0.25ms vs 14ms).
        # For the why, cf. https://dba.stackexchange.com/questions/293836/why-is-an-or-statement-slower-than-union
        implicit_groups = Group.objects.filter(id__in=implicit_group_ids)
        user_groups = user_groups.union(implicit_groups)
    return {group.id: group for group in user_groups}

age()

Return the age of this user on the day the method is called. If the user has not filled in their date of birth, return 0.

Source code in core/models.py
@cached_property
def age(self) -> int:
    """Return the age of this user on the day the method is called.

    If the user has not filled in a date of birth, return 0.
    """
    if self.date_of_birth is None:
        return 0
    now = timezone.now()
    years = now.year - self.date_of_birth.year
    birthday_still_to_come = (now.month, now.day) < (
        self.date_of_birth.month,
        self.date_of_birth.day,
    )
    if birthday_still_to_come:
        # this year's birthday has not happened yet: one year less
        years -= 1
    return years

get_short_name()

Returns the short name for the user.

Source code in core/models.py
def get_short_name(self):
    """Return the short name of the user.

    This is the nickname when there is one,
    otherwise the first name followed by the last name.
    """
    return (
        self.nick_name
        if self.nick_name
        else self.first_name + " " + self.last_name
    )

get_display_name()

Returns the display name of the user.

A nickname if possible, otherwise, the full name.

Source code in core/models.py
def get_display_name(self) -> str:
    """Return the display name of the user.

    This is the full name, followed by the nickname
    in parentheses when the user has one.
    """
    if not self.nick_name:
        return self.get_full_name()
    return f"{self.get_full_name()} ({self.nick_name})"

get_family(godfathers_depth=4, godchildren_depth=4)

Get the family of the user, with the given depth.

Parameters:

Name Type Description Default
godfathers_depth NonNegativeInt

The number of generations of godfathers to fetch

4
godchildren_depth NonNegativeInt

The number of generations of godchildren to fetch

4

Returns:

Type Description
set[through]

A set of family relationships in this user's family

Source code in core/models.py
def get_family(
    self,
    godfathers_depth: NonNegativeInt = 4,
    godchildren_depth: NonNegativeInt = 4,
) -> set[User.godfathers.through]:
    """Collect the family links of the user, up to the given depths.

    The godfather side and the godchildren side are walked one
    generation at a time, starting from this user.

    Args:
        godfathers_depth: The number of generations of godfathers to fetch
        godchildren_depth: The number of generations of godchildren to fetch

    Returns:
        A set of family relationships in this user's family
    """
    directions = (
        (godfathers_depth, "from_user_id", "to_user_id"),
        (godchildren_depth, "to_user_id", "from_user_id"),
    )
    family = []
    for max_depth, own_side, other_side in directions:
        if max_depth == 0:
            continue
        relations = User.godfathers.through.objects
        # first generation: the links directly attached to this user
        generation = list(relations.filter(**{own_side: self.id}))
        family.extend(generation)
        for _generation_index in range(1, max_depth):
            frontier_ids = [getattr(link, other_side) for link in generation]
            known_link_ids = [link.id for link in family]
            # next generation: links starting from the users just reached,
            # leaving out the links that have already been collected
            generation = list(
                relations.filter(**{f"{own_side}__in": frontier_ids}).exclude(
                    id__in=known_link_ids
                )
            )
            if not generation:
                break
            family.extend(generation)
    return set(family)

email_user(subject, message, from_email=None, **kwargs)

Sends an email to this User.

Source code in core/models.py
def email_user(self, subject, message, from_email=None, **kwargs):
    """Send an email to this user.

    When no sender is given, the default sender address of the settings
    is used. Extra keyword arguments are forwarded to `send_mail`.
    """
    sender = settings.DEFAULT_FROM_EMAIL if from_email is None else from_email
    recipients = [self.email]
    send_mail(subject, message, sender, recipients, **kwargs)

generate_username()

Generates a unique username based on the first and last names.

For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

Returns:

Type Description
str

The generated username.

Source code in core/models.py
def generate_username(self) -> str:
    """Generate a unique username based on the first and last names.

    The base username is the first letter of the first name followed by
    the last name, lowercased and stripped of accents and of every
    character that is not a letter. When usernames made of this base
    (optionally followed by digits) already exist, a numeric suffix
    that is not taken yet is appended.

    For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

    The result is also stored in `self.username`; the user is not saved.

    Returns:
        The generated username.
    """

    def remove_accents(data):
        # NFKD splits an accented character into its base letter followed
        # by combining marks; keeping only the letters drops the marks.
        return "".join(
            x
            for x in unicodedata.normalize("NFKD", data)
            if unicodedata.category(x)[0] == "L"
        ).lower()

    user_name = (
        remove_accents(self.first_name[0] + self.last_name)
        .encode("ascii", "ignore")
        .decode("utf-8")
    )
    # load all usernames which could conflict with the new one.
    # we need to actually load them, instead of performing a count,
    # because we cannot be sure that two usernames refer to the
    # actual same word (eg. tmore and tmoreau)
    possible_conflicts: list[str] = list(
        User.objects.filter(username__startswith=user_name).values_list(
            "username", flat=True
        )
    )
    taken = {
        name for name in possible_conflicts if name.rstrip(string.digits) == user_name
    }
    if taken:
        # Start from the number of conflicts (exemple => exemple1), but do not
        # trust it blindly: if some usernames of the series are missing
        # (e.g. only exemple and exemple2 exist), the count alone would
        # yield a username which is already in use.
        suffix = len(taken)
        while f"{user_name}{suffix}" in taken:
            suffix += 1
        user_name = f"{user_name}{suffix}"
    self.username = user_name
    return user_name

is_owner(obj)

Determine if the object is owned by the user.

Source code in core/models.py
def is_owner(self, obj: models.Model):
    """Tell whether this user owns the given object.

    The user is an owner if the object says so through `is_owned_by`,
    or if the user is in the owner group of the object.
    Otherwise, the answer is whether the user is root.
    """
    if hasattr(obj, "is_owned_by"):
        if obj.is_owned_by(self):
            return True
    if hasattr(obj, "owner_group"):
        if self.is_in_group(pk=obj.owner_group_id):
            return True
    return self.is_root

can_edit(obj)

Determine if the object can be edited by the user.

Source code in core/models.py
def can_edit(self, obj: models.Model):
    """Tell whether this user can edit the given object.

    The user can edit if the object says so through `can_be_edited_by`,
    or if the user is in one of the edit groups of the object.
    Otherwise, the answer is whether the user owns the object.
    """
    if hasattr(obj, "can_be_edited_by") and obj.can_be_edited_by(self):
        return True
    if hasattr(obj, "edit_groups"):
        # if "edit_groups" has already been prefetched, use
        # the prefetch cache, else fetch only the ids, to make
        # the query lighter.
        if "edit_groups" in getattr(obj, "_prefetched_objects_cache", ()):
            group_ids = [group.id for group in obj.edit_groups.all()]
        else:
            group_ids = list(obj.edit_groups.values_list("id", flat=True))
        for group_id in group_ids:
            if self.is_in_group(pk=group_id):
                return True
    return self.is_owner(obj)

can_view(obj)

Determine if the object can be viewed by the user.

Source code in core/models.py
def can_view(self, obj: models.Model):
    """Tell whether this user can view the given object.

    The user can view if the object says so through `can_be_viewed_by`,
    or if the user is in one of the view groups of the object.
    Otherwise, the answer is whether the user can edit the object.
    """
    if hasattr(obj, "can_be_viewed_by") and obj.can_be_viewed_by(self):
        return True
    if hasattr(obj, "view_groups"):
        # if "view_groups" has already been prefetched, use
        # the prefetch cache, else fetch only the ids, to make
        # the query lighter.
        if "view_groups" in getattr(obj, "_prefetched_objects_cache", ()):
            group_ids = [group.id for group in obj.view_groups.all()]
        else:
            group_ids = list(obj.view_groups.values_list("id", flat=True))
        for group_id in group_ids:
            if self.is_in_group(pk=group_id):
                return True
    return self.can_edit(obj)

can_be_viewed_by(user)

Check if this user can be viewed by the given user.

Given users A and B. A can be viewed by B if :

  • A and B are the same user
  • or B has the permission to view hidden users
  • or B can view users in general and A didn't hide its profile
  • or B is in A's whitelist.
Source code in core/models.py
def can_be_viewed_by(self, user: User) -> bool:
    """Check if this user can be viewed by the given user.

    Given users A (this user) and B (the given user).
    A can be viewed by B if :

    - A and B are the same user
    - or B has the permission to view hidden users
    - or B can view users in general and A didn't hide its profile
    - or B can view users in general and B is in A's whitelist.
    """

    def viewer_is_whitelisted():
        # use the prefetch cache when the whitelist has already been
        # loaded, else ask the database for this single user
        if "whitelisted_users" in getattr(self, "_prefetched_objects_cache", ()):
            return user in self.whitelisted_users.all()
        return self.whitelisted_users.contains(user)

    if user.id == self.id or user.has_perm("core.view_hidden_user"):
        return True
    return user.has_perm("core.view_user") and (
        self.is_viewable or viewer_is_whitelisted()
    )

clubs_with_rights()

The list of clubs where the user has rights

Source code in core/models.py
@cached_property
def clubs_with_rights(self) -> list[Club]:
    """The list of clubs where the user has rights.

    Those are the clubs of the ongoing board memberships of the user.
    """
    board_memberships = self.memberships.ongoing().board()
    # fetch the clubs along with the memberships, to avoid one query per club
    board_memberships = board_memberships.select_related("club")
    return [membership.club for membership in board_memberships]

CurrencyField(*args, **kwargs)

Bases: DecimalField

Custom database field used for currency.

Source code in counter/fields.py
def __init__(self, *args, **kwargs):
    """Build the field with the fixed precision used for currency.

    `max_digits` and `decimal_places` are always forced to 12 and 2,
    whatever the caller passes for them.
    """
    kwargs.update(max_digits=12, decimal_places=2)
    super().__init__(*args, **kwargs)

BillingInfo

Bases: Model

Represent the billing information of a user, which is required by the 3D-Secure v2 system used by the etransaction module.

to_3dsv2_xml()

Convert the data from this model into a xml usable by the online paying service of the Crédit Agricole bank. see : https://www.ca-moncommerce.com/espace-client-mon-commerce/up2pay-e-transactions/ma-documentation/manuel-dintegration-focus-3ds-v2/principes-generaux/#integration-3dsv2-developpeur-webmaster.

Source code in counter/models.py
def to_3dsv2_xml(self) -> str:
    """Serialize this billing information into the xml expected
    by the online paying service of the Crédit Agricole bank.
    see : `https://www.ca-moncommerce.com/espace-client-mon-commerce/up2pay-e-transactions/ma-documentation/manuel-dintegration-focus-3ds-v2/principes-generaux/#integration-3dsv2-developpeur-webmaster`.

    The second address line is only included when it is filled.
    """
    xml_header = '<?xml version="1.0" encoding="UTF-8" ?>'
    phone = self.phone_number
    address = {
        "FirstName": self.first_name,
        "LastName": self.last_name,
        "Address1": self.address_1,
        "ZipCode": self.zip_code,
        "City": self.city,
        "CountryCode": self.country.numeric,  # ISO-3166-1 numeric code
        "MobilePhone": phone.as_national.replace(" ", ""),
        "CountryCodeMobilePhone": f"+{phone.country_code}",
    }
    if self.address_2:
        address["Address2"] = self.address_2
    billing_xml = dict2xml({"Address": address}, wrap="Billing", newlines=False)
    return xml_header + billing_xml

Customer

Bases: Model

Customer data of a User.

It adds some basic customers' information, such as the account ID, and is used by other accounting classes as reference to the customer, rather than using User.

can_buy property

Check whether this customer has the right to purchase any item.

save(*args, allow_negative=False, **kwargs)

allow_negative : allow the save to leave the account with a negative balance. When it is not set, saving a customer whose amount is negative raises a validation error. This parameter avoids blocking the save method of a customer if his account is negative.

Source code in counter/models.py
def save(self, *args, allow_negative=False, **kwargs):
    """Save the customer, refusing a negative balance unless told otherwise.

    Args:
        allow_negative: If True, the customer is saved even if
            its amount is negative.

    Raises:
        ValidationError: If the amount is negative
            and `allow_negative` is not set.
    """
    is_overdrawn = self.amount < 0
    if is_overdrawn and not allow_negative:
        raise ValidationError(_("Not enough money"))
    super().save(*args, **kwargs)

update_returnable_balance()

Update all returnable balances of this user to their real amount.

Source code in counter/models.py
def update_returnable_balance(self):
    """Recompute all returnable balances of this customer and store them.

    For each returnable product, the new balance is the quantity sold to
    this customer of its product, minus the quantity sold to this customer
    of its returned product. Balances are created, or updated in place if
    a row already exists for the (customer, returnable) pair.
    """

    def sold_quantity(outer_ref: Literal["product_id", "returned_product_id"]):
        # total quantity sold to this customer of the product
        # designated by `outer_ref` on the outer query
        per_product = Selling.objects.filter(
            customer=self, product=OuterRef(outer_ref)
        ).values("product")
        return per_product.annotate(quantity=Sum("quantity", default=0)).values(
            "quantity"
        )

    returnables = ReturnableProduct.objects.annotate_balance_for(self)
    returnables = returnables.annotate(
        nb_cons=Coalesce(Subquery(sold_quantity("product_id")), 0),
        nb_dcons=Coalesce(Subquery(sold_quantity("returned_product_id")), 0),
    )
    rows = returnables.annotate(new_balance=F("nb_cons") - F("nb_dcons")).values(
        "id", "new_balance"
    )
    fresh_balances = [
        ReturnableProductBalance(
            customer=self, returnable_id=row["id"], balance=row["new_balance"]
        )
        for row in rows
    ]
    ReturnableProductBalance.objects.bulk_create(
        fresh_balances,
        update_conflicts=True,
        update_fields=["balance"],
        unique_fields=["customer", "returnable"],
    )

get_or_create(user) classmethod

Work in pretty much the same way as the usual get_or_create method, but with the default field replaced by some under the hood.

If the user has an account, return it as is. Else create a new account with no money on it and a new unique account id

Example : ::

user = User.objects.get(pk=1)
account, created = Customer.get_or_create(user)
if created:
    print(f"created a new account with id {account.id}")
else:
    print(f"user has already an account, with {account.amount} € on it")
Source code in counter/models.py
@classmethod
def get_or_create(cls, user: User) -> tuple[Customer, bool]:
    """Return the customer account of the user, creating it if needed.

    This works in pretty much the same way as the usual get_or_create
    method, except that the values of the new account are chosen
    under the hood.

    If the user has an account, return it as is.
    Else create a new account with a new unique account id.

    Example:
        ```python
        user = User.objects.get(pk=1)
        account, created = Customer.get_or_create(user)
        if created:
            print(f"created a new account with id {account.id}")
        else:
            print(f"user has already an account, with {account.amount} € on it")
        ```

    Returns:
        The account and a boolean telling if it has just been created.
    """
    if hasattr(user, "customer"):
        return user.customer, False

    # account_id are always a number with a letter appended
    last_account = (
        Customer.objects.order_by(Length("account_id"), "account_id")
        .values("account_id")
        .last()
    )
    if last_account is None:
        # no account at all yet; this first id is a legacy from the old site
        return cls.objects.create(user=user, account_id="1504a"), True

    candidate_id = last_account["account_id"]
    number = int(candidate_id[:-1])
    # the first candidate is an id which already exists,
    # so the loop should always run at least once
    while Customer.objects.filter(account_id=candidate_id).exists():
        number += 1
        candidate_id = f"{number}{random.choice(string.ascii_lowercase)}"

    return cls.objects.create(user=user, account_id=candidate_id), True

Price

Bases: Model

Product

Bases: Model

A product, with all its related information.

is_owned_by(user)

Method to see if that object can be edited by the given user.

Source code in counter/models.py
def is_owned_by(self, user):
    """Tell whether the given user owns this object.

    Anonymous users never do. Other users do if they are in the
    accounting admin group or in the counter admin group.
    """
    if user.is_anonymous:
        return False
    if user.is_in_group(pk=settings.SITH_GROUP_ACCOUNTING_ADMIN_ID):
        return True
    return user.is_in_group(pk=settings.SITH_GROUP_COUNTER_ADMIN_ID)

Refilling

Bases: Model

Handle the refilling.

Selling

Bases: Model

Handle the sellings.

save(*args, allow_negative=False, **kwargs)

allow_negative : Allow this selling to use more money than available for this user.

Source code in counter/models.py
def save(self, *args, allow_negative=False, **kwargs):
    """Save this selling and apply its side effects.

    In order:

    - the date defaults to now if it is not set, and the selling is validated
    - when the selling is being created and is paid with the sith account,
      its total is withdrawn from the account of the customer
    - when the customer's user `was_subscribed` and the product is one of
      the subscription products, the matching subscription is created
    - when the user asked for it in their preferences, a notification is created
    - the selling itself is saved
    - when the product has an eticket, a mail is sent to the customer

    Args:
        allow_negative: Allow this selling to use more money
            than available for this user.
    """
    if not self.date:
        self.date = timezone.now()
    self.full_clean()
    if (
        self._state.adding
        and self.payment_method == self.PaymentMethod.SITH_ACCOUNT
    ):
        self.customer.amount -= self.quantity * self.unit_price
        self.customer.save(allow_negative=allow_negative)
    user = self.customer.user
    if user.was_subscribed and self.product:
        # both subscription products are handled the same way,
        # only the type of the created subscription differs
        subscription_type = None
        if self.product.id == settings.SITH_PRODUCT_SUBSCRIPTION_ONE_SEMESTER:
            subscription_type = "un-semestre"
        elif self.product.id == settings.SITH_PRODUCT_SUBSCRIPTION_TWO_SEMESTERS:
            subscription_type = "deux-semestres"
        if subscription_type is not None:
            sub = Subscription(
                member=user,
                subscription_type=subscription_type,
                payment_method="EBOUTIC",
                location="EBOUTIC",
            )
            duration = settings.SITH_SUBSCRIPTIONS[subscription_type]["duration"]
            sub.subscription_start = Subscription.compute_start(duration=duration)
            sub.subscription_end = Subscription.compute_end(
                duration=duration,
                start=sub.subscription_start,
            )
            sub.save()
    if user.preferences.notify_on_click:
        Notification(
            user=user,
            url=reverse(
                "core:user_account_detail",
                kwargs={
                    "user_id": user.id,
                    "year": self.date.year,
                    "month": self.date.month,
                },
            ),
            param="%d x %s" % (self.quantity, self.label),
            type="SELLING",
        ).save()
    super().save(*args, **kwargs)
    if hasattr(self.product, "eticket"):
        self.send_mail_customer()

BillingInfoState

Bases: Enum

Basket

Bases: Model

Basket is built when the user connects to an eboutic page.

generate_sales(counter, seller, payment_method)

Generate a list of sold items corresponding to the items of this basket WITHOUT saving them NOR deleting the basket.

Example
counter = Counter.objects.get(name="Eboutic")
user = User.objects.get(username="bibou")
sales = basket.generate_sales(counter, user, Selling.PaymentMethod.SITH_ACCOUNT)
# here the basket is in the same state as before the method call

with transaction.atomic():
    for sale in sales:
        sale.save()
    basket.delete()
    # all the basket items are deleted by the on_delete=CASCADE relation
    # thus only the sales remain
Source code in eboutic/models.py
def generate_sales(
    self, counter, seller: User, payment_method: Selling.PaymentMethod
):
    """Build the list of sold items matching the items of this basket,
    WITHOUT saving them NOR deleting the basket.

    The customer account of the basket's user is fetched,
    or created if it doesn't exist yet.

    Example:
        ```python
        counter = Counter.objects.get(name="Eboutic")
        user = User.objects.get(username="bibou")
        sales = basket.generate_sales(counter, user, Selling.PaymentMethod.SITH_ACCOUNT)
        # here the basket is in the same state as before the method call

        with transaction.atomic():
            for sale in sales:
                sale.save()
            basket.delete()
            # all the basket items are deleted by the on_delete=CASCADE relation
            # thus only the sales remain
        ```
    """
    customer, _created = Customer.get_or_create(self.user)

    def as_sale(item):
        # unsaved selling carrying the same data as the basket item
        return Selling(
            label=item.label,
            counter=counter,
            club_id=item.product.club_id,
            product=item.product,
            seller=seller,
            customer=customer,
            unit_price=item.unit_price,
            quantity=item.quantity,
            payment_method=payment_method,
        )

    basket_items = self.items.select_related("product")
    return [as_sale(item) for item in basket_items]

InvoiceQueryset

Bases: QuerySet

annotate_total()

Annotate the queryset with the total amount of each invoice.

The total amount is the sum of (unit_price * quantity) for all items related to the invoice.

Source code in eboutic/models.py
def annotate_total(self) -> Self:
    """Add a `total` annotation holding the total amount of each invoice.

    The total amount is the sum of (unit_price * quantity)
    for all items related to the invoice.
    """
    # aggregates within subqueries require a little bit of black magic,
    # but hopefully, django gives a comprehensive documentation for that :
    # https://docs.djangoproject.com/en/stable/ref/models/expressions/#using-aggregates-within-a-subquery-expression
    items_of_invoice = InvoiceItem.objects.filter(invoice_id=OuterRef("pk"))
    total_of_invoice = (
        items_of_invoice.values("invoice_id")
        .annotate(total=Sum(F("unit_price") * F("quantity")))
        .values("total")
    )
    return self.annotate(total=Subquery(total_of_invoice))

Invoice

Bases: Model

Invoices are generated once the payment has been validated.

AbstractBaseItem

Bases: Model

BasketItem

Bases: AbstractBaseItem

from_price(price, quantity, basket) classmethod

Create a BasketItem with the same characteristics as the product price passed in parameters, with the specified quantity.

Source code in eboutic/models.py
@classmethod
def from_price(cls, price: Price, quantity: int, basket: Basket):
    """Build a BasketItem from a product price.

    The label, the product and the unit price are taken from the price;
    the quantity and the basket are the ones passed in parameters.
    The item is not saved.
    """
    fields = {
        "basket": basket,
        "label": price.full_label,
        "product_id": price.product_id,
        "quantity": quantity,
        "unit_price": price.amount,
    }
    return cls(**fields)

InvoiceItem

get_eboutic()

Source code in counter/models.py
def get_eboutic() -> Counter:
    """Return the counter of type "EBOUTIC" with the lowest id.

    The result of `first()` is returned as is, so this yields None
    when there is no such counter.
    """
    eboutic_counters = Counter.objects.filter(type="EBOUTIC")
    return eboutic_counters.order_by("id").first()