# File: Sith/counter/forms.py
# Last modified: 2025-11-27 14:33:21 +01:00
#
# 608 lines, 20 KiB, Python

import json
import math
import uuid
from datetime import date, datetime, timezone
from dateutil.relativedelta import relativedelta
from django import forms
from django.core.validators import MaxValueValidator
from django.db.models import Exists, OuterRef, Q
from django.forms import BaseModelFormSet
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import ClockedSchedule
from phonenumber_field.widgets import RegionalPhoneNumberWidget
from club.models import Club
from club.widgets.ajax_select import AutoCompleteSelectClub
from core.models import User
from core.views.forms import (
FutureDateTimeField,
NFCTextInput,
SelectDate,
SelectDateTime,
)
from core.views.widgets.ajax_select import (
AutoCompleteSelect,
AutoCompleteSelectMultipleGroup,
AutoCompleteSelectMultipleUser,
AutoCompleteSelectUser,
)
from counter.models import (
BillingInfo,
Counter,
Customer,
Eticket,
InvoiceCall,
Product,
ProductFormula,
Refilling,
ReturnableProduct,
ScheduledProductAction,
Selling,
StudentCard,
get_product_actions,
)
from counter.widgets.ajax_select import (
AutoCompleteSelectMultipleCounter,
AutoCompleteSelectMultipleProduct,
AutoCompleteSelectProduct,
)
class BillingInfoForm(forms.ModelForm):
    """Model form editing the name, address and phone number of a `BillingInfo`."""

    class Meta:
        model = BillingInfo
        fields = [
            "first_name",
            "last_name",
            "address_1",
            "address_2",
            "zip_code",
            "city",
            "country",
            "phone_number",
        ]
        # Only the phone number needs a non-default widget.
        widgets = {"phone_number": RegionalPhoneNumberWidget}
class StudentCardForm(forms.ModelForm):
    """Form for adding student cards."""

    error_css_class = "error"

    class Meta:
        model = StudentCard
        fields = ["uid"]
        widgets = {"uid": NFCTextInput}

    def clean(self):
        """Validate the submitted UID.

        Raises:
            forms.ValidationError: with code ``invalid`` when the UID is
                missing, empty, or rejected by `StudentCard.is_valid`.
        """
        data = super().clean()
        card_uid = data.get("uid")
        if card_uid and StudentCard.is_valid(card_uid):
            return data
        raise forms.ValidationError(_("This UID is invalid"), code="invalid")
class GetUserForm(forms.Form):
    """Provide a valid `user_id` (and `user`) in the cleaned data.

    The result is meant to be passed to some view, reverse function,
    or any other use.

    The form offers two ways to designate the customer:

    - `code`: a typed/scanned value, matched first against student card UIDs
      (when it has exactly `StudentCard.UID_SIZE` characters), then against
      customer account ids (case-insensitive);
    - `id`: a user picked through the autocomplete widget
      (search by nickname, first name, or last name is still a TODO).
    """

    code = forms.CharField(
        label="Code",
        max_length=StudentCard.UID_SIZE,
        required=False,
        widget=NFCTextInput,
    )
    id = forms.CharField(
        label=_("Select user"),
        help_text=None,
        widget=AutoCompleteSelectUser,
        required=False,
    )

    def as_p(self):
        """Render the form as paragraphs, with autofocus set on the code input."""
        self.fields["code"].widget.attrs["autofocus"] = True
        return super().as_p()

    def clean(self):
        """Resolve the customer designated by `code` or `id`.

        Returns:
            The cleaned data, extended with `user_id` and `user`.

        Raises:
            forms.ValidationError: if no customer matches, or if the matching
                customer is not allowed to buy (`can_buy` is falsy).
        """
        cleaned_data = super().clean()
        # A field that failed its own validation (e.g. a code longer than
        # `max_length`) is absent from `cleaned_data`: read with `.get()`
        # so that such a submission ends up as a form error, not a KeyError.
        code = cleaned_data.get("code") or ""
        selected_user = cleaned_data.get("id")

        customer = None
        if code:
            if len(code) == StudentCard.UID_SIZE:
                card = (
                    StudentCard.objects.filter(uid=code)
                    .select_related("customer")
                    .first()
                )
                if card is not None:
                    customer = card.customer
            if customer is None:
                # Not a known card: fall back on the customer account id.
                customer = Customer.objects.filter(account_id__iexact=code).first()
        elif selected_user:
            try:
                customer = Customer.objects.filter(user=selected_user).first()
            except (TypeError, ValueError):
                # `id` is a free-text field: a value that cannot be used
                # as a primary key simply designates nobody.
                customer = None

        if customer is None or not customer.can_buy:
            raise forms.ValidationError(_("User not found"))
        cleaned_data["user_id"] = customer.user.id
        cleaned_data["user"] = customer.user
        return cleaned_data
class RefillForm(forms.ModelForm):
    """Model form recording a `Refilling`.

    Only the payment methods listed in `allowed_refilling_methods`
    are offered to the user.
    """

    allowed_refilling_methods = [
        Refilling.PaymentMethod.CASH,
        Refilling.PaymentMethod.CARD,
    ]

    error_css_class = "error"
    required_css_class = "required"

    amount = forms.FloatField(
        min_value=0, widget=forms.NumberInput(attrs={"class": "focus"})
    )

    class Meta:
        model = Refilling
        fields = ["amount", "payment_method"]
        widgets = {"payment_method": forms.RadioSelect}

    def __init__(self, *args, **kwargs):
        """Restrict the payment method choices and pick an allowed default."""
        super().__init__(*args, **kwargs)
        method_field = self.fields["payment_method"]
        allowed = self.allowed_refilling_methods
        # Keep only the (value, label) pairs whose value is allowed.
        method_field.choices = (
            choice for choice in method_field.choices if choice[0] in allowed
        )
        # The model default may be a method that is not offered here.
        if method_field.initial not in allowed:
            method_field.initial = allowed[0]
class CounterEditForm(forms.ModelForm):
    """Model form editing the sellers and the products of a `Counter`."""

    class Meta:
        model = Counter
        fields = ["sellers", "products"]
        # Both relations are edited through autocomplete multi-select widgets.
        widgets = {
            "sellers": AutoCompleteSelectMultipleUser,
            "products": AutoCompleteSelectMultipleProduct,
        }
class ScheduledProductActionForm(forms.ModelForm):
    """Form for scheduling an automatic action on a product.

    The `save` method will update or create tasks using celery-beat.
    """

    required_css_class = "required"
    prefix = "scheduled"

    class Meta:
        model = ScheduledProductAction
        fields = ["task"]
        widgets = {"task": forms.RadioSelect(choices=get_product_actions)}
        labels = {"task": _("Action")}
        help_texts = {"task": ""}

    trigger_at = FutureDateTimeField(
        label=_("Date and time of action"), widget=SelectDateTime
    )
    counters = forms.ModelMultipleChoiceField(
        label=_("New counters"),
        help_text=_("The selected counters will replace the current ones"),
        required=False,
        widget=AutoCompleteSelectMultipleCounter,
        queryset=Counter.objects.all(),
    )

    def __init__(self, *args, product: Product, **kwargs):
        """Bind the form to `product` and prefill it from an existing action.

        Args:
            product: the product the scheduled action applies to.
                It may be unsaved (see `set_product`).
        """
        self.product = product
        super().__init__(*args, **kwargs)
        if not self.instance._state.adding:
            # Editing an existing action: restore the non-model fields
            # from the clocked schedule and from the serialized task kwargs.
            self.fields["trigger_at"].initial = self.instance.clocked.clocked_time
            self.fields["counters"].initial = json.loads(self.instance.kwargs).get(
                "counters"
            )

    def clean(self):
        """Propagate the submitted data onto the task instance.

        Nothing is done when the form is unchanged, or when any field
        already failed validation: in the latter case the corresponding
        keys are missing from `cleaned_data` and the form is invalid anyway.
        """
        if not self.changed_data or self.errors:
            return super().clean()

        if "trigger_at" in self.changed_data:
            if not self.instance.clocked_id:
                self.instance.clocked = ClockedSchedule(
                    clocked_time=self.cleaned_data["trigger_at"]
                )
            else:
                self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
            # The schedule has to be persisted in both cases: a task
            # cannot be saved while pointing to an unsaved schedule.
            self.instance.clocked.save()

        task_kwargs = {"product_id": self.product.id}
        if self.cleaned_data["task"] == "counter.tasks.change_counters":
            selected_counters = self.cleaned_data.get("counters")
            # Also keep counters that were submitted unchanged: the kwargs
            # are rebuilt from scratch here, so relying on `changed_data`
            # alone would drop the stored counters as soon as another field
            # (e.g. the trigger date) of an existing action is edited.
            if "counters" in self.changed_data or selected_counters:
                task_kwargs["counters"] = [c.id for c in selected_counters]

        self.instance.product = self.product
        self.instance.kwargs = json.dumps(task_kwargs)
        # The uuid keeps the task name distinct from the other tasks
        # scheduled for the same action on the same product.
        self.instance.name = (
            f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}"
        )
        return super().clean()

    def set_product(self, product: Product):
        """Set the product to which this form's instance is linked.

        When this form is linked to a ProductForm in the case of a product's creation,
        the product doesn't exist yet, so saving this form as is will result
        in having `{"product_id": null}` in the action kwargs.
        For the creation to be useful, it may be needed to inject the newly created
        product into this form, before saving the latter.
        """
        self.product = product
        kwargs = json.loads(self.instance.kwargs) | {"product_id": self.product.id}
        self.instance.kwargs = json.dumps(kwargs)
class BaseScheduledProductActionFormSet(BaseModelFormSet):
    """Formset base listing the pending scheduled actions of one product."""

    def __init__(self, *args, product: Product, **kwargs):
        """Build the formset for `product`.

        Args:
            product: the product whose actions are edited. It is also
                forwarded to every form through `form_kwargs`.
        """
        if not product.id:
            # Unsaved product: it cannot have any scheduled action yet.
            queryset = ScheduledProductAction.objects.none()
        else:
            # Only enabled actions that have not been triggered yet,
            # soonest first.
            pending = product.scheduled_actions.filter(
                enabled=True, clocked__clocked_time__gt=now()
            )
            queryset = pending.order_by("clocked__clocked_time").select_related(
                "clocked"
            )
        super().__init__(
            *args, queryset=queryset, form_kwargs={"product": product}, **kwargs
        )

    def delete_existing(self, obj: ScheduledProductAction, commit: bool = True):  # noqa FBT001
        """Delete `obj`, then its clocked schedule when `commit` is true."""
        # Grab the schedule before the action is deleted.
        schedule = obj.clocked
        super().delete_existing(obj, commit=commit)
        if commit:
            schedule.delete()
# Model formset of `ScheduledProductActionForm`, built on
# `BaseScheduledProductActionFormSet`.
# Two blank forms are appended (`extra=2`); existing actions get a delete
# checkbox (`can_delete=True`) while the blank extra forms do not
# (`can_delete_extra=False`).
# `absolute_max=None` leaves Django's default hard limit on the number
# of instantiated forms.
ScheduledProductActionFormSet = forms.modelformset_factory(
    ScheduledProductAction,
    ScheduledProductActionForm,
    formset=BaseScheduledProductActionFormSet,
    absolute_max=None,
    can_delete=True,
    can_delete_extra=False,
    extra=2,
)
class ProductForm(forms.ModelForm):
    """Model form editing a product, its counters and its scheduled actions.

    A `ScheduledProductActionFormSet` is instantiated alongside the form
    (as `action_formset`); it is validated in `is_valid` and saved in `save`.
    """

    error_css_class = "error"
    required_css_class = "required"

    class Meta:
        model = Product
        fields = [
            "name",
            "description",
            "product_type",
            "code",
            "buying_groups",
            "purchase_price",
            "selling_price",
            "special_selling_price",
            "icon",
            "club",
            "limit_age",
            "tray",
            "archived",
        ]
        help_texts = {
            "description": _(
                "Describe the product. If it's an event's click, "
                "give some insights about it, like the date (including the year)."
            )
        }
        widgets = {
            "product_type": AutoCompleteSelect,
            "buying_groups": AutoCompleteSelectMultipleGroup,
            "club": AutoCompleteSelectClub,
        }

    counters = forms.ModelMultipleChoiceField(
        label=_("Counters"),
        required=False,
        widget=AutoCompleteSelectMultipleCounter,
        queryset=Counter.objects.all(),
    )

    def __init__(self, *args, instance=None, **kwargs):
        """Initialise the form and its companion formset of scheduled actions."""
        super().__init__(*args, instance=instance, **kwargs)
        product = self.instance
        if product.id:
            # `counters` is not a model field of the form: prefill it by hand.
            self.fields["counters"].initial = product.counters.all()
        if hasattr(product, "formula"):
            self.formula_init(product.formula)
        # The formset receives the same positional and keyword arguments
        # (data, files, ...) as the form itself.
        self.action_formset = ScheduledProductActionFormSet(
            *args, product=product, **kwargs
        )

    def formula_init(self, formula: ProductFormula):
        """Part of the form initialisation specific to formula products.

        Both price fields get an explanatory help text, a `max` HTML
        attribute and a `MaxValueValidator` bounded by the formula.
        """
        selling_help = _(
            "This product is a formula. "
            "Its price cannot be greater than the price "
            "of the products constituting it, which is %(price)s"
        ) % {"price": formula.max_selling_price}
        special_help = _(
            "This product is a formula. "
            "Its special price cannot be greater than the price "
            "of the products constituting it, which is %(price)s"
        ) % {"price": formula.max_special_selling_price}
        limits = {
            "selling_price": (selling_help, formula.max_selling_price),
            "special_selling_price": (
                special_help,
                formula.max_special_selling_price,
            ),
        }
        for field_name, (help_text, max_price) in limits.items():
            field = self.fields[field_name]
            field.help_text = help_text
            field.widget.attrs["max"] = max_price
            field.validators.append(MaxValueValidator(max_price))

    def is_valid(self):
        """Return True when both the form and its action formset are valid."""
        if not super().is_valid():
            return False
        return self.action_formset.is_valid()

    def save(self, *args, **kwargs) -> Product:
        """Save the product, its counters, then its scheduled actions."""
        saved_product = super().save(*args, **kwargs)
        saved_product.counters.set(self.cleaned_data["counters"])
        # If it's a creation, the product given to the formset wasn't
        # a persisted instance: actions saved in that state would be linked
        # to no product, thus be completely useless.
        # So the initial product is replaced with the persisted one
        # in every form before the formset is saved.
        for action_form in self.action_formset:
            action_form.set_product(saved_product)
        self.action_formset.save()
        return saved_product
class ProductFormulaForm(forms.ModelForm):
    """Model form editing a `ProductFormula` (component products and result)."""

    class Meta:
        model = ProductFormula
        fields = ["products", "result"]
        widgets = {
            "products": AutoCompleteSelectMultipleProduct,
            "result": AutoCompleteSelectProduct,
        }

    def clean(self):
        """Check the consistency between the result and its components.

        Two rules are enforced:

        - the result must not be one of the components;
        - neither the selling price nor the special selling price of
          the result may exceed the total of the components' prices.

        The checks are skipped when `products` or `result` already failed
        its own validation, since the value is then missing from
        the cleaned data and the form is invalid anyway.
        """
        cleaned_data = super().clean()
        result = cleaned_data.get("result")
        products = cleaned_data.get("products")
        if result is None or products is None:
            return cleaned_data

        if result in products:
            self.add_error(
                None,
                _(
                    "The same product cannot be at the same time "
                    "the result and a part of the formula."
                ),
            )
        prices = [p.selling_price for p in products]
        special_prices = [p.special_selling_price for p in products]
        if (
            result.selling_price > sum(prices)
            or result.special_selling_price > sum(special_prices)
        ):
            self.add_error(
                "result",
                _(
                    "The result cannot be more expensive "
                    "than the total of the other products."
                ),
            )
        return cleaned_data
class ReturnableProductForm(forms.ModelForm):
    """Model form editing a `ReturnableProduct`."""

    class Meta:
        model = ReturnableProduct
        fields = ["product", "returned_product", "max_return"]
        widgets = {
            "product": AutoCompleteSelectProduct,
            "returned_product": AutoCompleteSelectProduct,
        }

    def save(self, commit: bool = True) -> ReturnableProduct:  # noqa FBT
        """Save the instance and, when committing, refresh its balances."""
        returnable: ReturnableProduct = super().save(commit=commit)
        if not commit:
            return returnable
        # This is expensive and runs inline rather than in a background
        # task. Hopefully, creations and updates of returnable products
        # occur very rarely.
        returnable.update_balances()
        return returnable
class CashSummaryFormBase(forms.Form):
    """Form holding an optional begin/end datetime range."""

    # Both bounds are optional.
    begin_date = forms.DateTimeField(
        label=_("Begin date"),
        widget=SelectDateTime,
        required=False,
    )
    end_date = forms.DateTimeField(
        label=_("End date"),
        widget=SelectDateTime,
        required=False,
    )
class EticketForm(forms.ModelForm):
    """Model form editing an `Eticket` (product, banner, event title and date)."""

    class Meta:
        model = Eticket
        fields = ["product", "banner", "event_title", "event_date"]
        widgets = {
            "product": AutoCompleteSelectProduct,
            "event_date": SelectDate,
        }
class CloseCustomerAccountForm(forms.Form):
    """Form selecting the user whose customer account is to be closed."""

    # Mandatory: any existing user may be picked through the autocomplete.
    user = forms.ModelChoiceField(
        label=_("Refound this account"),
        required=True,
        widget=AutoCompleteSelectUser,
        queryset=User.objects.all(),
    )
class BasketProductForm(forms.Form):
    """One basket line: a product id and the quantity bought.

    On successful cleaning, the cleaned data also contains
    `bonus_quantity` and `total_price`.
    """

    quantity = forms.IntegerField(min_value=1, required=True)
    id = forms.IntegerField(min_value=0, required=True)

    def __init__(
        self,
        customer: Customer,
        counter: Counter,
        allowed_products: dict[int, Product],
        *args,
        **kwargs,
    ):
        """Store the sale context before the regular form initialisation.

        Args:
            customer: the buying customer (used by the formset).
            counter: the counter where the sale happens (used by the formset).
            allowed_products: the products that may be bought, indexed by id.
        """
        self.customer = customer  # Used by formset
        self.counter = counter  # Used by formset
        self.allowed_products = allowed_products
        super().__init__(*args, **kwargs)

    def clean_id(self):
        """Check that the product id designates an allowed product.

        The matching product is stored on `self.product`, so it can be
        reused by the global `clean` and by the formset validation.
        """
        product_id = self.cleaned_data["id"]
        self.product = self.allowed_products.get(product_id)
        if self.product is not None:
            return product_id
        raise forms.ValidationError(
            _("The selected product isn't available for this user")
        )

    def clean(self):
        """Compute the bonus quantity and the total price of the line."""
        cleaned_data = super().clean()
        if self.errors:
            # A field is invalid: nothing can be computed.
            return None

        quantity = cleaned_data["quantity"]
        # Tray products grant bonus units, which are not charged.
        bonus = (
            math.floor(quantity / Product.QUANTITY_FOR_TRAY_PRICE)
            if self.product.tray
            else 0
        )
        cleaned_data["bonus_quantity"] = bonus
        cleaned_data["total_price"] = self.product.price * (quantity - bonus)
        return cleaned_data
class BaseBasketForm(forms.BaseFormSet):
    """Formset base validating a whole basket of `BasketProductForm` lines."""

    def clean(self):
        """Run the basket-wide checks on the non-empty lines."""
        # Lines without any cleaned data are dropped from the formset.
        self.forms = [form for form in self.forms if form.cleaned_data]
        if not self.forms:
            return

        self._check_forms_have_errors()
        self._check_product_are_unique()
        # The customer and the counter are read on the first line.
        self._check_recorded_products(self[0].customer)
        self._check_enough_money(self[0].counter, self[0].customer)

    def _check_forms_have_errors(self):
        """Raise if at least one line of the basket is invalid."""
        if any(form.errors for form in self):
            raise forms.ValidationError(_("Submitted basket is invalid"))

    def _check_product_are_unique(self):
        """Raise if the same product id appears on several lines."""
        distinct_ids = {form.cleaned_data["id"] for form in self.forms}
        if len(distinct_ids) != len(self.forms):
            raise forms.ValidationError(_("Duplicated product entries."))

    def _check_enough_money(self, counter: Counter, customer: Customer):
        """Store the basket total and raise if it exceeds the customer's amount."""
        self.total_price = sum(line["total_price"] for line in self.cleaned_data)
        if self.total_price > customer.amount:
            raise forms.ValidationError(_("Not enough money"))

    def _check_recorded_products(self, customer: Customer):
        """Check for, among other things, ecocups and pitchers"""
        quantities = {
            form.cleaned_data["id"]: form.cleaned_data["quantity"]
            for form in self.forms
        }
        product_ids = list(quantities)
        returnables = list(
            ReturnableProduct.objects.filter(
                Q(product_id__in=product_ids) | Q(returned_product_id__in=product_ids)
            ).annotate_balance_for(customer)
        )
        limit_reached = []
        for returnable in returnables:
            # Units bought in this basket raise the balance,
            # units given back lower it.
            returnable.balance += quantities.get(returnable.product_id, 0)
            returned = quantities.get(returnable.returned_product_id, 0)
            returnable.balance -= returned
            if returned and returnable.balance < -returnable.max_return:
                limit_reached.append(returnable.returned_product)
        if not limit_reached:
            return
        raise forms.ValidationError(
            _(
                "This user have reached his recording limit "
                "for the following products : %s"
            )
            % ", ".join(str(product) for product in limit_reached)
        )
# Formset of `BasketProductForm` lines validated by `BaseBasketForm`.
# At least one form is always displayed (`min_num=1`), and
# `absolute_max=None` leaves Django's default hard limit on the number
# of instantiated forms.
BasketForm = forms.formset_factory(
    BasketProductForm, formset=BaseBasketForm, absolute_max=None, min_num=1
)
class InvoiceCallForm(forms.Form):
    """Form with one checkbox per club that sold something during a month.

    Each field is named after the club id and tells whether
    the invoice call of the club is validated for that month.
    """

    def __init__(self, *args, month: date, **kwargs):
        """Build the checkboxes for the clubs having sellings in `month`.

        Args:
            month: the date from which the one-month period starts.
        """
        super().__init__(*args, **kwargs)
        self.month = month
        period_start = datetime(
            month.year, month.month, month.day, tzinfo=timezone.utc
        )
        # NOTE(review): the upper bound is inclusive, so a selling dated
        # exactly at the start of the next period matches both periods.
        # Confirm against the code consuming this form before changing it.
        period_end = period_start + relativedelta(months=1)

        sellings_in_period = Selling.objects.filter(
            club=OuterRef("pk"),
            date__gte=period_start,
            date__lte=period_end,
        )
        validated_calls = InvoiceCall.objects.filter(
            club=OuterRef("pk"), month=month, is_validated=True
        )
        clubs = Club.objects.filter(Exists(sellings_in_period)).annotate(
            validated_invoice=Exists(validated_calls)
        )
        self.clubs = list(clubs)

        checkboxes = {}
        for club in self.clubs:
            checkboxes[str(club.id)] = forms.BooleanField(
                required=False, initial=club.validated_invoice
            )
        self.fields = checkboxes

    def save(self):
        """Create or update the invoice call of every listed club.

        Existing rows (same month and club) only get their
        `is_validated` flag updated.
        """
        InvoiceCall.objects.bulk_create(
            [
                InvoiceCall(
                    month=self.month,
                    club_id=club.id,
                    is_validated=self.cleaned_data.get(str(club.id), False),
                )
                for club in self.clubs
            ],
            update_conflicts=True,
            update_fields=["is_validated"],
            unique_fields=["month", "club"],
        )