feat: request.barmen

This commit is contained in:
imperosol
2026-05-30 12:20:17 +02:00
parent 074ebcb011
commit 222b0d16a7
11 changed files with 158 additions and 88 deletions
+64
View File
@@ -0,0 +1,64 @@
from typing import TYPE_CHECKING, Callable
from django.db.models import Exists, OuterRef
from django.http import HttpRequest, HttpResponse
from django.utils.functional import SimpleLazyObject, empty
from core.models import User
from counter.models import Permanency
if TYPE_CHECKING:
from django.contrib.sessions.backends.base import SessionBase
SESSION_BARMEN_KEY = "barmen_ids"
def get_cached_barmen(request: HttpRequest) -> set[User]:
if not hasattr(request, "_cached_barmen"):
session: SessionBase = request.session
barmen_ids = session.get(SESSION_BARMEN_KEY, [])
if barmen_ids:
request._cached_barmen = set(
User.objects.filter(
Exists(Permanency.objects.filter(user=OuterRef("pk"), end=None)),
id__in=barmen_ids,
)
)
else:
request._cached_barmen = set()
return request._cached_barmen
class BarmenMiddleware:
"""Inject barmen logged in the current session.
In a similar fashion as `request.user`, `request.barmen` contains
users that are barmen in the current session, and ONLY them ;
if a user is logged as a barman on another session,
it will not be in `request.barmen`.
Notes:
In case of ended permanence, users will be automatically
removed from `request.barmen`.
However, in case of newly started permanence, this middleware
cannot add new barmen in the session data, so that operation
must be explicitly done in the barman login view.
"""
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
self.get_response = get_response
def __call__(self, request: HttpRequest):
request.barmen = SimpleLazyObject(lambda: get_cached_barmen(request))
response = self.get_response(request)
if request.barmen._wrapped is not empty and {
b.id for b in request.barmen
} != set(request.session.get(SESSION_BARMEN_KEY, [])):
# update the session data only if `session.barmen`
# has been accessed and modified.
request.session[SESSION_BARMEN_KEY] = [b.id for b in request.barmen]
return response
+7 -14
View File
@@ -20,41 +20,34 @@
# Place - Suite 330, Boston, MA 02111-1307, USA.
#
#
import random
from django.db.models.signals import pre_delete
from django.dispatch import receiver
from core.middleware import get_signal_request
from core.models import OperationLog
from counter.models import Counter, Refilling, Selling
from counter.models import Refilling, Selling
def write_log(instance, operation_type):
def write_log(instance: Selling | Refilling, operation_type):
def get_user():
request = get_signal_request()
if not request:
return None
# Get a random barmen if deletion is from a counter
session = getattr(request, "session", {})
session_token = session.get("counter_token", None)
if session_token:
counter = Counter.objects.filter(token=session_token).first()
if counter and len(counter.barmen_list) > 0:
return counter.get_random_barman()
if request.barmen:
return random.choice(list(request.barmen))
# Get the current logged user if not from a counter
if request.user and not request.user.is_anonymous:
if request.user.is_authenticated:
return request.user
# Return None by default
return None
OperationLog(
label=str(instance),
operator=get_user(),
operation_type=operation_type,
label=str(instance), operator=get_user(), operation_type=operation_type
).save()
+45 -18
View File
@@ -21,6 +21,7 @@ from bs4 import BeautifulSoup
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.auth.models import Permission, make_password
from django.contrib.messages import DEFAULT_LEVELS, get_messages
from django.http import HttpResponse
from django.shortcuts import resolve_url
from django.test import Client, TestCase
@@ -38,6 +39,7 @@ from core.models import BanGroup, Group, User
from counter.baker_recipes import price_recipe, product_recipe, sale_recipe
from counter.models import (
Counter,
CounterSellers,
Customer,
Permanency,
ProductType,
@@ -67,10 +69,14 @@ class TestFullClickBase(TestCase):
cls.subscriber = subscriber_user.make()
cls.counter = baker.make(Counter, type="BAR")
cls.counter.sellers.add(cls.barmen, cls.board_admin)
cls.other_counter = baker.make(Counter, type="BAR")
cls.other_counter.sellers.add(cls.barmen)
CounterSellers.objects.bulk_create(
[
CounterSellers(counter=cls.counter, user=cls.barmen),
CounterSellers(counter=cls.counter, user=cls.board_admin),
CounterSellers(counter=cls.other_counter, user=cls.barmen),
]
)
cls.yet_another_counter = baker.make(Counter, type="BAR")
@@ -115,7 +121,10 @@ class TestRefilling(TestFullClickBase):
) -> HttpResponse:
used_client = client if client is not None else self.client
return used_client.post(
reverse("counter:refilling_create", kwargs={"customer_id": user.pk}),
reverse(
"counter:refilling_create",
kwargs={"customer_id": user.pk, "counter_id": self.counter.pk},
),
{"amount": str(amount), "payment_method": Refilling.PaymentMethod.CASH},
HTTP_REFERER=reverse(
"counter:click", kwargs={"counter_id": counter.id, "user_id": user.pk}
@@ -139,7 +148,10 @@ class TestRefilling(TestFullClickBase):
return self.client.post(
reverse(
"counter:refilling_create",
kwargs={"customer_id": self.customer.pk},
kwargs={
"customer_id": self.customer.pk,
"counter_id": self.counter.pk,
},
),
{"amount": "10", "payment_method": "CASH"},
)
@@ -443,9 +455,19 @@ class TestCounterClick(TestFullClickBase):
def test_click_not_connected(self):
force_refill_user(self.customer, 10)
# trying to click on a bar without being logged should result
# in a redirect to the counter page with an error message
res = self.submit_basket(self.customer, [BasketItem(self.snack.id, 2)])
assertRedirects(res, self.counter.get_absolute_url())
messages = list(get_messages(res.wsgi_request))
assert len(messages) == 1
assert messages[0].level == DEFAULT_LEVELS["ERROR"]
assert (
messages[0].message == "Vous ne pouvez pas cliquer des gens sur ce comptoir"
)
# trying to click on an office counter without permission should 403
res = self.submit_basket(
self.customer, [BasketItem(self.snack.id, 2)], counter=self.club_counter
)
@@ -738,6 +760,7 @@ class TestBarmanConnection(TestCase):
assert last_perm.counter == self.counter
assert last_perm.user == self.barman
assert last_perm.end is None
assert self.barman in response.wsgi_request.barmen
response = self.client.get(
self.detail_url, {"username": self.barman.username, "password": "plop"}
)
@@ -754,6 +777,7 @@ class TestBarmanConnection(TestCase):
)
assert "HX-Redirect" not in response.headers
assert not Permanency.objects.filter(user=not_barman).exists()
assert self.barman not in response.wsgi_request.barmen
response = self.client.get(self.detail_url)
assert response.context_data.get("barmen") == []
@@ -762,11 +786,11 @@ class TestBarmanConnection(TestCase):
@pytest.mark.django_db
def test_barman_timeout():
def test_barman_timeout(client: Client):
"""Test that barmen timeout is well managed."""
bar = baker.make(Counter, type="BAR")
user = baker.make(User)
bar.sellers.add(user)
CounterSellers.objects.create(counter=bar, user=user)
baker.make(Permanency, counter=bar, user=user, start=now())
qs = Counter.objects.annotate_is_open().filter(pk=bar.pk)
@@ -782,6 +806,8 @@ def test_barman_timeout():
bar = qs[0]
assert not bar.is_open
assert bar.barmen_list == []
res = client.get("")
assert res.wsgi_request.barmen == set()
class TestClubCounterClickAccess(TestCase):
@@ -831,14 +857,14 @@ class TestClubCounterClickAccess(TestCase):
def test_barman(self):
"""Sellers should be able to click on office counters"""
self.counter.sellers.add(self.user)
CounterSellers.objects.create(counter=self.counter, user=self.user)
self.client.force_login(self.user)
res = self.client.get(self.click_url)
assert res.status_code == 200
def test_both_barman_and_board_member(self):
"""If the user is barman and board member, he should be authorized as well."""
self.counter.sellers.add(self.user)
CounterSellers.objects.create(counter=self.counter, user=self.user)
baker.make(
Membership, club=self.counter.club, user=self.user, role=self.board_role
)
@@ -862,16 +888,17 @@ class TestCounterLogout:
reverse("counter:logout", kwargs={"counter_id": permanence.counter_id}),
data={"user_id": permanence.user_id},
)
assertRedirects(
res,
reverse(
"counter:details", kwargs={"counter_id": permanence.counter_id}
),
)
permanence.refresh_from_db()
assert permanence.end == now()
assertRedirects(
res,
reverse("counter:details", kwargs={"counter_id": permanence.counter_id}),
)
permanence.refresh_from_db()
assert permanence.end == permanence.activity
assert permanence.user not in res.wsgi_request.barmen
def test_logout_doesnt_change_old_permanences(self, client: Client):
# regression test for #1141
# https://github.com/ae-utbm/sith/pull/1141
perm_counter = baker.make(Counter, type="BAR")
permanence = baker.make(
Permanency,
@@ -892,6 +919,6 @@ class TestCounterLogout:
data={"user_id": permanence.user_id},
)
permanence.refresh_from_db()
assert permanence.end == now()
assert permanence.end == permanence.activity
old_permanence.refresh_from_db()
assert old_permanence.end == old_end
+1 -1
View File
@@ -67,7 +67,7 @@ urlpatterns = [
path("<int:counter_id>/", CounterMain.as_view(), name="details"),
path("<int:counter_id>/click/<int:user_id>/", CounterClick.as_view(), name="click"),
path(
"refill/<int:customer_id>/",
"<int:counter_id>/refill/<int:customer_id>/",
RefillingCreateView.as_view(),
name="refilling_create",
),
+3 -16
View File
@@ -3,8 +3,6 @@ from urllib.parse import urlparse
from django.http import HttpRequest
from django.urls import resolve
from counter.models import Counter
def is_logged_in_counter(request: HttpRequest) -> bool:
"""Check if the request is sent from a device logged to a counter.
@@ -20,24 +18,13 @@ def is_logged_in_counter(request: HttpRequest) -> bool:
or the request path belongs to the counter app
(eg. the barman went back to the main by missclick and go back
to the counter)
- The current session has a counter token associated with it.
- A counter with this token exists.
- The counter is open
- There are barmen logged in the current session
"""
referer_ok = (
"HTTP_REFERER" in request.META
and resolve(urlparse(request.META["HTTP_REFERER"]).path).app_name == "counter"
)
has_token = (
(referer_ok or request.resolver_match.app_name == "counter")
and "counter_token" in request.session
and request.session["counter_token"]
)
if not has_token:
if not referer_ok and request.resolver_match.app_name != "counter":
return False
return (
Counter.objects.annotate_is_open()
.filter(token=request.session["counter_token"], is_open=True)
.exists()
)
return bool(request.barmen)
+15 -10
View File
@@ -12,6 +12,7 @@
# OR WITHIN THE LOCAL FILE "LICENSE"
#
#
import random
from collections import defaultdict
from django.contrib import messages
@@ -42,7 +43,7 @@ def get_operator(request: HttpRequest, counter: Counter, customer: Customer) ->
return request.user
if counter.customer_is_barman(customer):
return customer.user
return counter.get_random_barman()
return random.choice(list(request.barmen))
class CounterClick(
@@ -74,7 +75,7 @@ class CounterClick(
return kwargs
def dispatch(self, request, *args, **kwargs):
self.customer = get_object_or_404(Customer, user__id=self.kwargs["user_id"])
self.customer = get_object_or_404(Customer, user_id=self.kwargs["user_id"])
obj: Counter = self.get_object()
if not self.customer.can_buy or self.customer.user.is_banned_counter:
@@ -92,8 +93,8 @@ class CounterClick(
# or a seller of this counter.
raise PermissionDenied
if obj.type == "BAR" and (
not obj.is_open or request.session.get("counter_token", "") != obj.token
if obj.type == "BAR" and not (
request.barmen and request.barmen.issubset(set(obj.barmen_list))
):
messages.error(request, _("You cannot click users on this counter"))
return redirect(obj) # Redirect to counter
@@ -194,7 +195,7 @@ class CounterClick(
)
if self.object.can_refill():
res["refilling_fragment"] = RefillingCreateView.as_fragment()(
self.request, customer=self.customer
self.request, customer=self.customer, counter=self.object
)
return res
@@ -232,11 +233,13 @@ class RefillingCreateView(FragmentMixin, FormView):
if not is_logged_in_counter(request):
raise PermissionDenied
self.counter: Counter = get_object_or_404(
Counter, token=request.session["counter_token"]
)
self.counter: Counter = get_object_or_404(Counter, id=self.kwargs["counter_id"])
if not self.counter.can_refill():
if not (
request.barmen
and request.barmen.issubset(self.counter.barmen_list)
and self.counter.can_refill()
):
raise PermissionDenied
self.operator = get_operator(request, self.counter, self.customer)
@@ -245,6 +248,7 @@ class RefillingCreateView(FragmentMixin, FormView):
def render_fragment(self, request, **kwargs) -> SafeString:
self.customer = kwargs.pop("customer")
self.counter = kwargs.pop("counter")
return super().render_fragment(request, **kwargs)
def form_valid(self, form):
@@ -259,7 +263,8 @@ class RefillingCreateView(FragmentMixin, FormView):
def get_context_data(self, **kwargs):
kwargs = super().get_context_data(**kwargs)
kwargs["action"] = reverse(
"counter:refilling_create", kwargs={"customer_id": self.customer.pk}
"counter:refilling_create",
kwargs={"customer_id": self.customer.pk, "counter_id": self.counter.pk},
)
return kwargs
+8 -8
View File
@@ -15,12 +15,12 @@
from datetime import timedelta
from django.conf import settings
from django.db.models import F
from django.http import HttpRequest, HttpResponseRedirect
from django.shortcuts import redirect
from django.urls import reverse
from django.utils import timezone
from django.utils.safestring import SafeString
from django.utils.timezone import now
from django.views.decorators.http import require_POST
from django.views.generic import DetailView
from django.views.generic.detail import SingleObjectMixin
@@ -49,10 +49,9 @@ class CounterLoginFragment(FragmentMixin, SingleObjectMixin, FormView):
return super().get_form_kwargs() | {"counter": self.object}
def form_valid(self, form: CounterLoginForm):
self.object.permanencies.create(user=form.get_user(), start=timezone.now())
if not self.object.barmen_list:
self.object.gen_token()
self.request.session["counter_token"] = self.object.token
user = form.get_user()
self.object.permanencies.create(user=user, start=timezone.now())
self.request.barmen.add(user)
self.success_url = reverse(
"counter:details", kwargs={"counter_id": self.object.id}
)
@@ -92,8 +91,8 @@ class CounterMain(
def dispatch(self, request, *args, **kwargs):
self.object: Counter = self.get_object()
if self.object.type != "BAR" and self.request.method.upper() == "POST":
# barmen have to log in (thus do a POST request)
# only if it is a bar.
# barmen have to log in (thus do a POST request) only if it is a bar,
# so a POST on a non-bar counter makes no sense
return self.http_method_not_allowed(request, *args, **kwargs)
if self.object.type == "BAR":
self.object.update_activity()
@@ -115,7 +114,8 @@ class CounterMain(
kwargs["can_click"] = (
self.object.type == "BAR"
and self.object.is_open
and self.request.session.get("counter_token", "") == self.object.token
and self.request.barmen
and self.request.barmen.issubset(set(self.object.barmen_list))
) or (
self.object.type == "OFFICE"
and (