import json
import math
import uuid
from collections import defaultdict
from datetime import date, datetime, timezone

from dateutil.relativedelta import relativedelta
from django import forms
from django.core.validators import MaxValueValidator
from django.db.models import Exists, OuterRef, Q
from django.forms import BaseModelFormSet
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import ClockedSchedule
from phonenumber_field.widgets import RegionalPhoneNumberWidget

from club.models import Club
from club.widgets.ajax_select import AutoCompleteSelectClub
from core.models import User
from core.views.forms import (
    FutureDateTimeField,
    NFCTextInput,
    SelectDate,
    SelectDateTime,
)
from core.views.widgets.ajax_select import (
    AutoCompleteSelect,
    AutoCompleteSelectMultiple,
    AutoCompleteSelectMultipleGroup,
    AutoCompleteSelectMultipleUser,
    AutoCompleteSelectUser,
)
from counter.models import (
    BillingInfo,
    Counter,
    Customer,
    Eticket,
    InvoiceCall,
    Price,
    Product,
    ProductFormula,
    Refilling,
    ReturnableProduct,
    ScheduledProductAction,
    Selling,
    StudentCard,
    get_product_actions,
)
from counter.widgets.ajax_select import (
    AutoCompleteSelectMultipleCounter,
    AutoCompleteSelectMultipleProduct,
    AutoCompleteSelectProduct,
)


class BillingInfoForm(forms.ModelForm):
    """Form to edit the billing information of a customer."""

    class Meta:
        model = BillingInfo
        fields = [
            "first_name",
            "last_name",
            "address_1",
            "address_2",
            "zip_code",
            "city",
            "country",
            "phone_number",
        ]
        widgets = {
            "phone_number": RegionalPhoneNumberWidget,
        }


class StudentCardForm(forms.ModelForm):
    """Form for adding student cards"""

    error_css_class = "error"

    class Meta:
        model = StudentCard
        fields = ["uid"]
        widgets = {"uid": NFCTextInput}

    def clean(self):
        """Reject the form when the UID is missing or not a valid card UID.

        Raises:
            ValidationError: if `uid` is empty or refused by
                `StudentCard.is_valid`.
        """
        cleaned_data = super().clean()
        uid = cleaned_data.get("uid", None)
        if not uid or not StudentCard.is_valid(uid):
            raise forms.ValidationError(_("This UID is invalid"), code="invalid")
        return cleaned_data


class GetUserForm(forms.Form):
    """Form resolving a customer from a card/account code or a user id.

    The Form class aims at providing a valid user_id field in its cleaned data,
    in order to pass it to some view, reverse function, or any other use.

    The Form implements a nice JS widget allowing the user to type
    a customer account id, or search the database with some nickname,
    first name, or last name (TODO)
    """

    code = forms.CharField(
        label="Code",
        max_length=StudentCard.UID_SIZE,
        required=False,
        widget=NFCTextInput,
    )
    id = forms.CharField(
        label=_("Select user"),
        help_text=None,
        widget=AutoCompleteSelectUser,
        required=False,
    )

    def as_p(self):
        """Render the form as paragraphs, with the autofocus on the code input."""
        self.fields["code"].widget.attrs["autofocus"] = True
        return super().as_p()

    def clean(self):
        """Find the customer matching the submitted code or user id.

        The code is first tried as a student card UID (only when it has
        exactly `StudentCard.UID_SIZE` characters), then as a customer
        account id. If no code was given, the `id` field is used instead.

        On success, `user_id` and `user` are added to the cleaned data.

        Raises:
            ValidationError: if no customer matches or if the customer
                cannot buy.
        """
        cleaned_data = super().clean()
        # A field that failed its own validation is absent from cleaned_data,
        # hence the `.get` lookups instead of direct indexing.
        code = cleaned_data.get("code", "")
        user_id = cleaned_data.get("id")
        customer = None
        if code:
            if len(code) == StudentCard.UID_SIZE:
                card = (
                    StudentCard.objects.filter(uid=code)
                    .select_related("customer")
                    .first()
                )
                if card is not None:
                    customer = card.customer
            if customer is None:
                customer = Customer.objects.filter(account_id__iexact=code).first()
        elif user_id:
            customer = Customer.objects.filter(user=user_id).first()

        if customer is None or not customer.can_buy:
            raise forms.ValidationError(_("User not found"))
        cleaned_data["user_id"] = customer.user.id
        cleaned_data["user"] = customer.user
        return cleaned_data


class RefillForm(forms.ModelForm):
    """Form to refill a customer account, restricted to some payment methods."""

    # Payment methods that may be selected in this form.
    allowed_refilling_methods = [
        Refilling.PaymentMethod.CASH,
        Refilling.PaymentMethod.CARD,
    ]

    error_css_class = "error"
    required_css_class = "required"
    amount = forms.FloatField(
        min_value=0, widget=forms.NumberInput(attrs={"class": "focus"})
    )

    class Meta:
        model = Refilling
        fields = ["amount", "payment_method"]
        widgets = {"payment_method": forms.RadioSelect}

    def __init__(self, *args, **kwargs):
        """Restrict the payment method choices to the allowed ones.

        If the initial payment method isn't allowed,
        the first allowed method is used as initial value instead.
        """
        super().__init__(*args, **kwargs)
        payment_method = self.fields["payment_method"]
        # A list (rather than a one-shot generator) so that the filtered
        # choices can safely be iterated more than once.
        payment_method.choices = [
            choice
            for choice in payment_method.choices
            if choice[0] in self.allowed_refilling_methods
        ]
        if payment_method.initial not in self.allowed_refilling_methods:
            payment_method.initial = self.allowed_refilling_methods[0]


class CounterEditForm(forms.ModelForm):
    """Form to edit the sellers and the products of a counter."""

    class Meta:
        model = Counter
        fields = ["sellers", "products"]
        widgets = {"sellers": AutoCompleteSelectMultipleUser}

    def __init__(self, *args, user: User, instance: Counter, **kwargs):
        """Adapt the products field to the permissions of the user.

        Args:
            user: the user editing the counter.
            instance: the edited counter.

        Users with the `counter.change_counter` permission get the product
        autocomplete widget. Other users can only pick non-archived products
        that belong to the club of the counter
        or that are already sold in the counter.
        """
        super().__init__(*args, instance=instance, **kwargs)
        products = self.fields["products"]
        if user.has_perm("counter.change_counter"):
            products.widget = AutoCompleteSelectMultipleProduct()
        else:
            products.widget = AutoCompleteSelectMultiple()
            products.queryset = Product.objects.filter(
                Q(club_id=instance.club_id) | Q(counters=instance), archived=False
            ).distinct()
            products.help_text = _(
                "If you want to add a product that is not owned by "
                "your club to this counter, you should ask an admin."
            )


class ScheduledProductActionForm(forms.ModelForm):
    """Form for automatic product archiving.

    The `save` method will update or create tasks using celery-beat.
    """

    required_css_class = "required"
    prefix = "scheduled"

    class Meta:
        model = ScheduledProductAction
        fields = ["task"]
        widgets = {"task": forms.RadioSelect(choices=get_product_actions)}
        labels = {"task": _("Action")}
        help_texts = {"task": ""}

    trigger_at = FutureDateTimeField(
        label=_("Date and time of action"), widget=SelectDateTime
    )
    counters = forms.ModelMultipleChoiceField(
        label=_("New counters"),
        help_text=_("The selected counters will replace the current ones"),
        required=False,
        widget=AutoCompleteSelectMultipleCounter,
        queryset=Counter.objects.all(),
    )

    def __init__(self, *args, product: Product, **kwargs):
        """Bind the form to a product and prefill it from an existing action.

        Args:
            product: the product on which the action will be performed.
        """
        self.product = product
        super().__init__(*args, **kwargs)
        if not self.instance._state.adding:
            # Existing action: show its current schedule and target counters.
            self.fields["trigger_at"].initial = self.instance.clocked.clocked_time
            self.fields["counters"].initial = json.loads(self.instance.kwargs).get(
                "counters"
            )

    def clean(self):
        """Synchronise the underlying task instance with the submitted data.

        When the form has changed and every field is valid,
        the clocked schedule is created or updated (and saved),
        and the product, kwargs and name of the instance are rewritten.
        """
        # Fields that failed validation are absent from cleaned_data.
        # In that case, skip the synchronisation (and its database writes) :
        # the form is invalid anyway.
        if not self.changed_data or self.errors:
            return super().clean()
        if "trigger_at" in self.changed_data:
            if not self.instance.clocked_id:
                self.instance.clocked = ClockedSchedule(
                    clocked_time=self.cleaned_data["trigger_at"]
                )
            else:
                self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
            self.instance.clocked.save()
        task_kwargs = {"product_id": self.product.id}
        if (
            self.cleaned_data["task"] == "counter.tasks.change_counters"
            and "counters" in self.changed_data
        ):
            task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]]
        self.instance.product = self.product
        self.instance.kwargs = json.dumps(task_kwargs)
        # The random suffix gives a distinct name to each scheduled action.
        self.instance.name = (
            f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}"
        )
        return super().clean()

    def set_product(self, product: Product):
        """Set the product to which this form's instance is linked.

        When this form is linked to a ProductForm in the case of a product's creation,
        the product doesn't exist yet, so saving this form as is will result
        in having `{"product_id": null}` in the action kwargs.
        For the creation to be useful, it may be needed to inject the newly created
        product into this form, before saving the latter.
        """
        self.product = product
        kwargs = json.loads(self.instance.kwargs) | {"product_id": self.product.id}
        self.instance.kwargs = json.dumps(kwargs)


class BaseScheduledProductActionFormSet(BaseModelFormSet):
    """Formset of the enabled, not yet triggered, actions of a product."""

    def __init__(self, *args, product: Product, **kwargs):
        """Build the formset for the given product.

        Args:
            product: the product whose scheduled actions are edited.
                If it has no id, the formset starts empty.
        """
        if product.id:
            queryset = (
                product.scheduled_actions.filter(
                    enabled=True, clocked__clocked_time__gt=now()
                )
                .order_by("clocked__clocked_time")
                .select_related("clocked")
            )
        else:
            queryset = ScheduledProductAction.objects.none()
        form_kwargs = {"product": product}
        super().__init__(*args, queryset=queryset, form_kwargs=form_kwargs, **kwargs)

    def delete_existing(self, obj: ScheduledProductAction, commit: bool = True):  # noqa FBT001
        """Delete the action and, when committing, its clocked schedule too."""
        # Keep a reference to the schedule before the action is deleted.
        clocked = obj.clocked
        super().delete_existing(obj, commit=commit)
        if commit:
            clocked.delete()


ScheduledProductActionFormSet = forms.modelformset_factory(
    ScheduledProductAction,
    ScheduledProductActionForm,
    formset=BaseScheduledProductActionFormSet,
    absolute_max=None,
    can_delete=True,
    can_delete_extra=False,
    extra=0,
)


ProductPriceFormSet = forms.inlineformset_factory(
    parent_model=Product,
    model=Price,
    fields=["amount", "label", "groups", "is_always_shown"],
    widgets={
        "groups": AutoCompleteSelectMultipleGroup,
        "is_always_shown": forms.CheckboxInput(attrs={"class": "switch"}),
    },
    absolute_max=None,
    can_delete_extra=False,
    min_num=1,
    extra=0,
)


class ProductForm(forms.ModelForm):
    """Form to create or edit a product, with its prices and scheduled actions.

    The prices and the scheduled actions are handled by two nested formsets,
    `price_formset` and `action_formset`, that are validated and saved
    together with the product itself.
    """

    error_css_class = "error"
    required_css_class = "required"

    class Meta:
        model = Product
        fields = [
            "name",
            "description",
            "product_type",
            "code",
            "purchase_price",
            "icon",
            "club",
            "limit_age",
            "tray",
            "archived",
        ]
        help_texts = {
            "description": _(
                "Describe the product. If it's an event's click, "
                "give some insights about it, like the date (including the year)."
            )
        }
        widgets = {
            "product_type": AutoCompleteSelect,
            "club": AutoCompleteSelectClub,
            "tray": forms.CheckboxInput(attrs={"class": "switch"}),
        }

    counters = forms.ModelMultipleChoiceField(
        label=_("Counters"),
        required=False,
        widget=AutoCompleteSelectMultipleCounter,
        queryset=Counter.objects.all(),
    )

    def __init__(self, *args, prefix: str | None = None, instance=None, **kwargs):
        """Initialise the form and its two nested formsets.

        Args:
            prefix: prefix of the product form itself. The formsets always
                use the `price` and `action` prefixes.
            instance: the edited product, or None for a creation.
        """
        super().__init__(*args, prefix=prefix, instance=instance, **kwargs)
        self.fields["name"].widget.attrs["autofocus"] = "autofocus"
        if self.instance.id:
            self.fields["counters"].initial = self.instance.counters.all()
            if hasattr(self.instance, "formula"):
                self.formula_init(self.instance.formula)
        self.price_formset = ProductPriceFormSet(
            *args, instance=self.instance, prefix="price", **kwargs
        )
        self.action_formset = ScheduledProductActionFormSet(
            *args, product=self.instance, prefix="action", **kwargs
        )

    def formula_init(self, formula: ProductFormula):
        """Part of the form initialisation specific to formula products.

        For each price field, set a help text, a `max` widget attribute
        and a max value validator based on the maximum prices of the formula.
        """
        price_fields = (
            (
                "selling_price",
                formula.max_selling_price,
                _(
                    "This product is a formula. "
                    "Its price cannot be greater than the price "
                    "of the products constituting it, which is %(price)s €"
                ),
            ),
            (
                "special_selling_price",
                formula.max_special_selling_price,
                _(
                    "This product is a formula. "
                    "Its special price cannot be greater than the price "
                    "of the products constituting it, which is %(price)s €"
                ),
            ),
        )
        for key, price, help_text in price_fields:
            # These price fields are not listed in `Meta.fields`.
            # Skipping the missing ones avoids a KeyError
            # when editing a formula product.
            field = self.fields.get(key)
            if field is None:
                continue
            field.help_text = help_text % {"price": price}
            field.widget.attrs["max"] = price
            field.validators.append(MaxValueValidator(price))

    def is_valid(self):
        """Return True if the form and both of its formsets are valid."""
        return (
            super().is_valid()
            and self.price_formset.is_valid()
            and self.action_formset.is_valid()
        )

    def save(self, *args, **kwargs) -> Product:
        """Save the product, its counters, its scheduled actions and its prices."""
        product = super().save(*args, **kwargs)
        product.counters.set(self.cleaned_data["counters"])
        # if it's a creation, the product given in the formset
        # wasn't a persisted instance.
        # So if we tried to persist the related objects in the current state,
        # they would be linked to no product, thus be completely useless
        # To make it work, we have to replace
        # the initial product with a persisted one
        for form in self.action_formset:
            form.set_product(product)
        self.action_formset.save()
        self.price_formset.save()
        return product


class ProductFormulaForm(forms.ModelForm):
    """Form to edit a formula : a result product made of other products."""

    class Meta:
        model = ProductFormula
        fields = ["products", "result"]
        widgets = {
            "products": AutoCompleteSelectMultipleProduct,
            "result": AutoCompleteSelectProduct,
        }

    def clean(self):
        """Check the consistency between the result and its constituents.

        Two errors can be added :

        - a non-field error if the result is also a part of the formula
        - an error on `result` if it is more expensive than the sum of the
          constituting products (for the normal or for the special price)
        """
        cleaned_data = super().clean()
        result = cleaned_data.get("result")
        products = cleaned_data.get("products")
        if result is None or products is None:
            # One of the fields failed its own validation, so it is absent
            # from cleaned_data. The cross-field checks cannot be performed.
            return cleaned_data
        if result in products:
            self.add_error(
                None,
                _(
                    "The same product cannot be at the same time "
                    "the result and a part of the formula."
                ),
            )
        prices = [p.selling_price for p in products]
        special_prices = [p.special_selling_price for p in products]
        if result.selling_price > sum(prices) or result.special_selling_price > sum(
            special_prices
        ):
            self.add_error(
                "result",
                _(
                    "The result cannot be more expensive "
                    "than the total of the other products."
                ),
            )
        return cleaned_data


class ReturnableProductForm(forms.ModelForm):
    """Form to edit a returnable product."""

    class Meta:
        model = ReturnableProduct
        fields = ["product", "returned_product", "max_return"]
        widgets = {
            "product": AutoCompleteSelectProduct,
            "returned_product": AutoCompleteSelectProduct,
        }

    def save(self, commit: bool = True) -> ReturnableProduct:  # noqa FBT
        """Save the instance and, when committing, update its balances."""
        instance: ReturnableProduct = super().save(commit=commit)
        if commit:
            # This is expensive, but we don't have a task queue to offload it.
            # Hopefully, creations and updates of returnable products
            # occur very rarely
            instance.update_balances()
        return instance


class CashSummaryFormBase(forms.Form):
    """Form with an optional begin date and an optional end date."""

    begin_date = forms.DateTimeField(
        label=_("Begin date"), widget=SelectDateTime, required=False
    )
    end_date = forms.DateTimeField(
        label=_("End date"), widget=SelectDateTime, required=False
    )


class EticketForm(forms.ModelForm):
    """Form to edit the e-ticket linked to a product."""

    class Meta:
        model = Eticket
        fields = ["product", "banner", "event_title", "event_date"]
        widgets = {
            "product": AutoCompleteSelectProduct,
            "event_date": SelectDate,
        }


class CloseCustomerAccountForm(forms.Form):
    """Form to select the user whose customer account is to be closed."""

    user = forms.ModelChoiceField(
        label=_("Refound this account"),
        required=True,
        widget=AutoCompleteSelectUser,
        queryset=User.objects.all(),
    )


class BasketItemForm(forms.Form):
    """A single line of a basket : a price id and a quantity."""

    quantity = forms.IntegerField(min_value=1, required=True)
    price_id = forms.IntegerField(min_value=0, required=True)

    def __init__(
        self,
        customer: Customer,
        counter: Counter,
        allowed_prices: dict[int, Price],
        *args,
        **kwargs,
    ):
        """Store the context in which the item is bought.

        Args:
            customer: the customer who is buying.
            counter: the counter where the purchase happens.
            allowed_prices: the prices that can be selected,
                indexed by the value expected in `price_id`.
        """
        self.customer = customer  # Used by formset
        self.counter = counter  # Used by formset
        self.allowed_prices = allowed_prices
        super().__init__(*args, **kwargs)

    def clean_price_id(self):
        """Check that the selected price is one of the allowed prices.

        Raises:
            ValidationError: if `price_id` isn't a key of `allowed_prices`.
        """
        data = self.cleaned_data["price_id"]

        # We store self.price so we can use it later on the formset validation
        # And also in the global clean
        self.price = self.allowed_prices.get(data, None)
        if self.price is None:
            raise forms.ValidationError(
                _("The selected product isn't available for this user")
            )
        return data

    def clean(self):
        """Add `bonus_quantity` and `total_price` to the cleaned data.

        Nothing is computed if a field has an error.
        For tray products, one unit is free
        for every `Product.QUANTITY_FOR_TRAY_PRICE` units.
        """
        cleaned_data = super().clean()
        if self.errors:
            return cleaned_data

        # Compute prices
        cleaned_data["bonus_quantity"] = 0
        if self.price.product.tray:
            cleaned_data["bonus_quantity"] = math.floor(
                cleaned_data["quantity"] / Product.QUANTITY_FOR_TRAY_PRICE
            )
        cleaned_data["total_price"] = self.price.amount * (
            cleaned_data["quantity"] - cleaned_data["bonus_quantity"]
        )

        return cleaned_data


class BaseBasketForm(forms.BaseFormSet):
    """Formset validating a whole basket of `BasketItemForm`."""

    def clean(self):
        """Run the basket-wide checks on the non-empty forms.

        Forms with empty cleaned data are dropped from the formset.
        If no form remains, no check is performed.
        """
        self.forms = [form for form in self.forms if form.cleaned_data != {}]

        if not self.forms:
            return

        self._check_forms_have_errors()
        self._check_product_are_unique()
        self._check_recorded_products(self[0].customer)
        self._check_enough_money(self[0].counter, self[0].customer)

    def _check_forms_have_errors(self):
        """Raise a ValidationError if any of the forms has errors."""
        if any(form.errors for form in self):
            raise forms.ValidationError(_("Submitted basket is invalid"))

    def _check_product_are_unique(self):
        """Raise a ValidationError if a price id appears in several forms."""
        price_ids = {form.cleaned_data["price_id"] for form in self.forms}
        if len(price_ids) != len(self.forms):
            raise forms.ValidationError(_("Duplicated product entries."))

    def _check_enough_money(self, counter: Counter, customer: Customer):
        """Store the basket total and check that the customer can afford it.

        Raises:
            ValidationError: if the total is greater than `customer.amount`.
        """
        self.total_price = sum(data["total_price"] for data in self.cleaned_data)
        if self.total_price > customer.amount:
            raise forms.ValidationError(_("Not enough money"))

    def _check_recorded_products(self, customer: Customer):
        """Check for, among other things, ecocups and pitchers"""
        # Total quantity in the basket, for each product id
        items = defaultdict(int)
        for form in self.forms:
            items[form.price.product_id] += form.cleaned_data["quantity"]
        ids = list(items)
        returnables = list(
            ReturnableProduct.objects.filter(
                Q(product_id__in=ids) | Q(returned_product_id__in=ids)
            ).annotate_balance_for(customer)
        )
        limit_reached = []
        # First add the products bought in this basket to the balances...
        for returnable in returnables:
            returnable.balance += items.get(returnable.product_id, 0)
        # ...then subtract the returned products and check the limits.
        for returnable in returnables:
            dcons = items.get(returnable.returned_product_id, 0)
            returnable.balance -= dcons
            if dcons and returnable.balance < -returnable.max_return:
                limit_reached.append(returnable.returned_product)
        if limit_reached:
            raise forms.ValidationError(
                _(
                    "This user have reached his recording limit "
                    "for the following products : %s"
                )
                % ", ".join([str(p) for p in limit_reached])
            )


BasketForm = forms.formset_factory(
    BasketItemForm, formset=BaseBasketForm, absolute_max=None, min_num=1
)


class InvoiceCallForm(forms.Form):
    """Form with one checkbox per club that has sellings in the given month."""

    def __init__(self, *args, month: date, **kwargs):
        """Build one boolean field for each club with sellings in the month.

        Args:
            month: the date from which the one-month period starts.

        Each field is named after the id of its club and is initially checked
        if a validated invoice call exists for this club and this month.
        """
        super().__init__(*args, **kwargs)
        self.month = month
        month_start = datetime(month.year, month.month, month.day, tzinfo=timezone.utc)
        self.clubs = list(
            Club.objects.filter(
                Exists(
                    Selling.objects.filter(
                        club=OuterRef("pk"),
                        date__gte=month_start,
                        # Strict upper bound : the first instant
                        # of the next period belongs to the next period.
                        date__lt=month_start + relativedelta(months=1),
                    )
                )
            ).annotate(
                validated_invoice=Exists(
                    InvoiceCall.objects.filter(
                        club=OuterRef("pk"), month=month, is_validated=True
                    )
                )
            )
        )
        self.fields = {
            str(club.id): forms.BooleanField(
                required=False, initial=club.validated_invoice
            )
            for club in self.clubs
        }

    def save(self):
        """Create or update the invoice calls of all the listed clubs.

        Existing rows (same month and club) only get
        their `is_validated` value updated.
        """
        invoice_calls = [
            InvoiceCall(
                month=self.month,
                club_id=club.id,
                is_validated=self.cleaned_data.get(str(club.id), False),
            )
            for club in self.clubs
        ]
        InvoiceCall.objects.bulk_create(
            invoice_calls,
            update_conflicts=True,
            update_fields=["is_validated"],
            unique_fields=["month", "club"],
        )