From 5b91fe214578246f922d1b4bd467c30bf81051fd Mon Sep 17 00:00:00 2001
From: imperosol
Date: Tue, 23 Sep 2025 15:51:02 +0200
Subject: [PATCH] use ModelFormSet instead of FormSet for scheduled actions
---
counter/forms.py | 129 +++++++++----------
counter/models.py | 6 +-
counter/templates/counter/product_form.jinja | 21 +--
counter/tests/test_auto_actions.py | 57 +-------
4 files changed, 78 insertions(+), 135 deletions(-)
diff --git a/counter/forms.py b/counter/forms.py
index 4c5a3b1d..3b1df846 100644
--- a/counter/forms.py
+++ b/counter/forms.py
@@ -1,9 +1,10 @@
import json
import math
+import uuid
from django import forms
from django.db.models import Q
-from django.forms.formsets import DELETION_FIELD_NAME, BaseFormSet
+from django.forms import BaseModelFormSet
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import ClockedSchedule
@@ -12,7 +13,6 @@ from phonenumber_field.widgets import RegionalPhoneNumberWidget
from club.widgets.ajax_select import AutoCompleteSelectClub
from core.models import User
from core.views.forms import (
- FutureDateTimeField,
NFCTextInput,
SelectDate,
SelectDateTime,
@@ -169,7 +169,7 @@ class CounterEditForm(forms.ModelForm):
}
-class ScheduledProductActionForm(forms.Form):
+class ScheduledProductActionForm(forms.ModelForm):
"""Form for automatic product archiving.
The `save` method will update or create tasks using celery-beat.
@@ -178,14 +178,14 @@ class ScheduledProductActionForm(forms.Form):
required_css_class = "required"
prefix = "scheduled"
- task = forms.fields_for_model(
- ScheduledProductAction,
- ["task"],
- widgets={"task": forms.RadioSelect(choices=get_product_actions)},
- labels={"task": _("Action")},
- help_texts={"task": ""},
- )["task"]
- trigger_at = FutureDateTimeField(
+ class Meta:
+ model = ScheduledProductAction
+ fields = ["task"]
+ widgets = {"task": forms.RadioSelect(choices=get_product_actions)}
+ labels = {"task": _("Action")}
+ help_texts = {"task": ""}
+
+ trigger_at = forms.DateTimeField(
label=_("Date and time of action"), widget=SelectDateTime
)
counters = forms.ModelMultipleChoiceField(
@@ -199,71 +199,58 @@ class ScheduledProductActionForm(forms.Form):
def __init__(self, *args, product: Product, **kwargs):
self.product = product
super().__init__(*args, **kwargs)
-
- def save(self):
- if not self.changed_data:
- return
- task = self.cleaned_data["task"]
- instance = ScheduledProductAction.objects.filter(
- product=self.product, task=task
- ).first()
- if not instance:
- instance = ScheduledProductAction(
- product=self.product,
- clocked=ClockedSchedule.objects.create(
- clocked_time=self.cleaned_data["trigger_at"]
- ),
+ if not self.instance._state.adding:
+ self.fields["trigger_at"].initial = self.instance.clocked.clocked_time
+ self.fields["counters"].initial = json.loads(self.instance.kwargs).get(
+ "counters"
)
- elif "trigger_at" in self.changed_data:
- instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
- instance.clocked.save()
- instance.task = task
- task_kwargs = {"product_id": self.product.id}
- if task == "counter.tasks.change_counters":
- task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]]
- instance.kwargs = json.dumps(task_kwargs)
- instance.name = f"{self.cleaned_data['task']} - {self.product}"
- instance.enabled = True
- instance.save()
- return instance
- def delete(self):
- instance = ScheduledProductAction.objects.get(
- product=self.product, task=self.cleaned_data["task"]
- )
- # if the clocked object linked to the task has no other task,
- # delete it and let the task be deleted in cascade,
- # else delete only the task.
- if instance.clocked.periodictask_set.count() > 1:
- return instance.clocked.delete()
- return instance.delete()
-
-
-class BaseScheduledProductActionFormSet(BaseFormSet):
- def __init__(self, *args, product: Product, **kwargs):
- initial = [
- {
- "task": action.task,
- "trigger_at": action.clocked.clocked_time,
- "counters": json.loads(action.kwargs).get("counters"),
- }
- for action in product.scheduled_actions.filter(
- enabled=True, clocked__clocked_time__gt=now()
- ).order_by("clocked__clocked_time")
- ]
- kwargs["initial"] = initial
- kwargs["form_kwargs"] = {"product": product}
- super().__init__(*args, **kwargs)
-
- def save(self):
- for form in self.forms:
- if form.cleaned_data.get(DELETION_FIELD_NAME, False):
- form.delete()
+ def clean(self):
+ if not self.changed_data:
+ return super().clean()
+ if "trigger_at" in self.changed_data:
+ if not self.instance.clocked_id:
+ self.instance.clocked = ClockedSchedule(
+ clocked_time=self.cleaned_data["trigger_at"]
+ )
else:
- form.save()
+ self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
+ self.instance.clocked.save()
+ task_kwargs = {"product_id": self.product.id}
+ if (
+ self.cleaned_data["task"] == "counter.tasks.change_counters"
+ and "counters" in self.changed_data
+ ):
+ task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]]
+ self.instance.product = self.product
+ self.instance.kwargs = json.dumps(task_kwargs)
+ self.instance.name = (
+ f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}"
+ )
+ return super().clean()
-ScheduledProductActionFormSet = forms.formset_factory(
+class BaseScheduledProductActionFormSet(BaseModelFormSet):
+ def __init__(self, *args, product: Product, **kwargs):
+ queryset = (
+ product.scheduled_actions.filter(
+ enabled=True, clocked__clocked_time__gt=now()
+ )
+ .order_by("clocked__clocked_time")
+ .select_related("clocked")
+ )
+ form_kwargs = {"product": product}
+ super().__init__(*args, queryset=queryset, form_kwargs=form_kwargs, **kwargs)
+
+ def delete_existing(self, obj: ScheduledProductAction, commit: bool = True): # noqa FBT001
+ clocked = obj.clocked
+ super().delete_existing(obj, commit=commit)
+ if commit:
+ clocked.delete()
+
+
+ScheduledProductActionFormSet = forms.modelformset_factory(
+ ScheduledProductAction,
ScheduledProductActionForm,
formset=BaseScheduledProductActionFormSet,
absolute_max=None,
diff --git a/counter/models.py b/counter/models.py
index 58fc8564..d7bc3b4e 100644
--- a/counter/models.py
+++ b/counter/models.py
@@ -481,7 +481,7 @@ class CounterQuerySet(models.QuerySet):
return self.annotate(has_annotated_barman=Exists(subquery))
def annotate_is_open(self) -> Self:
- """Annotate tue queryset with the `is_open` field.
+ """Annotate the queryset with the `is_open` field.
For each counter, if `is_open=True`, then the counter is currently opened.
Else the counter is closed.
@@ -1382,6 +1382,6 @@ class ScheduledProductAction(PeriodicTask):
self._meta.get_field("task").choices = get_product_actions()
super().__init__(*args, **kwargs)
- def save(self, *args, **kwargs):
+ def full_clean(self, *args, **kwargs):
self.one_off = True # A product action should occur one time only
- return super().save(*args, **kwargs)
+ return super().full_clean(*args, **kwargs)
diff --git a/counter/templates/counter/product_form.jinja b/counter/templates/counter/product_form.jinja
index 53cb23f9..afc9ba2a 100644
--- a/counter/templates/counter/product_form.jinja
+++ b/counter/templates/counter/product_form.jinja
@@ -24,32 +24,33 @@
{{ form.action_formset.management_form }}
- {% for action_form in form.action_formset.forms %}
+ {%- for action_form in form.action_formset.forms -%}
- {% if not loop.last %}
+ {%- if not loop.last -%}
- {% endif %}
-
- {% endfor %}
+ {%- endif -%}
+ {%- endfor -%}
{% endblock %}
\ No newline at end of file
diff --git a/counter/tests/test_auto_actions.py b/counter/tests/test_auto_actions.py
index 65975315..bc7f6937 100644
--- a/counter/tests/test_auto_actions.py
+++ b/counter/tests/test_auto_actions.py
@@ -8,7 +8,6 @@ from django.urls import reverse
from django.utils.timezone import now
from django_celery_beat.models import ClockedSchedule
from model_bakery import baker
-from pytest_django.asserts import assertNumQueries
from core.models import Group, User
from counter.baker_recipes import counter_recipe, product_recipe
@@ -78,64 +77,20 @@ class TestProductActionForm:
{"product_id": product.id, "counters": [counter.id]}
)
- def test_task_reused_when_editing(self):
- """Check that when product tasks are edited, no new entry is created in db"""
- product = product_recipe.make()
- old_clocked = now() + timedelta(minutes=10)
- old_instance = baker.make(
- ScheduledProductAction,
- product=product,
- clocked=baker.make(ClockedSchedule, clocked_time=old_clocked),
- task="counter.tasks.archive_product",
- )
- new_clocked = old_clocked + timedelta(minutes=2)
- form = ScheduledProductActionForm(
- product=product,
- data={
- "scheduled-task": "counter.tasks.archive_product",
- "scheduled-trigger_at": new_clocked,
- },
- )
- assert form.is_valid()
- new_instance = form.save()
- assert new_instance.id == old_instance.id
- assert new_instance.clocked.id == old_instance.clocked.id
- assert new_instance.clocked.clocked_time == new_clocked
-
- def test_no_changed_data(self):
- """Test that when no data changes, the save method does no db query"""
- product = product_recipe.make()
- trigger_at = now() + timedelta(minutes=2)
- baker.make(
- ScheduledProductAction,
- product=product,
- clocked=baker.make(ClockedSchedule, clocked_time=trigger_at),
- task="counter.tasks.archive_product",
- )
- form = ScheduledProductActionForm(product=product, data={})
- with assertNumQueries(0):
- form.is_valid()
- form.save()
-
def test_delete(self):
product = product_recipe.make()
- trigger_at = now() + timedelta(minutes=2)
+ clocked = baker.make(ClockedSchedule, clocked_time=now() + timedelta(minutes=2))
task = baker.make(
ScheduledProductAction,
product=product,
- clocked=baker.make(ClockedSchedule, clocked_time=trigger_at),
+ one_off=True,
+ clocked=clocked,
task="counter.tasks.archive_product",
)
- form = ScheduledProductActionForm(
- product=product,
- data={
- "scheduled-task": "counter.tasks.archive_product",
- "scheduled-trigger_at": trigger_at,
- },
- )
- assert form.is_valid()
- form.delete()
+ formset = ScheduledProductActionFormSet(product=product)
+ formset.delete_existing(task)
assert not ScheduledProductAction.objects.filter(id=task.id).exists()
+ assert not ClockedSchedule.objects.filter(id=clocked.id).exists()
@pytest.mark.django_db