mirror of
https://github.com/ae-utbm/sith.git
synced 2025-10-09 08:14:39 +00:00
use ModelFormSet instead of FormSet for scheduled actions
This commit is contained in:
129
counter/forms.py
129
counter/forms.py
@@ -1,9 +1,10 @@
|
||||
import json
|
||||
import math
|
||||
import uuid
|
||||
|
||||
from django import forms
|
||||
from django.db.models import Q
|
||||
from django.forms.formsets import DELETION_FIELD_NAME, BaseFormSet
|
||||
from django.forms import BaseModelFormSet
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_celery_beat.models import ClockedSchedule
|
||||
@@ -12,7 +13,6 @@ from phonenumber_field.widgets import RegionalPhoneNumberWidget
|
||||
from club.widgets.ajax_select import AutoCompleteSelectClub
|
||||
from core.models import User
|
||||
from core.views.forms import (
|
||||
FutureDateTimeField,
|
||||
NFCTextInput,
|
||||
SelectDate,
|
||||
SelectDateTime,
|
||||
@@ -169,7 +169,7 @@ class CounterEditForm(forms.ModelForm):
|
||||
}
|
||||
|
||||
|
||||
class ScheduledProductActionForm(forms.Form):
|
||||
class ScheduledProductActionForm(forms.ModelForm):
|
||||
"""Form for automatic product archiving.
|
||||
|
||||
The `save` method will update or create tasks using celery-beat.
|
||||
@@ -178,14 +178,14 @@ class ScheduledProductActionForm(forms.Form):
|
||||
required_css_class = "required"
|
||||
prefix = "scheduled"
|
||||
|
||||
task = forms.fields_for_model(
|
||||
ScheduledProductAction,
|
||||
["task"],
|
||||
widgets={"task": forms.RadioSelect(choices=get_product_actions)},
|
||||
labels={"task": _("Action")},
|
||||
help_texts={"task": ""},
|
||||
)["task"]
|
||||
trigger_at = FutureDateTimeField(
|
||||
class Meta:
|
||||
model = ScheduledProductAction
|
||||
fields = ["task"]
|
||||
widgets = {"task": forms.RadioSelect(choices=get_product_actions)}
|
||||
labels = {"task": _("Action")}
|
||||
help_texts = {"task": ""}
|
||||
|
||||
trigger_at = forms.DateTimeField(
|
||||
label=_("Date and time of action"), widget=SelectDateTime
|
||||
)
|
||||
counters = forms.ModelMultipleChoiceField(
|
||||
@@ -199,71 +199,58 @@ class ScheduledProductActionForm(forms.Form):
|
||||
def __init__(self, *args, product: Product, **kwargs):
|
||||
self.product = product
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def save(self):
|
||||
if not self.changed_data:
|
||||
return
|
||||
task = self.cleaned_data["task"]
|
||||
instance = ScheduledProductAction.objects.filter(
|
||||
product=self.product, task=task
|
||||
).first()
|
||||
if not instance:
|
||||
instance = ScheduledProductAction(
|
||||
product=self.product,
|
||||
clocked=ClockedSchedule.objects.create(
|
||||
clocked_time=self.cleaned_data["trigger_at"]
|
||||
),
|
||||
if not self.instance._state.adding:
|
||||
self.fields["trigger_at"].initial = self.instance.clocked.clocked_time
|
||||
self.fields["counters"].initial = json.loads(self.instance.kwargs).get(
|
||||
"counters"
|
||||
)
|
||||
elif "trigger_at" in self.changed_data:
|
||||
instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
|
||||
instance.clocked.save()
|
||||
instance.task = task
|
||||
task_kwargs = {"product_id": self.product.id}
|
||||
if task == "counter.tasks.change_counters":
|
||||
task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]]
|
||||
instance.kwargs = json.dumps(task_kwargs)
|
||||
instance.name = f"{self.cleaned_data['task']} - {self.product}"
|
||||
instance.enabled = True
|
||||
instance.save()
|
||||
return instance
|
||||
|
||||
def delete(self):
|
||||
instance = ScheduledProductAction.objects.get(
|
||||
product=self.product, task=self.cleaned_data["task"]
|
||||
)
|
||||
# if the clocked object linked to the task has no other task,
|
||||
# delete it and let the task be deleted in cascade,
|
||||
# else delete only the task.
|
||||
if instance.clocked.periodictask_set.count() > 1:
|
||||
return instance.clocked.delete()
|
||||
return instance.delete()
|
||||
|
||||
|
||||
class BaseScheduledProductActionFormSet(BaseFormSet):
|
||||
def __init__(self, *args, product: Product, **kwargs):
|
||||
initial = [
|
||||
{
|
||||
"task": action.task,
|
||||
"trigger_at": action.clocked.clocked_time,
|
||||
"counters": json.loads(action.kwargs).get("counters"),
|
||||
}
|
||||
for action in product.scheduled_actions.filter(
|
||||
enabled=True, clocked__clocked_time__gt=now()
|
||||
).order_by("clocked__clocked_time")
|
||||
]
|
||||
kwargs["initial"] = initial
|
||||
kwargs["form_kwargs"] = {"product": product}
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def save(self):
|
||||
for form in self.forms:
|
||||
if form.cleaned_data.get(DELETION_FIELD_NAME, False):
|
||||
form.delete()
|
||||
def clean(self):
|
||||
if not self.changed_data:
|
||||
return super().clean()
|
||||
if "trigger_at" in self.changed_data:
|
||||
if not self.instance.clocked_id:
|
||||
self.instance.clocked = ClockedSchedule(
|
||||
clocked_time=self.cleaned_data["trigger_at"]
|
||||
)
|
||||
else:
|
||||
form.save()
|
||||
self.instance.clocked.clocked_time = self.cleaned_data["trigger_at"]
|
||||
self.instance.clocked.save()
|
||||
task_kwargs = {"product_id": self.product.id}
|
||||
if (
|
||||
self.cleaned_data["task"] == "counter.tasks.change_counters"
|
||||
and "counters" in self.changed_data
|
||||
):
|
||||
task_kwargs["counters"] = [c.id for c in self.cleaned_data["counters"]]
|
||||
self.instance.product = self.product
|
||||
self.instance.kwargs = json.dumps(task_kwargs)
|
||||
self.instance.name = (
|
||||
f"{self.cleaned_data['task']} - {self.product} - {uuid.uuid4()}"
|
||||
)
|
||||
return super().clean()
|
||||
|
||||
|
||||
ScheduledProductActionFormSet = forms.formset_factory(
|
||||
class BaseScheduledProductActionFormSet(BaseModelFormSet):
|
||||
def __init__(self, *args, product: Product, **kwargs):
|
||||
queryset = (
|
||||
product.scheduled_actions.filter(
|
||||
enabled=True, clocked__clocked_time__gt=now()
|
||||
)
|
||||
.order_by("clocked__clocked_time")
|
||||
.select_related("clocked")
|
||||
)
|
||||
form_kwargs = {"product": product}
|
||||
super().__init__(*args, queryset=queryset, form_kwargs=form_kwargs, **kwargs)
|
||||
|
||||
def delete_existing(self, obj: ScheduledProductAction, commit: bool = True): # noqa FBT001
|
||||
clocked = obj.clocked
|
||||
super().delete_existing(obj, commit=commit)
|
||||
if commit:
|
||||
clocked.delete()
|
||||
|
||||
|
||||
ScheduledProductActionFormSet = forms.modelformset_factory(
|
||||
ScheduledProductAction,
|
||||
ScheduledProductActionForm,
|
||||
formset=BaseScheduledProductActionFormSet,
|
||||
absolute_max=None,
|
||||
|
@@ -481,7 +481,7 @@ class CounterQuerySet(models.QuerySet):
|
||||
return self.annotate(has_annotated_barman=Exists(subquery))
|
||||
|
||||
def annotate_is_open(self) -> Self:
|
||||
"""Annotate tue queryset with the `is_open` field.
|
||||
"""Annotate the queryset with the `is_open` field.
|
||||
|
||||
For each counter, if `is_open=True`, then the counter is currently opened.
|
||||
Else the counter is closed.
|
||||
@@ -1382,6 +1382,6 @@ class ScheduledProductAction(PeriodicTask):
|
||||
self._meta.get_field("task").choices = get_product_actions()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
def full_clean(self, *args, **kwargs):
|
||||
self.one_off = True # A product action should occur one time only
|
||||
return super().save(*args, **kwargs)
|
||||
return super().full_clean(*args, **kwargs)
|
||||
|
@@ -24,32 +24,33 @@
|
||||
</p>
|
||||
|
||||
{{ form.action_formset.management_form }}
|
||||
{% for action_form in form.action_formset.forms %}
|
||||
{%- for action_form in form.action_formset.forms -%}
|
||||
<fieldset x-data="{action: '{{ action_form.task.initial }}'}">
|
||||
{{ action_form.non_field_errors() }}
|
||||
<div class="row gap-2x margin-bottom">
|
||||
<div>
|
||||
{{ action_form.task.errors }}
|
||||
{{ action_form.task.label_tag() }}
|
||||
{{ action_form.task|add_attr("x-model=action") }}
|
||||
</div>
|
||||
<div>
|
||||
{{ action_form.trigger_at.as_field_group() }}
|
||||
</div>
|
||||
<div>{{ action_form.trigger_at.as_field_group() }}</div>
|
||||
</div>
|
||||
<div x-show="action==='counter.tasks.change_counters'" class="margin-bottom">
|
||||
{{ action_form.counters.as_field_group() }}
|
||||
</div>
|
||||
{% if action_form.DELETE %}
|
||||
{%- if action_form.DELETE -%}
|
||||
<div class="row gap">
|
||||
{{ action_form.DELETE.as_field_group() }}
|
||||
</div>
|
||||
{% endif %}
|
||||
{%- endif -%}
|
||||
{%- for field in action_form.hidden_fields() -%}
|
||||
{{ field }}
|
||||
{%- endfor -%}
|
||||
</fieldset>
|
||||
{% if not loop.last %}
|
||||
{%- if not loop.last -%}
|
||||
<hr class="margin-bottom">
|
||||
{% endif %}
|
||||
|
||||
{% endfor %}
|
||||
{%- endif -%}
|
||||
{%- endfor -%}
|
||||
<p><input type="submit" value="{% trans %}Save{% endtrans %}" /></p>
|
||||
</form>
|
||||
{% endblock %}
|
@@ -8,7 +8,6 @@ from django.urls import reverse
|
||||
from django.utils.timezone import now
|
||||
from django_celery_beat.models import ClockedSchedule
|
||||
from model_bakery import baker
|
||||
from pytest_django.asserts import assertNumQueries
|
||||
|
||||
from core.models import Group, User
|
||||
from counter.baker_recipes import counter_recipe, product_recipe
|
||||
@@ -78,64 +77,20 @@ class TestProductActionForm:
|
||||
{"product_id": product.id, "counters": [counter.id]}
|
||||
)
|
||||
|
||||
def test_task_reused_when_editing(self):
|
||||
"""Check that when product tasks are edited, no new entry is created in db"""
|
||||
product = product_recipe.make()
|
||||
old_clocked = now() + timedelta(minutes=10)
|
||||
old_instance = baker.make(
|
||||
ScheduledProductAction,
|
||||
product=product,
|
||||
clocked=baker.make(ClockedSchedule, clocked_time=old_clocked),
|
||||
task="counter.tasks.archive_product",
|
||||
)
|
||||
new_clocked = old_clocked + timedelta(minutes=2)
|
||||
form = ScheduledProductActionForm(
|
||||
product=product,
|
||||
data={
|
||||
"scheduled-task": "counter.tasks.archive_product",
|
||||
"scheduled-trigger_at": new_clocked,
|
||||
},
|
||||
)
|
||||
assert form.is_valid()
|
||||
new_instance = form.save()
|
||||
assert new_instance.id == old_instance.id
|
||||
assert new_instance.clocked.id == old_instance.clocked.id
|
||||
assert new_instance.clocked.clocked_time == new_clocked
|
||||
|
||||
def test_no_changed_data(self):
|
||||
"""Test that when no data changes, the save method does no db query"""
|
||||
product = product_recipe.make()
|
||||
trigger_at = now() + timedelta(minutes=2)
|
||||
baker.make(
|
||||
ScheduledProductAction,
|
||||
product=product,
|
||||
clocked=baker.make(ClockedSchedule, clocked_time=trigger_at),
|
||||
task="counter.tasks.archive_product",
|
||||
)
|
||||
form = ScheduledProductActionForm(product=product, data={})
|
||||
with assertNumQueries(0):
|
||||
form.is_valid()
|
||||
form.save()
|
||||
|
||||
def test_delete(self):
|
||||
product = product_recipe.make()
|
||||
trigger_at = now() + timedelta(minutes=2)
|
||||
clocked = baker.make(ClockedSchedule, clocked_time=now() + timedelta(minutes=2))
|
||||
task = baker.make(
|
||||
ScheduledProductAction,
|
||||
product=product,
|
||||
clocked=baker.make(ClockedSchedule, clocked_time=trigger_at),
|
||||
one_off=True,
|
||||
clocked=clocked,
|
||||
task="counter.tasks.archive_product",
|
||||
)
|
||||
form = ScheduledProductActionForm(
|
||||
product=product,
|
||||
data={
|
||||
"scheduled-task": "counter.tasks.archive_product",
|
||||
"scheduled-trigger_at": trigger_at,
|
||||
},
|
||||
)
|
||||
assert form.is_valid()
|
||||
form.delete()
|
||||
formset = ScheduledProductActionFormSet(product=product)
|
||||
formset.delete_existing(task)
|
||||
assert not ScheduledProductAction.objects.filter(id=task.id).exists()
|
||||
assert not ClockedSchedule.objects.filter(id=clocked.id).exists()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
Reference in New Issue
Block a user