diff --git a/counter/forms.py b/counter/forms.py index 4c5a3b1d..3b1df846 100644 --- a/counter/forms.py +++ b/counter/forms.py @@ -1,9 +1,10 @@ import json import math +import uuid from django import forms from django.db.models import Q -from django.forms.formsets import DELETION_FIELD_NAME, BaseFormSet +from django.forms import BaseModelFormSet from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from django_celery_beat.models import ClockedSchedule @@ -12,7 +13,6 @@ from phonenumber_field.widgets import RegionalPhoneNumberWidget from club.widgets.ajax_select import AutoCompleteSelectClub from core.models import User from core.views.forms import ( - FutureDateTimeField, NFCTextInput, SelectDate, SelectDateTime, @@ -169,7 +169,7 @@ class CounterEditForm(forms.ModelForm): } -class ScheduledProductActionForm(forms.Form): +class ScheduledProductActionForm(forms.ModelForm): """Form for automatic product archiving. The `save` method will update or create tasks using celery-beat. @@ -178,14 +178,14 @@ class ScheduledProductActionForm(forms.Form): required_css_class = "required" prefix = "scheduled" - task = forms.fields_for_model( - ScheduledProductAction, - ["task"], - widgets={"task": forms.RadioSelect(choices=get_product_actions)}, - labels={"task": _("Action")}, - help_texts={"task": ""}, - )["task"] - trigger_at = FutureDateTimeField( + class Meta: + model = ScheduledProductAction + fields = ["task"] + widgets = {"task": forms.RadioSelect(choices=get_product_actions)} + labels = {"task": _("Action")} + help_texts = {"task": ""} + + trigger_at = forms.DateTimeField( label=_("Date and time of action"), widget=SelectDateTime ) counters = forms.ModelMultipleChoiceField( @@ -199,71 +199,58 @@ class ScheduledProductActionForm(forms.Form): def __init__(self, *args, product: Product, **kwargs): self.product = product super().__init__(*args, **kwargs) - - def save(self): - if not self.changed_data: - return - task = self.cleaned_data["task"] - instance = ScheduledProductAction.objects.filter( - product=self.product, task=task - ).first() - if not instance: - instance = ScheduledProductAction( - product=self.product, - clocked=ClockedSchedule.objects.create( - clocked_time=self.cleaned_data["trigger_at"] - ), + if not self.instance._state.adding: + self.fields["trigger_at"].initial = self.instance.clocked.clocked_time + self.fields["counters"].initial = json.loads(self.instance.kwargs).get( + "counters" ) - elif "trigger_at" in self.changed_data: - instance.clocked.clocked_time = self.cleaned_data["trigger_at"] - instance.clocked.save() - instance.task = task - task_kwargs = {"product_id": self.product.id} - if task == "counter.tasks.change_counters": - task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]] - instance.kwargs = json.dumps(task_kwargs) - instance.name = f"{self.cleaned_data['task']} - {self.product}" - instance.enabled = True - instance.save() - return instance - def delete(self): - instance = ScheduledProductAction.objects.get( - product=self.product, task=self.cleaned_data["task"] - ) - # if the clocked object linked to the task has no other task, - # delete it and let the task be deleted in cascade, - # else delete only the task. - if instance.clocked.periodictask_set.count() > 1: - return instance.clocked.delete() - return instance.delete() - - -class BaseScheduledProductActionFormSet(BaseFormSet): - def __init__(self, *args, product: Product, **kwargs): - initial = [ - { - "task": action.task, - "trigger_at": action.clocked.clocked_time, - "counters": json.loads(action.kwargs).get("counters"), - } - for action in product.scheduled_actions.filter( - enabled=True, clocked__clocked_time__gt=now() - ).order_by("clocked__clocked_time") - ] - kwargs["initial"] = initial - kwargs["form_kwargs"] = {"product": product} - super().__init__(*args, **kwargs) - - def save(self): - for form in self.forms: - if form.cleaned_data.get(DELETION_FIELD_NAME, False): - form.delete() + def clean(self): + if not self.changed_data: + return super().clean() + if "trigger_at" in self.changed_data: + if not self.instance.clocked_id: + self.instance.clocked = ClockedSchedule( + clocked_time=self.cleaned_data["trigger_at"] + ) else: - form.save() + self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"] + self.instance.clocked.save() + task_kwargs = {"product_id": self.product.id} + if ( + self.cleaned_data["task"] == "counter.tasks.change_counters" + and "counters" in self.changed_data + ): + task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]] + self.instance.product = self.product + self.instance.kwargs = json.dumps(task_kwargs) + self.instance.name = ( + f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}" + ) + return super().clean() -ScheduledProductActionFormSet = forms.formset_factory( +class BaseScheduledProductActionFormSet(BaseModelFormSet): + def __init__(self, *args, product: Product, **kwargs): + queryset = ( + product.scheduled_actions.filter( + enabled=True, clocked__clocked_time__gt=now() + ) + .order_by("clocked__clocked_time") + .select_related("clocked") + ) + form_kwargs = {"product": product} + super().__init__(*args, queryset=queryset, form_kwargs=form_kwargs, **kwargs) + + def delete_existing(self, obj: ScheduledProductAction, commit: bool = True): # noqa FBT001 + clocked = obj.clocked + super().delete_existing(obj, commit=commit) + if commit: + clocked.delete() + + +ScheduledProductActionFormSet = forms.modelformset_factory( + ScheduledProductAction, ScheduledProductActionForm, formset=BaseScheduledProductActionFormSet, absolute_max=None, diff --git a/counter/models.py b/counter/models.py index 58fc8564..d7bc3b4e 100644 --- a/counter/models.py +++ b/counter/models.py @@ -481,7 +481,7 @@ class CounterQuerySet(models.QuerySet): return self.annotate(has_annotated_barman=Exists(subquery)) def annotate_is_open(self) -> Self: - """Annotate tue queryset with the `is_open` field. + """Annotate the queryset with the `is_open` field. For each counter, if `is_open=True`, then the counter is currently opened. Else the counter is closed. @@ -1382,6 +1382,6 @@ class ScheduledProductAction(PeriodicTask): self._meta.get_field("task").choices = get_product_actions() super().__init__(*args, **kwargs) - def save(self, *args, **kwargs): + def full_clean(self, *args, **kwargs): self.one_off = True # A product action should occur one time only - return super().save(*args, **kwargs) + return super().full_clean(*args, **kwargs) diff --git a/counter/templates/counter/product_form.jinja b/counter/templates/counter/product_form.jinja index 53cb23f9..afc9ba2a 100644 --- a/counter/templates/counter/product_form.jinja +++ b/counter/templates/counter/product_form.jinja @@ -24,32 +24,33 @@

{{ form.action_formset.management_form }} - {% for action_form in form.action_formset.forms %} + {%- for action_form in form.action_formset.forms -%}
+ {{ action_form.non_field_errors() }}
{{ action_form.task.errors }} {{ action_form.task.label_tag() }} {{ action_form.task|add_attr("x-model=action") }}
-
- {{ action_form.trigger_at.as_field_group() }} -
+
{{ action_form.trigger_at.as_field_group() }}
{{ action_form.counters.as_field_group() }}
- {% if action_form.DELETE %} + {%- if action_form.DELETE -%}
{{ action_form.DELETE.as_field_group() }}
- {% endif %} + {%- endif -%} + {%- for field in action_form.hidden_fields() -%} + {{ field }} + {%- endfor -%}
- {% if not loop.last %} + {%- if not loop.last -%}
- {% endif %} - - {% endfor %} + {%- endif -%} + {%- endfor -%}

{% endblock %} \ No newline at end of file diff --git a/counter/tests/test_auto_actions.py b/counter/tests/test_auto_actions.py index 65975315..bc7f6937 100644 --- a/counter/tests/test_auto_actions.py +++ b/counter/tests/test_auto_actions.py @@ -8,7 +8,6 @@ from django.urls import reverse from django.utils.timezone import now from django_celery_beat.models import ClockedSchedule from model_bakery import baker -from pytest_django.asserts import assertNumQueries from core.models import Group, User from counter.baker_recipes import counter_recipe, product_recipe @@ -78,64 +77,20 @@ class TestProductActionForm: {"product_id": product.id, "counters": [counter.id]} ) - def test_task_reused_when_editing(self): - """Check that when product tasks are edited, no new entry is created in db""" - product = product_recipe.make() - old_clocked = now() + timedelta(minutes=10) - old_instance = baker.make( - ScheduledProductAction, - product=product, - clocked=baker.make(ClockedSchedule, clocked_time=old_clocked), - task="counter.tasks.archive_product", - ) - new_clocked = old_clocked + timedelta(minutes=2) - form = ScheduledProductActionForm( - product=product, - data={ - "scheduled-task": "counter.tasks.archive_product", - "scheduled-trigger_at": new_clocked, - }, - ) - assert form.is_valid() - new_instance = form.save() - assert new_instance.id == old_instance.id - assert new_instance.clocked.id == old_instance.clocked.id - assert new_instance.clocked.clocked_time == new_clocked - - def test_no_changed_data(self): - """Test that when no data changes, the save method does no db query""" - product = product_recipe.make() - trigger_at = now() + timedelta(minutes=2) - baker.make( - ScheduledProductAction, - product=product, - clocked=baker.make(ClockedSchedule, clocked_time=trigger_at), - task="counter.tasks.archive_product", - ) - form = ScheduledProductActionForm(product=product, data={}) - with assertNumQueries(0): - form.is_valid() - form.save() - def test_delete(self): product = product_recipe.make() - trigger_at = now() + timedelta(minutes=2) + clocked = baker.make(ClockedSchedule, clocked_time=now() + timedelta(minutes=2)) task = baker.make( ScheduledProductAction, product=product, - clocked=baker.make(ClockedSchedule, clocked_time=trigger_at), + one_off=True, + clocked=clocked, task="counter.tasks.archive_product", ) - form = ScheduledProductActionForm( - product=product, - data={ - "scheduled-task": "counter.tasks.archive_product", - "scheduled-trigger_at": trigger_at, - }, - ) - assert form.is_valid() - form.delete() + formset = ScheduledProductActionFormSet(product=product) + formset.delete_existing(task) assert not ScheduledProductAction.objects.filter(id=task.id).exists() + assert not ClockedSchedule.objects.filter(id=clocked.id).exists() @pytest.mark.django_db