Models

PAYMENT_METHOD = [('CHECK', _('Check')), ('CASH', _('Cash')), ('CARD', _('Credit card'))] module-attribute

CurrencyField(*args, **kwargs)

Bases: DecimalField

Custom database field used for currency.

Source code in accounting/models.py
def __init__(self, *args, **kwargs):
    kwargs["max_digits"] = 12
    kwargs["decimal_places"] = 2
    super().__init__(*args, **kwargs)

Club

Bases: Model

The Club class, made as a tree to allow nice tidy organization.

check_loop()

Raise a validation error when a loop is found within the parent list.

Source code in club/models.py
def check_loop(self):
    """Raise a validation error when a loop is found within the parent list."""
    objs = []
    cur = self
    while cur.parent is not None:
        if cur in objs:
            raise ValidationError(_("You can not make loops in clubs"))
        objs.append(cur)
        cur = cur.parent
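
The traversal can be tried outside Django. The sketch below is only an illustration: `Node` is a hypothetical stand-in for `Club` that carries nothing but a `parent` link, and a plain `ValueError` replaces Django's `ValidationError`.

```python
class Node:
    """Hypothetical stand-in for Club: only the `parent` link matters here."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent


def check_loop(node):
    """Walk up the parent chain and fail if a node is visited twice."""
    seen = []
    cur = node
    while cur.parent is not None:
        if cur in seen:
            raise ValueError("You can not make loops in clubs")
        seen.append(cur)
        cur = cur.parent


root = Node("root")
child = Node("child", parent=root)
check_loop(child)  # no loop: returns silently

root.parent = child  # root -> child -> root is now a cycle
try:
    check_loop(child)
    loop_detected = False
except ValueError:
    loop_detected = True
```

Without the `seen` list, the `while` loop would never terminate on a cyclic chain, since `cur.parent` would never be `None`.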

is_owned_by(user)

Method to see if that object can be super edited by the given user.

Source code in club/models.py
def is_owned_by(self, user):
    """Method to see if that object can be super edited by the given user."""
    if user.is_anonymous:
        return False
    return user.is_board_member

can_be_edited_by(user)

Method to see if that object can be edited by the given user.

Source code in club/models.py
def can_be_edited_by(self, user):
    """Method to see if that object can be edited by the given user."""
    return self.has_rights_in_club(user)

can_be_viewed_by(user)

Method to see if that object can be seen by the given user.

Source code in club/models.py
def can_be_viewed_by(self, user):
    """Method to see if that object can be seen by the given user."""
    sub = User.objects.filter(pk=user.pk).first()
    if sub is None:
        return False
    return sub.was_subscribed

get_membership_for(user)

Return the current membership of the given user.

Note

The result is cached.

Source code in club/models.py
def get_membership_for(self, user: User) -> Membership | None:
    """Return the current membership of the given user.

    Note:
        The result is cached.
    """
    if user.is_anonymous:
        return None
    membership = cache.get(f"membership_{self.id}_{user.id}")
    if membership == "not_member":
        return None
    if membership is None:
        membership = self.members.filter(user=user, end_date=None).first()
        if membership is None:
            cache.set(f"membership_{self.id}_{user.id}", "not_member")
        else:
            cache.set(f"membership_{self.id}_{user.id}", membership)
    return membership
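
The `"not_member"` string acts as a sentinel: it lets the cache distinguish "nothing cached yet" from "already checked, no membership". A minimal sketch of that pattern, assuming a plain dict in place of Django's cache and a hypothetical `memberships` mapping in place of the database:

```python
NOT_MEMBER = "not_member"  # sentinel: "we looked, and there is no membership"
cache = {}  # stand-in for Django's cache backend (assumption for this sketch)
db_hits = 0


def find_membership(club_id, user_id, memberships):
    """Return the cached membership, querying `memberships` only on a miss."""
    global db_hits
    key = f"membership_{club_id}_{user_id}"
    cached = cache.get(key)
    if cached == NOT_MEMBER:
        return None  # negative result served from the cache
    if cached is None:
        db_hits += 1  # simulated database query
        cached = memberships.get((club_id, user_id))
        cache[key] = NOT_MEMBER if cached is None else cached
    return cached


memberships = {(1, 42): "president"}
first = find_membership(1, 42, memberships)
second = find_membership(1, 42, memberships)  # served from the cache
missing = find_membership(1, 7, memberships)
missing_again = find_membership(1, 7, memberships)  # also served from the cache
```

Storing `None` directly would not work here, because a cache miss also reads back as `None`.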

ResizedImageField(width=None, height=None, force_format=None, **kwargs)

Bases: ImageField

A field that automatically resizes images to a given size.

This field is useful for profile pictures or product icons, for example.

The final size of the image is determined by the width and height parameters:

  • If both are given, the image will be resized to fit in a rectangle of width x height
  • If only one is given, the other will be calculated to keep the same ratio

If the force_format parameter is given, the image will be converted to this format.

Examples:

To resize an image to a height of 100px without changing the ratio, and convert it to WEBP:

class Product(models.Model):
    icon = ResizedImageField(height=100, force_format="WEBP")

To explicitly resize an image to 100x100px (possibly changing the ratio):

class Product(models.Model):
    icon = ResizedImageField(width=100, height=100)
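
The sizing rules listed above can be sketched as a small pure function. This is an illustration only: `target_size` is a hypothetical helper, not part of the field's API, and when both dimensions are given it simply returns the requested rectangle, which is one reading of the second example. The real field may behave differently.

```python
def target_size(orig_width, orig_height, width=None, height=None):
    """Compute the output size following the two rules listed above."""
    if width is None and height is None:
        raise ValueError("requires width, height or both, but got neither")
    if width is not None and height is not None:
        return width, height  # both given: use the requested rectangle
    if width is None:
        # only the height is given: derive the width from the original ratio
        return round(orig_width * height / orig_height), height
    # only the width is given: derive the height from the original ratio
    return width, round(orig_height * width / orig_width)


by_height = target_size(400, 200, height=100)
by_width = target_size(400, 200, width=100)
both = target_size(400, 200, width=100, height=100)
```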

Raises:

Type        Description
FieldError  If neither width nor height is given

Parameters:

Name          Type        Description                                           Default
width         int | None  If given, the width of the resized image              None
height        int | None  If given, the height of the resized image             None
force_format  str | None  If given, the image will be converted to this format  None
Source code in core/fields.py
def __init__(
    self,
    width: int | None = None,
    height: int | None = None,
    force_format: str | None = None,
    **kwargs,
):
    if width is None and height is None:
        raise FieldError(
            f"{self.__class__.__name__} requires "
            "width, height or both, but got neither"
        )
    self.width = width
    self.height = height
    self.force_format = force_format
    super().__init__(**kwargs)

Group

Bases: Group

Implement both RealGroups and Meta groups.

Groups are sorted by their is_meta property

Notification

Bases: Model

User

Bases: AbstractUser

Defines the base user class, useable in every app.

This is almost the same as the auth module AbstractUser since it inherits from it, but some fields are required, and the username is generated automatically with the name of the user (see generate_username()).

Added fields: nick_name, date_of_birth

Required fields: email, first_name, last_name, date_of_birth

cached_groups: list[Group] property

Get the list of groups this user is in.

The result is cached for the default duration (should be 5 minutes)

Returns: A list of all the groups this user is in.

is_in_group(*, pk=None, name=None)

Check if this user is in the given group. Either a group id or a group name must be provided. If both are passed, only the id will be considered.

The group will be fetched using the given parameter. If no group is found, return False. If a group is found, check if this user is in the latter.

Returns:

Type  Description
bool  True if the user is in the group, else False

Source code in core/models.py
def is_in_group(self, *, pk: int | None = None, name: str | None = None) -> bool:
    """Check if this user is in the given group.
    Either a group id or a group name must be provided.
    If both are passed, only the id will be considered.

    The group will be fetched using the given parameter.
    If no group is found, return False.
    If a group is found, check if this user is in the latter.

    Returns:
         True if the user is in the group, else False
    """
    if pk is not None:
        group: Optional[Group] = get_group(pk=pk)
    elif name is not None:
        group: Optional[Group] = get_group(name=name)
    else:
        raise ValueError("You must either provide the id or the name of the group")
    if group is None:
        return False
    if group.id == settings.SITH_GROUP_PUBLIC_ID:
        return True
    if group.id == settings.SITH_GROUP_SUBSCRIBERS_ID:
        return self.is_subscribed
    if group.id == settings.SITH_GROUP_OLD_SUBSCRIBERS_ID:
        return self.was_subscribed
    if group.id == settings.SITH_GROUP_ROOT_ID:
        return self.is_root
    if group.is_meta:
        # check if this group is associated with a club
        group.__class__ = MetaGroup
        club = group.associated_club
        if club is None:
            return False
        membership = club.get_membership_for(self)
        if membership is None:
            return False
        if group.name.endswith(settings.SITH_MEMBER_SUFFIX):
            return True
        return membership.role > settings.SITH_MAXIMUM_FREE_ROLE
    return group in self.cached_groups

age()

Return the age of this user on the day the method is called. If the user has not filled in their date of birth, return 0.

Source code in core/models.py
@cached_property
def age(self) -> int:
    """Return the age this user has the day the method is called.
    If the user has not filled in their date of birth, return 0.
    """
    if self.date_of_birth is None:
        return 0
    today = timezone.now()
    age = today.year - self.date_of_birth.year
    # remove a year if this year's birthday is yet to come
    age -= (today.month, today.day) < (
        self.date_of_birth.month,
        self.date_of_birth.day,
    )
    return age
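
The birthday adjustment relies on tuple comparison and on the fact that a Python `bool` is an `int`. A standalone sketch of the same arithmetic, where `age_on` is a hypothetical helper that takes the current day as an argument so it can be checked against fixed dates:

```python
from datetime import date


def age_on(date_of_birth, today):
    """Age in whole years on `today`, or 0 when the birth date is unknown."""
    if date_of_birth is None:
        return 0
    age = today.year - date_of_birth.year
    # A bool is an int in Python: subtract 1 when the birthday is still ahead.
    age -= (today.month, today.day) < (date_of_birth.month, date_of_birth.day)
    return age


birthday = date(2000, 6, 15)
day_before = age_on(birthday, date(2024, 6, 14))  # birthday not reached yet
same_day = age_on(birthday, date(2024, 6, 15))
```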

get_short_name()

Returns the short name for the user.

Source code in core/models.py
def get_short_name(self):
    """Returns the short name for the user."""
    if self.nick_name:
        return self.nick_name
    return self.first_name + " " + self.last_name

get_display_name()

Returns the display name of the user.

The full name, followed by the nickname in parentheses when the user has one.

Source code in core/models.py
def get_display_name(self) -> str:
    """Returns the display name of the user.

    The full name, followed by the nickname in parentheses when the user has one.
    """
    if self.nick_name:
        return "%s (%s)" % (self.get_full_name(), self.nick_name)
    return self.get_full_name()

get_family(godfathers_depth=4, godchildren_depth=4)

Get the family of the user, with the given depth.

Parameters:

Name               Type            Description                                        Default
godfathers_depth   NonNegativeInt  The number of generations of godfathers to fetch   4
godchildren_depth  NonNegativeInt  The number of generations of godchildren to fetch  4

Returns:

Type          Description
set[through]  A set of family relationships in this user's family

Source code in core/models.py
def get_family(
    self,
    godfathers_depth: NonNegativeInt = 4,
    godchildren_depth: NonNegativeInt = 4,
) -> set[User.godfathers.through]:
    """Get the family of the user, with the given depth.

    Args:
        godfathers_depth: The number of generations of godfathers to fetch
        godchildren_depth: The number of generations of godchildren to fetch

    Returns:
        A set of family relationships in this user's family
    """
    res = []
    for depth, key, reverse_key in [
        (godfathers_depth, "from_user_id", "to_user_id"),
        (godchildren_depth, "to_user_id", "from_user_id"),
    ]:
        if depth == 0:
            continue
        links = list(User.godfathers.through.objects.filter(**{key: self.id}))
        res.extend(links)
        for _ in range(1, depth):  # noqa: F402 we don't care about gettext here
            ids = [getattr(c, reverse_key) for c in links]
            links = list(
                User.godfathers.through.objects.filter(
                    **{f"{key}__in": ids}
                ).exclude(id__in=[r.id for r in res])
            )
            if not links:
                break
            res.extend(links)
    return set(res)
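
The generation-by-generation walk can be illustrated without a database. In the sketch below, relationships are plain `(from_user_id, to_user_id)` tuples, where the second user is assumed to be a godfather of the first (mirroring the keys used in the source). `family_links` is a hypothetical helper, not the project's API.

```python
def family_links(links, user_id, godfathers_depth=4, godchildren_depth=4):
    """Collect relationship rows around `user_id`, one generation at a time.

    `links` is a set of (from_user_id, to_user_id) tuples, where the second
    user is a godfather of the first (assumption mirroring the through table).
    """
    res = set()
    # (depth, index matched against the frontier, index giving the next frontier)
    for depth, key, reverse_key in (
        (godfathers_depth, 0, 1),
        (godchildren_depth, 1, 0),
    ):
        frontier = {user_id}
        for _ in range(depth):
            found = {link for link in links if link[key] in frontier} - res
            if not found:
                break
            res |= found
            frontier = {link[reverse_key] for link in found}
    return res


# 1 has godfather 2, 2 has godfather 3, and 4 is a godchild of 1
links = {(1, 2), (2, 3), (4, 1)}
close_family = family_links(links, 1, godfathers_depth=1, godchildren_depth=1)
ancestors = family_links(links, 1, godfathers_depth=2, godchildren_depth=0)
```

Each pass issues one lookup per generation rather than one per user, which is the same idea as the `__in` filter in the source.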

email_user(subject, message, from_email=None, **kwargs)

Sends an email to this User.

Source code in core/models.py
def email_user(self, subject, message, from_email=None, **kwargs):
    """Sends an email to this User."""
    if from_email is None:
        from_email = settings.DEFAULT_FROM_EMAIL
    send_mail(subject, message, from_email, [self.email], **kwargs)

generate_username()

Generates a unique username based on the first and last names.

For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

Returns:

Type  Description
str   The generated username.

Source code in core/models.py
def generate_username(self) -> str:
    """Generates a unique username based on the first and last names.

    For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

    Returns:
        The generated username.
    """

    def remove_accents(data):
        return "".join(
            x
            for x in unicodedata.normalize("NFKD", data)
            if unicodedata.category(x)[0] == "L"
        ).lower()

    user_name = (
        remove_accents(self.first_name[0] + self.last_name)
        .encode("ascii", "ignore")
        .decode("utf-8")
    )
    # load all usernames which could conflict with the new one.
    # we need to actually load them, instead of performing a count,
    # because we cannot be sure that two usernames refer to the
    # actual same word (eg. tmore and tmoreau)
    possible_conflicts: list[str] = list(
        User.objects.filter(username__startswith=user_name).values_list(
            "username", flat=True
        )
    )
    nb_conflicts = sum(
        1 for name in possible_conflicts if name.rstrip(string.digits) == user_name
    )
    if nb_conflicts > 0:
        user_name += str(nb_conflicts)  # e.g. example => example1
    self.username = user_name
    return user_name
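
The naming rule can be exercised without a database by passing the existing usernames as a list. This is a sketch under that assumption: `make_username` is a hypothetical helper, and the sample names are illustrative.

```python
import string
import unicodedata


def make_username(first_name, last_name, existing):
    """Build `<first initial><last name>`, numbered if already taken."""
    decomposed = unicodedata.normalize("NFKD", first_name[0] + last_name)
    # keep letters only: this drops combining accents, spaces and hyphens
    base = "".join(c for c in decomposed if unicodedata.category(c)[0] == "L")
    base = base.lower().encode("ascii", "ignore").decode("ascii")
    # "gcarlier1" conflicts with "gcarlier", but "gcarlierbis" does not
    conflicts = sum(1 for name in existing if name.rstrip(string.digits) == base)
    return base + str(conflicts) if conflicts else base


plain = make_username("Guy", "Carlier", [])
taken = make_username("Guy", "Carlier", ["gcarlier", "gcarlier1", "gcarlierbis"])
accented = make_username("Élodie", "Dupré", [])
```

Stripping trailing digits before comparing is what keeps an unrelated username such as `gcarlierbis` from being counted as a conflict.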

is_owner(obj)

Determine if the object is owned by the user.

Source code in core/models.py
def is_owner(self, obj):
    """Determine if the object is owned by the user."""
    if hasattr(obj, "is_owned_by") and obj.is_owned_by(self):
        return True
    if hasattr(obj, "owner_group") and self.is_in_group(pk=obj.owner_group.id):
        return True
    return self.is_root

can_edit(obj)

Determine if the object can be edited by the user.

Source code in core/models.py
def can_edit(self, obj):
    """Determine if the object can be edited by the user."""
    if hasattr(obj, "can_be_edited_by") and obj.can_be_edited_by(self):
        return True
    if hasattr(obj, "edit_groups"):
        for pk in obj.edit_groups.values_list("pk", flat=True):
            if self.is_in_group(pk=pk):
                return True
    if isinstance(obj, User) and obj == self:
        return True
    return self.is_owner(obj)

can_view(obj)

Determine if the object can be viewed by the user.

Source code in core/models.py
def can_view(self, obj):
    """Determine if the object can be viewed by the user."""
    if hasattr(obj, "can_be_viewed_by") and obj.can_be_viewed_by(self):
        return True
    if hasattr(obj, "view_groups"):
        for pk in obj.view_groups.values_list("pk", flat=True):
            if self.is_in_group(pk=pk):
                return True
    return self.can_edit(obj)
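
The three checks form a chain: viewing falls back to editing, and editing falls back to ownership. An object opts in by defining `can_be_viewed_by`, `can_be_edited_by` or `is_owned_by`. The sketch below reproduces only that delegation, without the group checks; `FakeUser` and `PublicNote` are hypothetical classes introduced for illustration.

```python
class FakeUser:
    """Hypothetical stand-in for User with only the pieces this sketch needs."""

    def __init__(self, is_root=False):
        self.is_root = is_root

    def is_owner(self, obj):
        if hasattr(obj, "is_owned_by") and obj.is_owned_by(self):
            return True
        return self.is_root

    def can_edit(self, obj):
        if hasattr(obj, "can_be_edited_by") and obj.can_be_edited_by(self):
            return True
        return self.is_owner(obj)

    def can_view(self, obj):
        if hasattr(obj, "can_be_viewed_by") and obj.can_be_viewed_by(self):
            return True
        return self.can_edit(obj)


class PublicNote:
    """Hypothetical model: anyone may view it, nobody but root may edit it."""

    def can_be_viewed_by(self, user):
        return True


note = PublicNote()
visitor, admin = FakeUser(), FakeUser(is_root=True)
```

Because of the fallback chain, a user who can edit an object can always view it, even when the object defines no view hook at all.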

clubs_with_rights()

The list of clubs where the user has rights

Source code in core/models.py
@cached_property
def clubs_with_rights(self) -> list[Club]:
    """The list of clubs where the user has rights"""
    memberships = self.memberships.ongoing().board().select_related("club")
    return [m.club for m in memberships]

CustomerQuerySet

Bases: QuerySet

update_amount()

Update the amount of all customers selected by this queryset.

The result is given as the sum of all refills minus the sum of all purchases.

Returns:

Type  Description
int   The number of updated rows.

Source code in counter/models.py
def update_amount(self) -> int:
    """Update the amount of all customers selected by this queryset.

    The result is given as the sum of all refills minus the sum of all purchases.

    Returns:
        The number of updated rows.

    Warnings:
        The execution time of this query grows really quickly.
        When updating 500 customers, it may take around a second.
        If you try to update all customers at once, the execution time
        goes up to tens of seconds.
        Use this either on a small subset of the `Customer` table,
        or execute it inside an independent task
        (like a Celery task or a management command).
    """
    money_in = Subquery(
        Refilling.objects.filter(customer=OuterRef("pk"))
        .values("customer_id")  # group by customer
        .annotate(res=Sum(F("amount"), default=0))
        .values("res")
    )
    money_out = Subquery(
        Selling.objects.filter(customer=OuterRef("pk"))
        .values("customer_id")
        .annotate(res=Sum(F("unit_price") * F("quantity"), default=0))
        .values("res")
    )
    return self.update(amount=Coalesce(money_in - money_out, Decimal("0")))
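
For a single customer, the arithmetic performed by the two subqueries amounts to the following. This is a plain-Python sketch with made-up figures, where `compute_amount` is a hypothetical helper and not part of the queryset API.

```python
from decimal import Decimal


def compute_amount(refills, sellings):
    """Balance = sum of refills - sum of (unit_price * quantity) for purchases."""
    money_in = sum(refills, Decimal("0"))
    money_out = sum((price * qty for price, qty in sellings), Decimal("0"))
    return money_in - money_out


balance = compute_amount(
    refills=[Decimal("10.00"), Decimal("5.50")],
    sellings=[(Decimal("1.20"), 3), (Decimal("2.00"), 1)],
)
```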

Customer

Bases: Model

Customer data of a User.

It adds some basic customers' information, such as the account ID, and is used by other accounting classes as reference to the customer, rather than using User.

can_buy: bool property

Check whether this customer has the right to purchase any item.

This must not be confused with the Product.can_be_sold_to(user) method: the present property returns information about a customer, whereas the other describes the relation between a User (not a Customer, don't mix them up) and a Product.

save(*args, allow_negative=False, is_selling=False, **kwargs)

is_selling: tells whether the current action is a selling.

allow_negative: ignored if the action is not a selling; allows a selling to put the account in the negative.

These two parameters avoid blocking the save method of a customer whose account is negative.

Source code in counter/models.py
def save(self, *args, allow_negative=False, is_selling=False, **kwargs):
    """is_selling : tell if the current action is a selling
    allow_negative : ignored if not a selling. Allow a selling to put the account in negative
    Those two parameters avoid blocking the save method of a customer if his account is negative.
    """
    if self.amount < 0 and (is_selling and not allow_negative):
        raise ValidationError(_("Not enough money"))
    super().save(*args, **kwargs)

get_or_create(user) classmethod

Works in much the same way as the usual get_or_create method, but the default field values are filled in under the hood.

If the user has an account, return it as is. Else create a new account with no money on it and a new unique account id

Example:

user = User.objects.get(pk=1)
account, created = Customer.get_or_create(user)
if created:
    print(f"created a new account with id {account.account_id}")
else:
    print(f"user already has an account, with {account.amount} € on it")
Source code in counter/models.py
@classmethod
def get_or_create(cls, user: User) -> tuple[Customer, bool]:
    """Work in pretty much the same way as the usual get_or_create method,
    but with the default field replaced by some under the hood.

    If the user has an account, return it as is.
    Else create a new account with no money on it and a new unique account id

    Example:

        user = User.objects.get(pk=1)
        account, created = Customer.get_or_create(user)
        if created:
            print(f"created a new account with id {account.account_id}")
        else:
            print(f"user already has an account, with {account.amount} € on it")
    """
    if hasattr(user, "customer"):
        return user.customer, False

    # account_id are always a number with a letter appended
    account_id = (
        Customer.objects.order_by(Length("account_id"), "account_id")
        .values("account_id")
        .last()
    )
    if account_id is None:
        # legacy from the old site
        account = cls.objects.create(user=user, account_id="1504a")
        return account, True

    account_id = account_id["account_id"]
    account_num = int(account_id[:-1])
    while Customer.objects.filter(account_id=account_id).exists():
        # when entering the first iteration, we are using an already existing account id
        # so the loop should always execute at least one time
        account_num += 1
        account_id = f"{account_num}{random.choice(string.ascii_lowercase)}"

    account = cls.objects.create(user=user, account_id=account_id)
    return account, True
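
The id allocation can be sketched on a plain set of existing ids. `next_account_id` is a hypothetical helper, and Python's string ordering stands in for the database ordering, which may differ depending on collation.

```python
import random
import string


def next_account_id(existing, rng=random):
    """Pick an unused id made of a number followed by one lowercase letter."""
    if not existing:
        return "1504a"  # first id, same legacy value as in the source above
    # sort by length first, then alphabetically, and keep the last one
    account_id = max(existing, key=lambda value: (len(value), value))
    number = int(account_id[:-1])
    while account_id in existing:
        number += 1
        account_id = f"{number}{rng.choice(string.ascii_lowercase)}"
    return account_id


existing = {"1504a", "1505k", "999z"}
new_id = next_account_id(existing, rng=random.Random(0))
```

Ordering by length before value matters: a purely alphabetical sort would rank `999z` above `1505k`.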

BillingInfo

Bases: Model

Represent the billing information of a user, which is required by the 3D-Secure v2 system used by the etransaction module.

to_3dsv2_xml()

Convert the data from this model into an XML document usable by the online payment service of the Crédit Agricole bank. See: https://www.ca-moncommerce.com/espace-client-mon-commerce/up2pay-e-transactions/ma-documentation/manuel-dintegration-focus-3ds-v2/principes-generaux/#integration-3dsv2-developpeur-webmaster.

Source code in counter/models.py
def to_3dsv2_xml(self) -> str:
    """Convert the data from this model into a xml usable
    by the online paying service of the Crédit Agricole bank.
    see : `https://www.ca-moncommerce.com/espace-client-mon-commerce/up2pay-e-transactions/ma-documentation/manuel-dintegration-focus-3ds-v2/principes-generaux/#integration-3dsv2-developpeur-webmaster`.
    """
    data = {
        "Address": {
            "FirstName": self.first_name,
            "LastName": self.last_name,
            "Address1": self.address_1,
            "ZipCode": self.zip_code,
            "City": self.city,
            "CountryCode": self.country.numeric,  # ISO-3166-1 numeric code
            "MobilePhone": self.phone_number.as_national.replace(" ", ""),
            "CountryCodeMobilePhone": f"+{self.phone_number.country_code}",
        }
    }
    if self.address_2:
        data["Address"]["Address2"] = self.address_2
    xml = dict2xml(data, wrap="Billing", newlines=False)
    return '<?xml version="1.0" encoding="UTF-8" ?>' + xml
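
To see the shape of the resulting document without installing `dict2xml`, the sketch below builds a similar structure with the standard library's `xml.etree.ElementTree`. This swaps in a different serializer than the one used in the source, so the exact output may differ; `billing_to_xml` and the sample values are hypothetical.

```python
import xml.etree.ElementTree as ET


def billing_to_xml(fields):
    """Serialize a flat dict of address fields inside Billing/Address elements."""
    billing = ET.Element("Billing")
    address = ET.SubElement(billing, "Address")
    for tag, value in fields.items():
        if value:  # skip empty optional fields such as Address2
            ET.SubElement(address, tag).text = str(value)
    body = ET.tostring(billing, encoding="unicode")
    return '<?xml version="1.0" encoding="UTF-8" ?>' + body


xml = billing_to_xml(
    {
        "FirstName": "Guy",
        "LastName": "Carlier",
        "Address1": "1 rue de l'Exemple",
        "Address2": "",
        "ZipCode": "90000",
        "City": "Belfort",
        "CountryCode": "250",  # ISO-3166-1 numeric code for France
    }
)
```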

AccountDumpQuerySet

Bases: QuerySet

ongoing()

Filter dump operations that are not completed yet.

Source code in counter/models.py
def ongoing(self) -> Self:
    """Filter dump operations that are not completed yet."""
    return self.filter(dump_operation=None)

AccountDump

Bases: Model

The process of dumping an account.

ProductType

Bases: OrderedModel

A product type.

Useful only for categorizing.

is_owned_by(user)

Method to see if that object can be edited by the given user.

Source code in counter/models.py
def is_owned_by(self, user):
    """Method to see if that object can be edited by the given user."""
    if user.is_anonymous:
        return False
    return user.is_in_group(pk=settings.SITH_GROUP_ACCOUNTING_ADMIN_ID)

Product

Bases: Model

A product, with all its related information.

is_owned_by(user)

Method to see if that object can be edited by the given user.

Source code in counter/models.py
def is_owned_by(self, user):
    """Method to see if that object can be edited by the given user."""
    if user.is_anonymous:
        return False
    return user.is_in_group(
        pk=settings.SITH_GROUP_ACCOUNTING_ADMIN_ID
    ) or user.is_in_group(pk=settings.SITH_GROUP_COUNTER_ADMIN_ID)

can_be_sold_to(user)

Check whether the given user has the right to buy this product.

This must not be confused with the Customer.can_buy property: the present method returns information about the relation between a User and a Product, whereas the other tells something about a Customer (and not a User; they are not the same model).

Returns:

Type  Description
bool  True if the user can buy this product else False

Warning

This performs a database query, so calling it in a loop quickly leads to an N+1 queries problem. Fortunately, you can avoid that by prefetching the buying_groups:

user = User.objects.get(username="foobar")
products = [
    p
    for p in Product.objects.prefetch_related("buying_groups")
    if p.can_be_sold_to(user)
]
Source code in counter/models.py
def can_be_sold_to(self, user: User) -> bool:
    """Check if whether the user given in parameter has the right to buy
    this product or not.

    This must be not confused with the Customer.can_buy()
    method as the present method returns an information
    about the relation between a User and a Product,
    whereas the other tells something about a Customer
    (and not a user, they are not the same model).

    Returns:
        True if the user can buy this product else False

    Warning:
        This performs a db query, thus you can quickly have
        a N+1 queries problem if you call it in a loop.
        Hopefully, you can avoid that if you prefetch the buying_groups :

        ```python
        user = User.objects.get(username="foobar")
        products = [
            p
            for p in Product.objects.prefetch_related("buying_groups")
            if p.can_be_sold_to(user)
        ]
        ```
    """
    buying_groups = list(self.buying_groups.all())
    if not buying_groups:
        return True
    return any(user.is_in_group(pk=group.id) for group in buying_groups)

CounterQuerySet

Bases: QuerySet

annotate_has_barman(user)

Annotate the queryset with the has_annotated_barman field.

For each counter, this field has value True if the user is a barman of this counter, else False.

Parameters:

Name  Type  Description                                  Default
user  User  the user we want to check if he is a barman  required

Examples:

sli = User.objects.get(username="sli")
counters = (
    Counter.objects
    .annotate_has_barman(sli)  # add the has_annotated_barman boolean field
    .filter(has_annotated_barman=True)  # keep only counters where this user is barman
)
print("Sli is a barman in the following counters:")
for counter in counters:
    print(f"- {counter.name}")
Source code in counter/models.py
def annotate_has_barman(self, user: User) -> Self:
    """Annotate the queryset with the `has_annotated_barman` field.

    For each counter, this field has value True if the user
    is a barman of this counter, else False.

    Args:
        user: the user we want to check if he is a barman

    Examples:
        ```python
        sli = User.objects.get(username="sli")
        counters = (
            Counter.objects
            .annotate_has_barman(sli)  # add the has_annotated_barman boolean field
            .filter(has_annotated_barman=True)  # keep only counters where this user is barman
        )
        print("Sli is a barman in the following counters:")
        for counter in counters:
            print(f"- {counter.name}")
        ```
    """
    subquery = user.counters.filter(pk=OuterRef("pk"))
    return self.annotate(has_annotated_barman=Exists(subquery))

annotate_is_open()

Annotate the queryset with the is_open field.

For each counter, if is_open=True, then the counter is currently open. Otherwise the counter is closed.

Source code in counter/models.py
def annotate_is_open(self) -> Self:
    """Annotate the queryset with the `is_open` field.

    For each counter, if `is_open=True`, then the counter is currently open.
    Otherwise the counter is closed.
    """
    return self.annotate(
        is_open=Exists(
            Permanency.objects.filter(counter_id=OuterRef("pk"), end=None)
        )
    )

handle_timeout()

Disconnect the barmen who are inactive in the given counters.

Returns:

Type  Description
int   The number of affected rows (i.e. the number of timed-out permanencies)

Source code in counter/models.py
def handle_timeout(self) -> int:
    """Disconnect the barmen who are inactive in the given counters.

    Returns:
        The number of affected rows (i.e. the number of timed-out permanencies)
    """
    timeout = timezone.now() - timedelta(minutes=settings.SITH_BARMAN_TIMEOUT)
    return Permanency.objects.filter(
        counter__in=self, end=None, activity__lt=timeout
    ).update(end=F("activity"))
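
The timeout rule can be illustrated on plain dictionaries. This is a sketch only: the 30-minute value is a hypothetical placeholder for `settings.SITH_BARMAN_TIMEOUT`, and `close_inactive` is not part of the queryset API.

```python
from datetime import datetime, timedelta, timezone

BARMAN_TIMEOUT_MINUTES = 30  # hypothetical value; the source reads it from settings


def close_inactive(permanencies, now):
    """End every open permanency whose last activity is older than the timeout."""
    cutoff = now - timedelta(minutes=BARMAN_TIMEOUT_MINUTES)
    closed = 0
    for perm in permanencies:
        if perm["end"] is None and perm["activity"] < cutoff:
            perm["end"] = perm["activity"]  # the shift ends at the last activity
            closed += 1
    return closed


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
permanencies = [
    {"activity": now - timedelta(minutes=45), "end": None},  # inactive: closed
    {"activity": now - timedelta(minutes=5), "end": None},  # still active
]
closed = close_inactive(permanencies, now)
```

Setting `end` to the last activity, rather than to the current time, avoids counting the idle period as time spent behind the counter.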

Counter

Bases: Model

gen_token()

Generate a new token for this counter.

Source code in counter/models.py
def gen_token(self) -> None:
    """Generate a new token for this counter."""
    self.token = "".join(
        random.choice(string.ascii_letters + string.digits) for _ in range(30)
    )
    self.save()

barmen_list()

Returns the barman list as list of User.

Source code in counter/models.py
@cached_property
def barmen_list(self) -> list[User]:
    """Returns the barman list as list of User."""
    return [
        p.user for p in self.permanencies.filter(end=None).select_related("user")
    ]

get_random_barman()

Return a random user being currently a barman.

Source code in counter/models.py
def get_random_barman(self) -> User:
    """Return a random user being currently a barman."""
    return random.choice(self.barmen_list)

update_activity()

Update the barman activity to prevent timeout.

Source code in counter/models.py
def update_activity(self) -> None:
    """Update the barman activity to prevent timeout."""
    self.permanencies.filter(end=None).update(activity=timezone.now())

can_refill()

Tell whether the counter allows refilling with physical money.

Source code in counter/models.py
def can_refill(self) -> bool:
    """Tell whether the counter allows refilling with physical money."""
    if self.type != "BAR":
        return False
    # at least one of the barmen is in the AE board
    ae = Club.objects.get(unix_name=SITH_MAIN_CLUB["unix_name"])
    return any(ae.get_membership_for(barman) for barman in self.barmen_list)

get_top_barmen()

Return a QuerySet querying the office hours stats of all the barmen of all time of this counter, ordered by descending number of hours.

Each element of the QuerySet corresponds to a barman and has the following data:

  • the full name (first name + last name) of the barman
  • the nickname of the barman
  • the promo of the barman
  • the total number of office hours the barman attended
Source code in counter/models.py
def get_top_barmen(self) -> QuerySet:
    """Return a QuerySet querying the office hours stats of all the barmen of all time
    of this counter, ordered by descending number of hours.

    Each element of the QuerySet corresponds to a barman and has the following data :
        - the full name (first name + last name) of the barman
        - the nickname of the barman
        - the promo of the barman
        - the total number of office hours the barman did attend
    """
    return (
        self.permanencies.exclude(end=None)
        .annotate(
            name=Concat(F("user__first_name"), Value(" "), F("user__last_name"))
        )
        .annotate(nickname=F("user__nick_name"))
        .annotate(promo=F("user__promo"))
        .values("user", "name", "nickname", "promo")
        .annotate(perm_sum=Sum(F("end") - F("start")))
        .exclude(perm_sum=None)
        .order_by("-perm_sum")
    )
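
The aggregation performed by this query (group by user, sum `end - start`, sort descending) can be sketched in plain Python. `top_barmen` and the sample shifts are hypothetical and only illustrate the grouping logic.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def top_barmen(permanencies):
    """Sum finished shifts per user and sort by total duration, longest first."""
    totals = defaultdict(timedelta)
    for user, start, end in permanencies:
        if end is None:
            continue  # ongoing shifts are excluded, as in the query above
        totals[user] += end - start
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


day = datetime(2024, 1, 1)
ranking = top_barmen(
    [
        ("alice", day.replace(hour=18), day.replace(hour=20)),
        ("bob", day.replace(hour=12), day.replace(hour=15)),
        ("alice", day.replace(hour=21), day.replace(hour=23)),
        ("bob", day.replace(hour=20), None),  # still behind the counter
    ]
)
```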

get_top_customers(since=None)

Return a QuerySet querying the money spent by customers of this counter since the specified date, ordered by descending amount of money spent.

Each element of the QuerySet corresponds to a customer and has the following data :

  • the full name (first name + last name) of the customer
  • the nickname of the customer
  • the amount of money spent by the customer

Parameters:

Name   Type                    Description                                      Default
since  datetime | date | None  timestamp from which to perform the calculation  None
Source code in counter/models.py
def get_top_customers(self, since: datetime | date | None = None) -> QuerySet:
    """Return a QuerySet querying the money spent by customers of this counter
    since the specified date, ordered by descending amount of money spent.

    Each element of the QuerySet corresponds to a customer and has the following data :

    - the full name (first name + last name) of the customer
    - the nickname of the customer
    - the amount of money spent by the customer

    Args:
        since: timestamp from which to perform the calculation
    """
    if since is None:
        since = get_start_of_semester()
    # datetime is a subclass of date: only convert plain dates
    if isinstance(since, date) and not isinstance(since, datetime):
        since = datetime(since.year, since.month, since.day, tzinfo=tz.utc)
    return (
        self.sellings.filter(date__gte=since)
        .annotate(
            name=Concat(
                F("customer__user__first_name"),
                Value(" "),
                F("customer__user__last_name"),
            )
        )
        .annotate(nickname=F("customer__user__nick_name"))
        .annotate(promo=F("customer__user__promo"))
        .annotate(user=F("customer__user"))
        .values("user", "promo", "name", "nickname")
        .annotate(
            selling_sum=Sum(
                F("unit_price") * F("quantity"), output_field=CurrencyField()
            )
        )
        .filter(selling_sum__gt=0)
        .order_by("-selling_sum")
    )

get_total_sales(since=None)

Compute and return the total turnover of this counter since the given date.

By default, the date is the start of the current semester.

Parameters:

Name   Type                    Description                                      Default
since  datetime | date | None  timestamp from which to perform the calculation  None

Returns:

Type           Description
CurrencyField  Total revenue earned at this counter.

Source code in counter/models.py
def get_total_sales(self, since: datetime | date | None = None) -> CurrencyField:
    """Compute and return the total turnover of this counter since the given date.

    By default, the date is the start of the current semester.

    Args:
        since: timestamp from which to perform the calculation

    Returns:
        Total revenue earned at this counter.
    """
    if since is None:
        since = get_start_of_semester()
    # datetime is a subclass of date: only convert plain dates
    if isinstance(since, date) and not isinstance(since, datetime):
        since = datetime(since.year, since.month, since.day, tzinfo=tz.utc)
    return self.sellings.filter(date__gte=since).aggregate(
        total=Sum(
            F("quantity") * F("unit_price"),
            default=0,
            output_field=CurrencyField(),
        )
    )["total"]

customer_is_barman(customer)

Check if this counter is a bar and if the customer is one of the barmen currently logged in on it. This is useful to compute special prices.

Source code in counter/models.py
def customer_is_barman(self, customer: Customer | User) -> bool:
    """Check if this counter is a `bar` and if the customer is one of the
    barmen currently logged in on it. This is useful to compute special prices."""

    # Customer and User are two different tables,
    # but they share the same primary key
    return self.type == "BAR" and any(b.pk == customer.pk for b in self.barmen_list)

get_products_for(customer)

Get all allowed products for the provided customer on this counter. Prices will be annotated.

Source code in counter/models.py
def get_products_for(self, customer: Customer) -> list[Product]:
    """
    Get all allowed products for the provided customer on this counter.
    Prices will be annotated.
    """

    products = self.products.select_related("product_type").prefetch_related(
        "buying_groups"
    )

    # Only include age appropriate products
    age = customer.user.age
    if customer.user.is_banned_alcohol:
        age = min(age, 17)
    products = products.filter(limit_age__lte=age)

    # Use the special price if the customer is a barman on this bar
    if self.customer_is_barman(customer):
        products = products.annotate(price=F("special_selling_price"))
    else:
        products = products.annotate(price=F("selling_price"))

    return [
        product
        for product in products.all()
        if product.can_be_sold_to(customer.user)
    ]
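
The age cap and the price selection above can be illustrated in plain Python. This is a simplified sketch, not the project's code: `ProductSketch` and `products_for` are hypothetical names, the products are in-memory objects, and the final `can_be_sold_to` check is deliberately left out.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class ProductSketch:
    """Hypothetical stand-in for a Product row."""

    name: str
    limit_age: int
    selling_price: Decimal
    special_selling_price: Decimal


def products_for(products, age, is_banned_alcohol, is_barman):
    """Return (name, price) pairs for the products the customer may buy."""
    # A customer banned from alcohol is treated as at most 17 years old.
    effective_age = min(age, 17) if is_banned_alcohol else age
    allowed = [p for p in products if p.limit_age <= effective_age]
    # Barmen of the counter pay the special price, everyone else the regular one.
    return [
        (p.name, p.special_selling_price if is_barman else p.selling_price)
        for p in allowed
    ]


catalog = [
    ProductSketch("Soda", 0, Decimal("1.00"), Decimal("0.80")),
    ProductSketch("Beer", 18, Decimal("2.00"), Decimal("1.50")),
]
# A 20-year-old banned from alcohol only sees the products without age limit.
print(products_for(catalog, age=20, is_banned_alcohol=True, is_barman=False))
```

Capping the age rather than filtering on a separate flag means the existing `limit_age` column does all the work.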

RefillingQuerySet

Bases: QuerySet

annotate_total()

Annotate the Queryset with the total amount.

The total is just the sum of the amounts for each row. If no grouping is involved (like in most queries), this is just the same as doing nothing and fetching the amount attribute.

However, it may be useful when there is a group by clause in the query, or when other models are queried and having a common interface is helpful (e.g. Selling.objects.annotate_total() and Refilling.objects.annotate_total() will both have the total field).

Source code in counter/models.py
def annotate_total(self) -> Self:
    """Annotate the Queryset with the total amount.

    The total is just the sum of the amounts for each row.
    If no grouping is involved (like in most queries),
    this is just the same as doing nothing and fetching the
    `amount` attribute.

    However, it may be useful when there is a `group by` clause
    in the query, or when other models are queried and having
    a common interface is helpful (e.g. `Selling.objects.annotate_total()`
    and `Refilling.objects.annotate_total()` will both have the `total` field).
    """
    return self.annotate(total=Sum("amount"))

Refilling

Bases: Model

Handle the refilling.

SellingQuerySet

Bases: QuerySet

annotate_total()

Annotate the Queryset with the total amount of the sales.

The total is considered as the sum of (unit_price * quantity).

Source code in counter/models.py
def annotate_total(self) -> Self:
    """Annotate the Queryset with the total amount of the sales.

    The total is considered as the sum of (unit_price * quantity).
    """
    return self.annotate(total=Sum(F("unit_price") * F("quantity")))
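
To see why a shared `total` field is convenient, here is a pure-Python sketch of what a grouped query would yield for refillings and sellings. It is only an emulation: `grouped_totals` is a hypothetical helper and the rows are plain dictionaries rather than model instances.

```python
from collections import defaultdict
from decimal import Decimal


def grouped_totals(rows, key, amount_of):
    """Group rows by `key` and sum `amount_of(row)` into one total per group."""
    totals = defaultdict(Decimal)  # Decimal() is Decimal("0")
    for row in rows:
        totals[row[key]] += amount_of(row)
    return dict(totals)


refillings = [
    {"customer": 1, "amount": Decimal("10.00")},
    {"customer": 1, "amount": Decimal("5.00")},
    {"customer": 2, "amount": Decimal("20.00")},
]
sellings = [
    {"customer": 1, "unit_price": Decimal("1.50"), "quantity": 2},
    {"customer": 2, "unit_price": Decimal("4.00"), "quantity": 1},
]

# Refillings sum `amount`; sellings sum `unit_price * quantity`.
refilled = grouped_totals(refillings, "customer", lambda r: r["amount"])
spent = grouped_totals(
    sellings, "customer", lambda r: r["unit_price"] * r["quantity"]
)

# Both results have the same shape, so they can be combined directly.
balance_change = {c: refilled[c] - spent.get(c, Decimal(0)) for c in refilled}
print(balance_change)
```

Code consuming the two results does not need to know how each total was computed, which is the point of giving both querysets the same method name.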

Selling

Bases: Model

Handle the sellings.

save(*args, allow_negative=False, **kwargs)

allow_negative : Allow this selling to use more money than available for this user.

Source code in counter/models.py
def save(self, *args, allow_negative=False, **kwargs):
    """allow_negative : Allow this selling to use more money than available for this user."""
    if not self.date:
        self.date = timezone.now()
    self.full_clean()
    if not self.is_validated:
        self.customer.amount -= self.quantity * self.unit_price
        self.customer.save(allow_negative=allow_negative, is_selling=True)
        self.is_validated = True
    user = self.customer.user
    if user.was_subscribed:
        if (
            self.product
            and self.product.id == settings.SITH_PRODUCT_SUBSCRIPTION_ONE_SEMESTER
        ):
            sub = Subscription(
                member=user,
                subscription_type="un-semestre",
                payment_method="EBOUTIC",
                location="EBOUTIC",
            )
            sub.subscription_start = Subscription.compute_start(
                duration=settings.SITH_SUBSCRIPTIONS[sub.subscription_type][
                    "duration"
                ]
            )
            sub.subscription_end = Subscription.compute_end(
                duration=settings.SITH_SUBSCRIPTIONS[sub.subscription_type][
                    "duration"
                ],
                start=sub.subscription_start,
            )
            sub.save()
        elif (
            self.product
            and self.product.id == settings.SITH_PRODUCT_SUBSCRIPTION_TWO_SEMESTERS
        ):
            sub = Subscription(
                member=user,
                subscription_type="deux-semestres",
                payment_method="EBOUTIC",
                location="EBOUTIC",
            )
            sub.subscription_start = Subscription.compute_start(
                duration=settings.SITH_SUBSCRIPTIONS[sub.subscription_type][
                    "duration"
                ]
            )
            sub.subscription_end = Subscription.compute_end(
                duration=settings.SITH_SUBSCRIPTIONS[sub.subscription_type][
                    "duration"
                ],
                start=sub.subscription_start,
            )
            sub.save()
    if user.preferences.notify_on_click:
        Notification(
            user=user,
            url=reverse(
                "core:user_account_detail",
                kwargs={
                    "user_id": user.id,
                    "year": self.date.year,
                    "month": self.date.month,
                },
            ),
            param="%d x %s" % (self.quantity, self.label),
            type="SELLING",
        ).save()
    super().save(*args, **kwargs)
    if hasattr(self.product, "eticket"):
        self.send_mail_customer()
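
The role of `is_validated` and `allow_negative` can be sketched in isolation. Everything below is hypothetical (`AccountSketch`, `SaleSketch`, `InsufficientFunds`). In particular, the sketch assumes that an overdraft is refused with an exception unless `allow_negative` is set; the real `Customer.save()` is not shown on this page and may behave differently.

```python
from decimal import Decimal


class InsufficientFunds(Exception):
    """Hypothetical error raised when a sale would overdraw the account."""


class AccountSketch:
    """Hypothetical stand-in for a Customer with a balance."""

    def __init__(self, amount: Decimal):
        self.amount = amount


class SaleSketch:
    """Hypothetical stand-in for a Selling."""

    def __init__(self, account: AccountSketch, unit_price: Decimal, quantity: int):
        self.account = account
        self.unit_price = unit_price
        self.quantity = quantity
        self.is_validated = False

    def save(self, allow_negative: bool = False) -> None:
        # Debit only on the first save; later saves leave the balance alone.
        if not self.is_validated:
            new_balance = self.account.amount - self.quantity * self.unit_price
            # Assumption: overdrafts are refused unless allow_negative is set.
            if new_balance < 0 and not allow_negative:
                raise InsufficientFunds
            self.account.amount = new_balance
            self.is_validated = True


account = AccountSketch(Decimal("5.00"))
sale = SaleSketch(account, Decimal("2.00"), 2)
sale.save()
sale.save()  # the second save does not debit again
print(account.amount)  # 1.00
```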

Permanency

Bases: Model

A permanency of a barman, on a counter.

This keeps a record of who was barman where and when. It is mainly used to establish the top 10 barmen of the semester.

CashRegisterSummary

Bases: Model

is_owned_by(user)

Method to see if that object can be edited by the given user.

Source code in counter/models.py
def is_owned_by(self, user):
    """Method to see if that object can be edited by the given user."""
    if user.is_anonymous:
        return False
    return user.is_in_group(pk=settings.SITH_GROUP_COUNTER_ADMIN_ID)

CashRegisterSummaryItem

Bases: Model

Eticket

Bases: Model

An Eticket can be linked to a product and allows PDF generation.

is_owned_by(user)

Method to see if that object can be edited by the given user.

Source code in counter/models.py
def is_owned_by(self, user):
    """Method to see if that object can be edited by the given user."""
    if user.is_anonymous:
        return False
    return user.is_in_group(pk=settings.SITH_GROUP_COUNTER_ADMIN_ID)

StudentCard

Bases: Model

Alternative way to log a customer into a counter.

We use the Mifare DESFire EV1 specs, since they are used for izly cards (https://www.nxp.com/docs/en/application-note/AN10927.pdf). The UID is 7 bytes long, which means 14 hexadecimal characters.
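
A 7-byte UID written in hexadecimal can be checked with a short helper. This is a minimal sketch, not the model's actual validation: `UID_SIZE`, `is_valid_uid` and the sample value are hypothetical, and accepting lowercase digits is an assumption.

```python
import re

# 7 bytes, two hexadecimal characters per byte.
UID_SIZE = 14

_UID_RE = re.compile(r"[0-9A-Fa-f]{%d}" % UID_SIZE)


def is_valid_uid(uid: str) -> bool:
    """Return True if `uid` is exactly 14 hexadecimal characters."""
    return _UID_RE.fullmatch(uid) is not None


print(is_valid_uid("04A1B2C3D4E5F6"))  # True
print(is_valid_uid("04A1B2"))  # False
```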

get_start_of_semester(today=None)

Return the date of the start of the semester of the given date. If no date is given, return the start date of the current semester.

The current semester is computed as follows:

  • If the date is between 15/08 (included) and 31/12 => Autumn semester.
  • If the date is between 01/01 and 15/02 (excluded) => Autumn semester of the previous year.
  • If the date is between 15/02 (included) and 15/08 (excluded) => Spring semester.

Parameters:

Name     Type           Description                                                            Default
today    date | None    the date to use to compute the semester. If None, use today's date.    None

Returns:

Type    Description
date    the date of the start of the semester

Source code in core/utils.py
def get_start_of_semester(today: date | None = None) -> date:
    """Return the date of the start of the semester of the given date.
    If no date is given, return the start date of the current semester.

    The current semester is computed as follows:

    - If the date is between 15/08 (included) and 31/12  => Autumn semester.
    - If the date is between 01/01 and 15/02 (excluded)  => Autumn semester of the previous year.
    - If the date is between 15/02 (included) and 15/08 (excluded)  => Spring semester.

    Args:
        today: the date to use to compute the semester. If None, use today's date.

    Returns:
        the date of the start of the semester
    """
    if today is None:
        today = localdate()

    autumn = date(today.year, *settings.SITH_SEMESTER_START_AUTUMN)
    spring = date(today.year, *settings.SITH_SEMESTER_START_SPRING)

    if today >= autumn:  # between 15/08 (included) and 31/12 -> autumn semester
        return autumn
    if today >= spring:  # between 15/02 (included) and 15/08 -> spring semester
        return spring
    # between 01/01 and 15/02 -> autumn semester of the previous year
    return autumn.replace(year=autumn.year - 1)
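
The boundary behaviour is easiest to check with a standalone version. The sketch below assumes the settings hold `(8, 15)` and `(2, 15)`, which matches the 15/08 and 15/02 dates quoted in the docstring; `SEMESTER_START_AUTUMN`, `SEMESTER_START_SPRING` and `start_of_semester` are hypothetical names used so the snippet runs without Django.

```python
from datetime import date

# Assumed (month, day) values, taken from the dates quoted in the docstring.
SEMESTER_START_AUTUMN = (8, 15)
SEMESTER_START_SPRING = (2, 15)


def start_of_semester(today: date) -> date:
    """Return the first day of the semester that contains `today`."""
    autumn = date(today.year, *SEMESTER_START_AUTUMN)
    spring = date(today.year, *SEMESTER_START_SPRING)
    if today >= autumn:
        return autumn
    if today >= spring:
        return spring
    # Before the spring start: still in the autumn semester of last year.
    return autumn.replace(year=autumn.year - 1)


print(start_of_semester(date(2024, 10, 1)))  # 2024-08-15
print(start_of_semester(date(2024, 2, 15)))  # 2024-02-15
print(start_of_semester(date(2024, 1, 10)))  # 2023-08-15
```

The start dates themselves belong to the new semester, which is why 15/02 already counts as spring.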