custom queryset method to bulk update customer balance

This commit is contained in:
imperosol 2024-11-05 19:40:59 +01:00
parent 97ea1763f1
commit b091fee035
6 changed files with 595 additions and 519 deletions

View File

@ -1,14 +1,12 @@
import random import random
from datetime import date, timedelta from datetime import date, timedelta
from datetime import timezone as tz from datetime import timezone as tz
from decimal import Decimal
from typing import Iterator from typing import Iterator
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db.models import Count, Exists, F, Min, OuterRef, Subquery, Sum from django.db.models import Count, Exists, Min, OuterRef, Subquery
from django.db.models.functions import Coalesce
from django.utils.timezone import localdate, make_aware, now from django.utils.timezone import localdate, make_aware, now
from faker import Faker from faker import Faker
@ -268,24 +266,6 @@ class Command(BaseCommand):
Product.buying_groups.through.objects.bulk_create(buying_groups) Product.buying_groups.through.objects.bulk_create(buying_groups)
Counter.products.through.objects.bulk_create(selling_places) Counter.products.through.objects.bulk_create(selling_places)
@staticmethod
def _update_balances():
customers = Customer.objects.annotate(
money_in=Sum(F("refillings__amount"), default=0),
money_out=Coalesce(
Subquery(
Selling.objects.filter(customer=OuterRef("pk"))
.values("customer_id") # group by customer
.annotate(res=Sum(F("unit_price") * F("quantity"), default=0))
.values("res")
),
Decimal("0"),
),
).annotate(real_balance=F("money_in") - F("money_out"))
for c in customers:
c.amount = c.real_balance
Customer.objects.bulk_update(customers, fields=["amount"])
def create_sales(self, sellers: list[User]): def create_sales(self, sellers: list[User]):
customers = list( customers = list(
Customer.objects.annotate( Customer.objects.annotate(
@ -355,7 +335,7 @@ class Command(BaseCommand):
sales.extend(this_customer_sales) sales.extend(this_customer_sales)
Refilling.objects.bulk_create(reloads) Refilling.objects.bulk_create(reloads)
Selling.objects.bulk_create(sales) Selling.objects.bulk_create(sales)
self._update_balances() Customer.objects.update_amount()
def create_permanences(self, sellers: list[User]): def create_permanences(self, sellers: list[User]):
counters = list( counters = list(

18
counter/baker_recipes.py Normal file
View File

@ -0,0 +1,18 @@
from model_bakery.recipe import Recipe, foreign_key
from club.models import Club
from core.models import User
from counter.models import Counter, Product, Refilling, Selling
counter_recipe = Recipe(Counter)
product_recipe = Recipe(Product, club=foreign_key(Recipe(Club)))
sale_recipe = Recipe(
Selling,
product=foreign_key(product_recipe),
counter=foreign_key(counter_recipe),
seller=foreign_key(Recipe(User)),
club=foreign_key(Recipe(Club)),
)
refill_recipe = Recipe(
Refilling, counter=foreign_key(counter_recipe), operator=foreign_key(Recipe(User))
)

View File

@ -20,14 +20,15 @@ import random
import string import string
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from datetime import timezone as tz from datetime import timezone as tz
from decimal import Decimal
from typing import Self, Tuple from typing import Self, Tuple
from dict2xml import dict2xml from dict2xml import dict2xml
from django.conf import settings from django.conf import settings
from django.core.validators import MinLengthValidator from django.core.validators import MinLengthValidator
from django.db import models from django.db import models
from django.db.models import Exists, F, OuterRef, Q, QuerySet, Sum, Value from django.db.models import Exists, F, OuterRef, Q, QuerySet, Subquery, Sum, Value
from django.db.models.functions import Concat, Length from django.db.models.functions import Coalesce, Concat, Length
from django.forms import ValidationError from django.forms import ValidationError
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
@ -45,6 +46,39 @@ from sith.settings import SITH_COUNTER_OFFICES, SITH_MAIN_CLUB
from subscription.models import Subscription from subscription.models import Subscription
class CustomerQuerySet(models.QuerySet):
def update_amount(self) -> int:
"""Update the amount of all customers selected by this queryset.
The result is given as the sum of all refills minus the sum of all purchases.
Returns:
The number of updated rows.
Warnings:
The execution time of this query grows really quickly.
When updating 500 customers, it may take around a second.
If you try to update all customers at once, the execution time
goes up to tens of seconds.
Use this either on a small subset of the `Customer` table,
or execute it inside an independent task
(like a Celery task or a management command).
"""
money_in = Subquery(
Refilling.objects.filter(customer=OuterRef("pk"))
.values("customer_id") # group by customer
.annotate(res=Sum(F("amount"), default=0))
.values("res")
)
money_out = Subquery(
Selling.objects.filter(customer=OuterRef("pk"))
.values("customer_id")
.annotate(res=Sum(F("unit_price") * F("quantity"), default=0))
.values("res")
)
return self.update(amount=Coalesce(money_in - money_out, Decimal("0")))
class Customer(models.Model): class Customer(models.Model):
"""Customer data of a User. """Customer data of a User.
@ -57,6 +91,8 @@ class Customer(models.Model):
amount = CurrencyField(_("amount"), default=0) amount = CurrencyField(_("amount"), default=0)
recorded_products = models.IntegerField(_("recorded product"), default=0) recorded_products = models.IntegerField(_("recorded product"), default=0)
objects = CustomerQuerySet.as_manager()
class Meta: class Meta:
verbose_name = _("customer") verbose_name = _("customer")
verbose_name_plural = _("customers") verbose_name_plural = _("customers")
@ -141,18 +177,6 @@ class Customer(models.Model):
account = cls.objects.create(user=user, account_id=account_id) account = cls.objects.create(user=user, account_id=account_id)
return account, True return account, True
def recompute_amount(self):
refillings = self.refillings.aggregate(sum=Sum(F("amount")))["sum"]
self.amount = refillings if refillings is not None else 0
purchases = (
self.buyings.filter(payment_method="SITH_ACCOUNT")
.annotate(amount=F("quantity") * F("unit_price"))
.aggregate(sum=Sum(F("amount")))
)["sum"]
if purchases is not None:
self.amount -= purchases
self.save()
def get_full_url(self): def get_full_url(self):
return f"https://{settings.SITH_URL}{self.get_absolute_url()}" return f"https://{settings.SITH_URL}{self.get_absolute_url()}"

View File

@ -12,15 +12,13 @@
# OR WITHIN THE LOCAL FILE "LICENSE" # OR WITHIN THE LOCAL FILE "LICENSE"
# #
# #
import json
import re import re
import string
from datetime import timedelta from datetime import timedelta
import pytest import pytest
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.test import Client, TestCase from django.test import TestCase
from django.urls import reverse from django.urls import reverse
from django.utils import timezone from django.utils import timezone
from django.utils.timezone import now from django.utils.timezone import now
@ -31,7 +29,6 @@ from club.models import Club, Membership
from core.baker_recipes import subscriber_user from core.baker_recipes import subscriber_user
from core.models import User from core.models import User
from counter.models import ( from counter.models import (
BillingInfo,
Counter, Counter,
Customer, Customer,
Permanency, Permanency,
@ -313,149 +310,6 @@ class TestCounterStats(TestCase):
] ]
@pytest.mark.django_db
class TestBillingInfo:
@pytest.fixture
def payload(self):
return {
"first_name": "Subscribed",
"last_name": "User",
"address_1": "3, rue de Troyes",
"zip_code": "34301",
"city": "Sète",
"country": "FR",
"phone_number": "0612345678",
}
def test_edit_infos(self, client: Client, payload: dict):
user = subscriber_user.make()
baker.make(BillingInfo, customer=user.customer)
client.force_login(user)
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
user.refresh_from_db()
infos = BillingInfo.objects.get(customer__user=user)
assert response.status_code == 200
assert hasattr(user.customer, "billing_infos")
assert infos.customer == user.customer
for key, val in payload.items():
assert getattr(infos, key) == val
@pytest.mark.parametrize(
"user_maker", [subscriber_user.make, lambda: baker.make(User)]
)
@pytest.mark.django_db
def test_create_infos(self, client: Client, user_maker, payload):
user = user_maker()
client.force_login(user)
assert not BillingInfo.objects.filter(customer__user=user).exists()
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
user.refresh_from_db()
assert hasattr(user, "customer")
infos = BillingInfo.objects.get(customer__user=user)
assert hasattr(user.customer, "billing_infos")
assert infos.customer == user.customer
for key, val in payload.items():
assert getattr(infos, key) == val
def test_invalid_data(self, client: Client, payload: dict[str, str]):
user = subscriber_user.make()
client.force_login(user)
# address_1, zip_code and country are missing
del payload["city"]
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 422
user.customer.refresh_from_db()
assert not hasattr(user.customer, "billing_infos")
@pytest.mark.parametrize(
("operator_maker", "expected_code"),
[
(subscriber_user.make, 403),
(lambda: baker.make(User), 403),
(lambda: baker.make(User, is_superuser=True), 200),
],
)
def test_edit_other_user(
self, client: Client, operator_maker, expected_code: int, payload: dict
):
user = subscriber_user.make()
client.force_login(operator_maker())
baker.make(BillingInfo, customer=user.customer)
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == expected_code
@pytest.mark.parametrize(
"phone_number",
["+33612345678", "0612345678", "06 12 34 56 78", "06-12-34-56-78"],
)
def test_phone_number_format(
self, client: Client, payload: dict, phone_number: str
):
"""Test that various formats of phone numbers are accepted."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = phone_number
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
infos = BillingInfo.objects.get(customer__user=user)
assert infos.phone_number == "0612345678"
assert infos.phone_number.country_code == 33
def test_foreign_phone_number(self, client: Client, payload: dict):
"""Test that a foreign phone number is accepted."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = "+49612345678"
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
infos = BillingInfo.objects.get(customer__user=user)
assert infos.phone_number.as_national == "06123 45678"
assert infos.phone_number.country_code == 49
@pytest.mark.parametrize(
"phone_number", ["061234567a", "06 12 34 56", "061234567879", "azertyuiop"]
)
def test_invalid_phone_number(
self, client: Client, payload: dict, phone_number: str
):
"""Test that invalid phone numbers are rejected."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = phone_number
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 422
assert not BillingInfo.objects.filter(customer__user=user).exists()
class TestBarmanConnection(TestCase): class TestBarmanConnection(TestCase):
@classmethod @classmethod
def setUpTestData(cls): def setUpTestData(cls):
@ -529,341 +383,6 @@ def test_barman_timeout():
assert bar.barmen_list == [] assert bar.barmen_list == []
class TestStudentCard(TestCase):
"""Tests for adding and deleting Stundent Cards
Test that an user can be found with it's student card.
"""
@classmethod
def setUpTestData(cls):
cls.krophil = User.objects.get(username="krophil")
cls.sli = User.objects.get(username="sli")
cls.skia = User.objects.get(username="skia")
cls.root = User.objects.get(username="root")
cls.counter = Counter.objects.get(id=2)
def setUp(self):
# Auto login on counter
self.client.post(
reverse("counter:login", args=[self.counter.id]),
{"username": "krophil", "password": "plop"},
)
def test_search_user_with_student_card(self):
response = self.client.post(
reverse("counter:details", args=[self.counter.id]),
{"code": "9A89B82018B0A0"},
)
assert response.url == reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
)
def test_add_student_card_from_counter(self):
# Test card with mixed letters and numbers
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8F", "action": "add_student_card"},
)
self.assertContains(response, text="8B90734A802A8F")
# Test card with only numbers
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "04786547890123", "action": "add_student_card"},
)
self.assertContains(response, text="04786547890123")
# Test card with only letters
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "ABCAAAFAAFAAAB", "action": "add_student_card"},
)
self.assertContains(response, text="ABCAAAFAAFAAAB")
def test_add_student_card_from_counter_fail(self):
# UID too short
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# UID too long
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8FA", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with already existing card
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "9A89B82018B0A0", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with lowercase
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8b90734a802a9f", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with white spaces
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": " ", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
def test_delete_student_card_with_owner(self):
self.client.force_login(self.sli)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_with_board_member(self):
self.client.force_login(self.skia)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_with_root(self):
self.client.force_login(self.root)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_fail(self):
self.client.force_login(self.krophil)
response = self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert response.status_code == 403
assert self.sli.customer.student_cards.exists()
def test_add_student_card_from_user_preferences(self):
# Test with owner of the card
self.client.force_login(self.sli)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8F"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8F")
# Test with board member
self.client.force_login(self.skia)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8A"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8A")
# Test card with only numbers
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "04786547890123"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="04786547890123")
# Test card with only letters
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "ABCAAAFAAFAAAB"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="ABCAAAFAAFAAAB")
# Test with root
self.client.force_login(self.root)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8B"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8B")
def test_add_student_card_from_user_preferences_fail(self):
self.client.force_login(self.sli)
# UID too short
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8"},
)
self.assertContains(response, text="Cet UID est invalide")
# UID too long
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8FA"},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with already existing card
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "9A89B82018B0A0"},
)
self.assertContains(
response, text="Un objet Student card avec ce champ Uid existe déjà."
)
# Test with lowercase
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8b90734a802a9f"},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with white spaces
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": " " * 14},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with unauthorized user
self.client.force_login(self.krophil)
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8F"},
)
assert response.status_code == 403
class TestCustomerAccountId(TestCase):
@classmethod
def setUpTestData(cls):
cls.user_a = User.objects.create(
username="a", password="plop", email="a.a@a.fr"
)
user_b = User.objects.create(username="b", password="plop", email="b.b@b.fr")
user_c = User.objects.create(username="c", password="plop", email="c.c@c.fr")
Customer.objects.create(user=cls.user_a, amount=10, account_id="1111a")
Customer.objects.create(user=user_b, amount=0, account_id="9999z")
Customer.objects.create(user=user_c, amount=0, account_id="12345f")
def test_create_customer(self):
user_d = User.objects.create(username="d", password="plop")
customer, created = Customer.get_or_create(user_d)
account_id = customer.account_id
number = account_id[:-1]
assert created is True
assert number == "12346"
assert len(account_id) == 6
assert account_id[-1] in string.ascii_lowercase
assert customer.amount == 0
def test_get_existing_account(self):
account, created = Customer.get_or_create(self.user_a)
assert created is False
assert account.account_id == "1111a"
assert account.amount == 10
class TestClubCounterClickAccess(TestCase): class TestClubCounterClickAccess(TestCase):
@classmethod @classmethod
def setUpTestData(cls): def setUpTestData(cls):

View File

@ -0,0 +1,535 @@
import json
import string
import pytest
from django.test import Client, TestCase
from django.urls import reverse
from model_bakery import baker
from core.baker_recipes import subscriber_user
from core.models import User
from counter.baker_recipes import refill_recipe, sale_recipe
from counter.models import BillingInfo, Counter, Customer, Refilling, Selling
@pytest.mark.django_db
class TestBillingInfo:
@pytest.fixture
def payload(self):
return {
"first_name": "Subscribed",
"last_name": "User",
"address_1": "3, rue de Troyes",
"zip_code": "34301",
"city": "Sète",
"country": "FR",
"phone_number": "0612345678",
}
def test_edit_infos(self, client: Client, payload: dict):
user = subscriber_user.make()
baker.make(BillingInfo, customer=user.customer)
client.force_login(user)
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
user.refresh_from_db()
infos = BillingInfo.objects.get(customer__user=user)
assert response.status_code == 200
assert hasattr(user.customer, "billing_infos")
assert infos.customer == user.customer
for key, val in payload.items():
assert getattr(infos, key) == val
@pytest.mark.parametrize(
"user_maker", [subscriber_user.make, lambda: baker.make(User)]
)
@pytest.mark.django_db
def test_create_infos(self, client: Client, user_maker, payload):
user = user_maker()
client.force_login(user)
assert not BillingInfo.objects.filter(customer__user=user).exists()
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
user.refresh_from_db()
assert hasattr(user, "customer")
infos = BillingInfo.objects.get(customer__user=user)
assert hasattr(user.customer, "billing_infos")
assert infos.customer == user.customer
for key, val in payload.items():
assert getattr(infos, key) == val
def test_invalid_data(self, client: Client, payload: dict[str, str]):
user = subscriber_user.make()
client.force_login(user)
# address_1, zip_code and country are missing
del payload["city"]
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 422
user.customer.refresh_from_db()
assert not hasattr(user.customer, "billing_infos")
@pytest.mark.parametrize(
("operator_maker", "expected_code"),
[
(subscriber_user.make, 403),
(lambda: baker.make(User), 403),
(lambda: baker.make(User, is_superuser=True), 200),
],
)
def test_edit_other_user(
self, client: Client, operator_maker, expected_code: int, payload: dict
):
user = subscriber_user.make()
client.force_login(operator_maker())
baker.make(BillingInfo, customer=user.customer)
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == expected_code
@pytest.mark.parametrize(
"phone_number",
["+33612345678", "0612345678", "06 12 34 56 78", "06-12-34-56-78"],
)
def test_phone_number_format(
self, client: Client, payload: dict, phone_number: str
):
"""Test that various formats of phone numbers are accepted."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = phone_number
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
infos = BillingInfo.objects.get(customer__user=user)
assert infos.phone_number == "0612345678"
assert infos.phone_number.country_code == 33
def test_foreign_phone_number(self, client: Client, payload: dict):
"""Test that a foreign phone number is accepted."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = "+49612345678"
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 200
infos = BillingInfo.objects.get(customer__user=user)
assert infos.phone_number.as_national == "06123 45678"
assert infos.phone_number.country_code == 49
@pytest.mark.parametrize(
"phone_number", ["061234567a", "06 12 34 56", "061234567879", "azertyuiop"]
)
def test_invalid_phone_number(
self, client: Client, payload: dict, phone_number: str
):
"""Test that invalid phone numbers are rejected."""
user = subscriber_user.make()
client.force_login(user)
payload["phone_number"] = phone_number
response = client.put(
reverse("api:put_billing_info", args=[user.id]),
json.dumps(payload),
content_type="application/json",
)
assert response.status_code == 422
assert not BillingInfo.objects.filter(customer__user=user).exists()
class TestStudentCard(TestCase):
"""Tests for adding and deleting Stundent Cards
Test that an user can be found with it's student card.
"""
@classmethod
def setUpTestData(cls):
cls.krophil = User.objects.get(username="krophil")
cls.sli = User.objects.get(username="sli")
cls.skia = User.objects.get(username="skia")
cls.root = User.objects.get(username="root")
cls.counter = Counter.objects.get(id=2)
def setUp(self):
# Auto login on counter
self.client.post(
reverse("counter:login", args=[self.counter.id]),
{"username": "krophil", "password": "plop"},
)
def test_search_user_with_student_card(self):
response = self.client.post(
reverse("counter:details", args=[self.counter.id]),
{"code": "9A89B82018B0A0"},
)
assert response.url == reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
)
def test_add_student_card_from_counter(self):
# Test card with mixed letters and numbers
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8F", "action": "add_student_card"},
)
self.assertContains(response, text="8B90734A802A8F")
# Test card with only numbers
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "04786547890123", "action": "add_student_card"},
)
self.assertContains(response, text="04786547890123")
# Test card with only letters
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "ABCAAAFAAFAAAB", "action": "add_student_card"},
)
self.assertContains(response, text="ABCAAAFAAFAAAB")
def test_add_student_card_from_counter_fail(self):
# UID too short
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# UID too long
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8B90734A802A8FA", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with already existing card
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "9A89B82018B0A0", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with lowercase
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": "8b90734a802a9f", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
# Test with white spaces
response = self.client.post(
reverse(
"counter:click",
kwargs={"counter_id": self.counter.id, "user_id": self.sli.id},
),
{"student_card_uid": " ", "action": "add_student_card"},
)
self.assertContains(
response, text="Ce n'est pas un UID de carte étudiante valide"
)
def test_delete_student_card_with_owner(self):
self.client.force_login(self.sli)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_with_board_member(self):
self.client.force_login(self.skia)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_with_root(self):
self.client.force_login(self.root)
self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert not self.sli.customer.student_cards.exists()
def test_delete_student_card_fail(self):
self.client.force_login(self.krophil)
response = self.client.post(
reverse(
"counter:delete_student_card",
kwargs={
"customer_id": self.sli.customer.pk,
"card_id": self.sli.customer.student_cards.first().id,
},
)
)
assert response.status_code == 403
assert self.sli.customer.student_cards.exists()
def test_add_student_card_from_user_preferences(self):
# Test with owner of the card
self.client.force_login(self.sli)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8F"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8F")
# Test with board member
self.client.force_login(self.skia)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8A"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8A")
# Test card with only numbers
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "04786547890123"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="04786547890123")
# Test card with only letters
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "ABCAAAFAAFAAAB"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="ABCAAAFAAFAAAB")
# Test with root
self.client.force_login(self.root)
self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8B"},
)
response = self.client.get(
reverse("core:user_prefs", kwargs={"user_id": self.sli.id})
)
self.assertContains(response, text="8B90734A802A8B")
def test_add_student_card_from_user_preferences_fail(self):
self.client.force_login(self.sli)
# UID too short
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8"},
)
self.assertContains(response, text="Cet UID est invalide")
# UID too long
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8FA"},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with already existing card
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "9A89B82018B0A0"},
)
self.assertContains(
response, text="Un objet Student card avec ce champ Uid existe déjà."
)
# Test with lowercase
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8b90734a802a9f"},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with white spaces
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": " " * 14},
)
self.assertContains(response, text="Cet UID est invalide")
# Test with unauthorized user
self.client.force_login(self.krophil)
response = self.client.post(
reverse(
"counter:add_student_card", kwargs={"customer_id": self.sli.customer.pk}
),
{"uid": "8B90734A802A8F"},
)
assert response.status_code == 403
class TestCustomerAccountId(TestCase):
@classmethod
def setUpTestData(cls):
cls.user_a = User.objects.create(
username="a", password="plop", email="a.a@a.fr"
)
user_b = User.objects.create(username="b", password="plop", email="b.b@b.fr")
user_c = User.objects.create(username="c", password="plop", email="c.c@c.fr")
Customer.objects.create(user=cls.user_a, amount=10, account_id="1111a")
Customer.objects.create(user=user_b, amount=0, account_id="9999z")
Customer.objects.create(user=user_c, amount=0, account_id="12345f")
def test_create_customer(self):
user_d = User.objects.create(username="d", password="plop")
customer, created = Customer.get_or_create(user_d)
account_id = customer.account_id
number = account_id[:-1]
assert created is True
assert number == "12346"
assert len(account_id) == 6
assert account_id[-1] in string.ascii_lowercase
assert customer.amount == 0
def test_get_existing_account(self):
account, created = Customer.get_or_create(self.user_a)
assert created is False
assert account.account_id == "1111a"
assert account.amount == 10
@pytest.mark.django_db
def test_update_balance():
customers = baker.make(Customer, _quantity=5, _bulk_create=True)
refills = [
*refill_recipe.prepare(
customer=iter(customers),
amount=iter([30, 30, 40, 50, 50]),
_quantity=len(customers),
_save_related=True,
),
refill_recipe.prepare(customer=customers[0], amount=30, _save_related=True),
refill_recipe.prepare(customer=customers[4], amount=10, _save_related=True),
]
Refilling.objects.bulk_create(refills)
sales = [
*sale_recipe.prepare(
customer=iter(customers),
_quantity=len(customers),
unit_price=10,
quantity=1,
_save_related=True,
),
*sale_recipe.prepare(
customer=iter(customers[:3]),
_quantity=3,
unit_price=5,
quantity=2,
_save_related=True,
),
sale_recipe.prepare(
customer=customers[4], quantity=1, unit_price=50, _save_related=True
),
]
Selling.objects.bulk_create(sales)
# customer 0 = 40, customer 1 = 10€, customer 2 = 20€,
# customer 3 = 40€, customer 4 = 0€
customers_qs = Customer.objects.filter(pk__in={c.pk for c in customers})
# put everything at zero to be sure the amounts were wrong beforehand
customers_qs.update(amount=0)
customers_qs.update_amount()
for customer, amount in zip(customers, [40, 10, 20, 40, 0]):
customer.refresh_from_db()
assert customer.amount == amount

View File

@ -123,7 +123,7 @@ def merge_users(u1: User, u2: User) -> User:
c_dest, created = Customer.get_or_create(u1) c_dest, created = Customer.get_or_create(u1)
c_src.refillings.update(customer=c_dest) c_src.refillings.update(customer=c_dest)
c_src.buyings.update(customer=c_dest) c_src.buyings.update(customer=c_dest)
c_dest.recompute_amount() Customer.objects.filter(pk=c_dest.pk).update_amount()
if created: if created:
# swap the account numbers, so that the user keep # swap the account numbers, so that the user keep
# the id he is accustomed to # the id he is accustomed to