import json import math import uuid from datetime import date, datetime, timezone from dateutil.relativedelta import relativedelta from django import forms from django.core.exceptions import ValidationError from django.core.validators import MaxValueValidator from django.db.models import Exists, OuterRef, Q from django.forms import BaseModelFormSet from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from django_celery_beat.models import ClockedSchedule from phonenumber_field.widgets import RegionalPhoneNumberWidget from club.models import Club from club.widgets.ajax_select import AutoCompleteSelectClub from core.models import User, UserQuerySet from core.views.forms import ( FutureDateTimeField, NFCTextInput, SelectDate, SelectDateTime, ) from core.views.widgets.ajax_select import ( AutoCompleteSelect, AutoCompleteSelectMultiple, AutoCompleteSelectMultipleGroup, AutoCompleteSelectMultipleUser, AutoCompleteSelectUser, ) from counter.models import ( BillingInfo, Counter, CounterSellers, Customer, Eticket, InvoiceCall, Product, ProductFormula, Refilling, ReturnableProduct, ScheduledProductAction, Selling, StudentCard, get_product_actions, ) from counter.widgets.ajax_select import ( AutoCompleteSelectMultipleCounter, AutoCompleteSelectMultipleProduct, AutoCompleteSelectProduct, ) class BillingInfoForm(forms.ModelForm): class Meta: model = BillingInfo fields = [ "first_name", "last_name", "address_1", "address_2", "zip_code", "city", "country", "phone_number", ] widgets = { "phone_number": RegionalPhoneNumberWidget, } class StudentCardForm(forms.ModelForm): """Form for adding student cards""" error_css_class = "error" class Meta: model = StudentCard fields = ["uid"] widgets = {"uid": NFCTextInput} def clean(self): cleaned_data = super().clean() uid = cleaned_data.get("uid", None) if not uid or not StudentCard.is_valid(uid): raise forms.ValidationError(_("This UID is invalid"), code="invalid") return cleaned_data class GetUserForm(forms.Form): """The Form class aims at providing a valid user_id field in its cleaned data, in order to pass it to some view, reverse function, or any other use. The Form implements a nice JS widget allowing the user to type a customer account id, or search the database with some nickname, first name, or last name (TODO) """ code = forms.CharField( label="Code", max_length=StudentCard.UID_SIZE, required=False, widget=NFCTextInput, ) id = forms.CharField( label=_("Select user"), help_text=None, widget=AutoCompleteSelectUser, required=False, ) def as_p(self): self.fields["code"].widget.attrs["autofocus"] = True return super().as_p() def clean(self): cleaned_data = super().clean() customer = None if cleaned_data["code"] != "": if len(cleaned_data["code"]) == StudentCard.UID_SIZE: card = ( StudentCard.objects.filter(uid=cleaned_data["code"]) .select_related("customer") .first() ) if card is not None: customer = card.customer if customer is None: customer = Customer.objects.filter( account_id__iexact=cleaned_data["code"] ).first() elif cleaned_data["id"]: customer = Customer.objects.filter(user=cleaned_data["id"]).first() if customer is None or not customer.can_buy: raise forms.ValidationError(_("User not found")) cleaned_data["user_id"] = customer.user.id cleaned_data["user"] = customer.user return cleaned_data class RefillForm(forms.ModelForm): allowed_refilling_methods = [ Refilling.PaymentMethod.CASH, Refilling.PaymentMethod.CARD, ] error_css_class = "error" required_css_class = "required" amount = forms.FloatField( min_value=0, widget=forms.NumberInput(attrs={"class": "focus"}) ) class Meta: model = Refilling fields = ["amount", "payment_method"] widgets = {"payment_method": forms.RadioSelect} def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields["payment_method"].choices = ( method for method in self.fields["payment_method"].choices if method[0] in self.allowed_refilling_methods ) if self.fields["payment_method"].initial not in self.allowed_refilling_methods: self.fields["payment_method"].initial = self.allowed_refilling_methods[0] class CounterEditForm(forms.ModelForm): class Meta: model = Counter fields = ["products"] sellers_regular = forms.ModelMultipleChoiceField( label=_("Regular barmen"), help_text=_( "Barmen having regular permanences " "or frequently giving a hand throughout the semester." ), queryset=User.objects.all(), widget=AutoCompleteSelectMultipleUser, required=False, ) sellers_temporary = forms.ModelMultipleChoiceField( label=_("Temporary barmen"), help_text=_( "Barmen who will be there only for a limited period (e.g. for one evening)" ), queryset=User.objects.all(), widget=AutoCompleteSelectMultipleUser, required=False, ) field_order = ["sellers_regular", "sellers_temporary", "products"] def __init__(self, *args, user: User, instance: Counter, **kwargs): super().__init__(*args, instance=instance, **kwargs) # if the user is an admin, he will have access to all products, # else only to active products owned by the counter's club # or already on the counter if user.has_perm("counter.change_counter"): self.fields["products"].widget = AutoCompleteSelectMultipleProduct() else: # updating the queryset of the field also updates the choices of # the widget, so it's important to set the queryset after the widget self.fields["products"].widget = AutoCompleteSelectMultiple() self.fields["products"].queryset = Product.objects.filter( Q(club_id=instance.club_id) | Q(counters=instance), archived=False ).distinct() self.fields["products"].help_text = _( "If you want to add a product that is not owned by " "your club to this counter, you should ask an admin." ) self.fields["sellers_regular"].initial = self.instance.sellers.filter( countersellers__is_regular=True ).all() self.fields["sellers_temporary"].initial = self.instance.sellers.filter( countersellers__is_regular=False ).all() def clean(self): regular: UserQuerySet = self.cleaned_data["sellers_regular"] temporary: UserQuerySet = self.cleaned_data["sellers_temporary"] duplicates = list(regular.intersection(temporary)) if duplicates: raise ValidationError( _( "A user cannot be a regular and a temporary barman " "at the same time, " "but the following users have been defined as both : %(users)s" ) % {"users": ", ".join([u.get_display_name() for u in duplicates])} ) return self.cleaned_data def save_sellers(self): sellers = [] for users, is_regular in ( (self.cleaned_data["sellers_regular"], True), (self.cleaned_data["sellers_temporary"], False), ): sellers.extend( [ CounterSellers(counter=self.instance, user=u, is_regular=is_regular) for u in users ] ) # start by deleting removed CounterSellers objects user_ids = [seller.user.id for seller in sellers] CounterSellers.objects.filter( ~Q(user_id__in=user_ids), counter=self.instance ).delete() # then create or update the new barmen CounterSellers.objects.bulk_create( sellers, update_conflicts=True, update_fields=["is_regular"], unique_fields=["user", "counter"], ) def save(self, commit=True): # noqa: FBT002 self.instance = super().save(commit=commit) if commit and any( key in self.changed_data for key in ("sellers_regular", "sellers_temporary") ): self.save_sellers() return self.instance class ScheduledProductActionForm(forms.ModelForm): """Form for automatic product archiving. The `save` method will update or create tasks using celery-beat. """ required_css_class = "required" prefix = "scheduled" class Meta: model = ScheduledProductAction fields = ["task"] widgets = {"task": forms.RadioSelect(choices=get_product_actions)} labels = {"task": _("Action")} help_texts = {"task": ""} trigger_at = FutureDateTimeField( label=_("Date and time of action"), widget=SelectDateTime ) counters = forms.ModelMultipleChoiceField( label=_("New counters"), help_text=_("The selected counters will replace the current ones"), required=False, widget=AutoCompleteSelectMultipleCounter, queryset=Counter.objects.all(), ) def __init__(self, *args, product: Product, **kwargs): self.product = product super().__init__(*args, **kwargs) if not self.instance._state.adding: self.fields["trigger_at"].initial = self.instance.clocked.clocked_time self.fields["counters"].initial = json.loads(self.instance.kwargs).get( "counters" ) def clean(self): if not self.changed_data or "trigger_at" in self.errors: return super().clean() if "trigger_at" in self.changed_data: if not self.instance.clocked_id: self.instance.clocked = ClockedSchedule( clocked_time=self.cleaned_data["trigger_at"] ) else: self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"] self.instance.clocked.save() task_kwargs = {"product_id": self.product.id} if ( self.cleaned_data["task"] == "counter.tasks.change_counters" and "counters" in self.changed_data ): task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]] self.instance.product = self.product self.instance.kwargs = json.dumps(task_kwargs) self.instance.name = ( f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}" ) return super().clean() def set_product(self, product: Product): """Set the product to which this form's instance is linked. When this form is linked to a ProductForm in the case of a product's creation, the product doesn't exist yet, so saving this form as is will result in having `{"product_id": null}` in the action kwargs. For the creation to be useful, it may be needed to inject the newly created product into this form, before saving the latter. """ self.product = product kwargs = json.loads(self.instance.kwargs) | {"product_id": self.product.id} self.instance.kwargs = json.dumps(kwargs) class BaseScheduledProductActionFormSet(BaseModelFormSet): def __init__(self, *args, product: Product, **kwargs): if product.id: queryset = ( product.scheduled_actions.filter( enabled=True, clocked__clocked_time__gt=now() ) .order_by("clocked__clocked_time") .select_related("clocked") ) else: queryset = ScheduledProductAction.objects.none() form_kwargs = {"product": product} super().__init__(*args, queryset=queryset, form_kwargs=form_kwargs, **kwargs) def delete_existing(self, obj: ScheduledProductAction, commit: bool = True): # noqa FBT001 clocked = obj.clocked super().delete_existing(obj, commit=commit) if commit: clocked.delete() ScheduledProductActionFormSet = forms.modelformset_factory( ScheduledProductAction, ScheduledProductActionForm, formset=BaseScheduledProductActionFormSet, absolute_max=None, can_delete=True, can_delete_extra=False, extra=2, ) class ProductForm(forms.ModelForm): error_css_class = "error" required_css_class = "required" class Meta: model = Product fields = [ "name", "description", "product_type", "code", "buying_groups", "purchase_price", "selling_price", "special_selling_price", "icon", "club", "limit_age", "tray", "archived", ] help_texts = { "description": _( "Describe the product. If it's an event's click, " "give some insights about it, like the date (including the year)." ) } widgets = { "product_type": AutoCompleteSelect, "buying_groups": AutoCompleteSelectMultipleGroup, "club": AutoCompleteSelectClub, } counters = forms.ModelMultipleChoiceField( label=_("Counters"), required=False, widget=AutoCompleteSelectMultipleCounter, queryset=Counter.objects.all(), ) def __init__(self, *args, instance=None, **kwargs): super().__init__(*args, instance=instance, **kwargs) if self.instance.id: self.fields["counters"].initial = self.instance.counters.all() if hasattr(self.instance, "formula"): self.formula_init(self.instance.formula) self.action_formset = ScheduledProductActionFormSet( *args, product=self.instance, **kwargs ) def formula_init(self, formula: ProductFormula): """Part of the form initialisation specific to formula products.""" self.fields["selling_price"].help_text = _( "This product is a formula. " "Its price cannot be greater than the price " "of the products constituting it, which is %(price)s €" ) % {"price": formula.max_selling_price} self.fields["special_selling_price"].help_text = _( "This product is a formula. " "Its special price cannot be greater than the price " "of the products constituting it, which is %(price)s €" ) % {"price": formula.max_special_selling_price} for key, price in ( ("selling_price", formula.max_selling_price), ("special_selling_price", formula.max_special_selling_price), ): self.fields[key].widget.attrs["max"] = price self.fields[key].validators.append(MaxValueValidator(price)) def is_valid(self): return super().is_valid() and self.action_formset.is_valid() def save(self, *args, **kwargs) -> Product: product = super().save(*args, **kwargs) product.counters.set(self.cleaned_data["counters"]) for form in self.action_formset: # if it's a creation, the product given in the formset # wasn't a persisted instance. # So if we tried to persist the scheduled actions in the current state, # they would be linked to no product, thus be completely useless # To make it work, we have to replace # the initial product with a persisted one form.set_product(product) self.action_formset.save() return product class ProductFormulaForm(forms.ModelForm): class Meta: model = ProductFormula fields = ["products", "result"] widgets = { "products": AutoCompleteSelectMultipleProduct, "result": AutoCompleteSelectProduct, } def clean(self): cleaned_data = super().clean() if cleaned_data["result"] in cleaned_data["products"]: self.add_error( None, _( "The same product cannot be at the same time " "the result and a part of the formula." ), ) prices = [p.selling_price for p in cleaned_data["products"]] special_prices = [p.special_selling_price for p in cleaned_data["products"]] selling_price = cleaned_data["result"].selling_price special_selling_price = cleaned_data["result"].special_selling_price if selling_price > sum(prices) or special_selling_price > sum(special_prices): self.add_error( "result", _( "The result cannot be more expensive " "than the total of the other products." ), ) return cleaned_data class ReturnableProductForm(forms.ModelForm): class Meta: model = ReturnableProduct fields = ["product", "returned_product", "max_return"] widgets = { "product": AutoCompleteSelectProduct, "returned_product": AutoCompleteSelectProduct, } def save(self, commit: bool = True) -> ReturnableProduct: # noqa FBT instance: ReturnableProduct = super().save(commit=commit) if commit: # This is expensive, but we don't have a task queue to offload it. # Hopefully, creations and updates of returnable products # occur very rarely instance.update_balances() return instance class CashSummaryFormBase(forms.Form): begin_date = forms.DateTimeField( label=_("Begin date"), widget=SelectDateTime, required=False ) end_date = forms.DateTimeField( label=_("End date"), widget=SelectDateTime, required=False ) class EticketForm(forms.ModelForm): class Meta: model = Eticket fields = ["product", "banner", "event_title", "event_date"] widgets = { "product": AutoCompleteSelectProduct, "event_date": SelectDate, } class CloseCustomerAccountForm(forms.Form): user = forms.ModelChoiceField( label=_("Refound this account"), required=True, widget=AutoCompleteSelectUser, queryset=User.objects.all(), ) class BasketProductForm(forms.Form): quantity = forms.IntegerField(min_value=1, required=True) id = forms.IntegerField(min_value=0, required=True) def __init__( self, customer: Customer, counter: Counter, allowed_products: dict[int, Product], *args, **kwargs, ): self.customer = customer # Used by formset self.counter = counter # Used by formset self.allowed_products = allowed_products super().__init__(*args, **kwargs) def clean_id(self): data = self.cleaned_data["id"] # We store self.product so we can use it later on the formset validation # And also in the global clean self.product = self.allowed_products.get(data, None) if self.product is None: raise forms.ValidationError( _("The selected product isn't available for this user") ) return data def clean(self): cleaned_data = super().clean() if len(self.errors) > 0: return # Compute prices cleaned_data["bonus_quantity"] = 0 if self.product.tray: cleaned_data["bonus_quantity"] = math.floor( cleaned_data["quantity"] / Product.QUANTITY_FOR_TRAY_PRICE ) cleaned_data["total_price"] = self.product.price * ( cleaned_data["quantity"] - cleaned_data["bonus_quantity"] ) return cleaned_data class BaseBasketForm(forms.BaseFormSet): def clean(self): self.forms = [form for form in self.forms if form.cleaned_data != {}] if len(self.forms) == 0: return self._check_forms_have_errors() self._check_product_are_unique() self._check_recorded_products(self[0].customer) self._check_enough_money(self[0].counter, self[0].customer) def _check_forms_have_errors(self): if any(len(form.errors) > 0 for form in self): raise forms.ValidationError(_("Submitted basket is invalid")) def _check_product_are_unique(self): product_ids = {form.cleaned_data["id"] for form in self.forms} if len(product_ids) != len(self.forms): raise forms.ValidationError(_("Duplicated product entries.")) def _check_enough_money(self, counter: Counter, customer: Customer): self.total_price = sum([data["total_price"] for data in self.cleaned_data]) if self.total_price > customer.amount: raise forms.ValidationError(_("Not enough money")) def _check_recorded_products(self, customer: Customer): """Check for, among other things, ecocups and pitchers""" items = { form.cleaned_data["id"]: form.cleaned_data["quantity"] for form in self.forms } ids = list(items.keys()) returnables = list( ReturnableProduct.objects.filter( Q(product_id__in=ids) | Q(returned_product_id__in=ids) ).annotate_balance_for(customer) ) limit_reached = [] for returnable in returnables: returnable.balance += items.get(returnable.product_id, 0) for returnable in returnables: dcons = items.get(returnable.returned_product_id, 0) returnable.balance -= dcons if dcons and returnable.balance < -returnable.max_return: limit_reached.append(returnable.returned_product) if limit_reached: raise forms.ValidationError( _( "This user have reached his recording limit " "for the following products : %s" ) % ", ".join([str(p) for p in limit_reached]) ) BasketForm = forms.formset_factory( BasketProductForm, formset=BaseBasketForm, absolute_max=None, min_num=1 ) class InvoiceCallForm(forms.Form): def __init__(self, *args, month: date, **kwargs): super().__init__(*args, **kwargs) self.month = month month_start = datetime(month.year, month.month, month.day, tzinfo=timezone.utc) self.clubs = list( Club.objects.filter( Exists( Selling.objects.filter( club=OuterRef("pk"), date__gte=month_start, date__lte=month_start + relativedelta(months=1), ) ) ).annotate( validated_invoice=Exists( InvoiceCall.objects.filter( club=OuterRef("pk"), month=month, is_validated=True ) ) ) ) self.fields = { str(club.id): forms.BooleanField( required=False, initial=club.validated_invoice ) for club in self.clubs } def save(self): invoice_calls = [ InvoiceCall( month=self.month, club_id=club.id, is_validated=self.cleaned_data.get(str(club.id), False), ) for club in self.clubs ] InvoiceCall.objects.bulk_create( invoice_calls, update_conflicts=True, update_fields=["is_validated"], unique_fields=["month", "club"], )