Merge pull request #1236 from ae-utbm/csrf-api

remove deprecated api csrf argument
This commit is contained in:
thomas girod
2025-11-09 21:38:36 +01:00
committed by GitHub
14 changed files with 180 additions and 69 deletions

View File

@@ -6,6 +6,8 @@ from api.models import ApiClient, ApiKey
class ApiKeyAuth(APIKeyHeader):
"""Authentication through client api keys."""
param_name = "X-APIKey"
def authenticate(self, request: HttpRequest, key: str | None) -> ApiClient | None:

View File

@@ -0,0 +1,48 @@
import pytest
from django.test import Client
from django.urls import path
from model_bakery import baker
from ninja import NinjaAPI
from ninja.security import SessionAuth
from api.auth import ApiKeyAuth
from api.hashers import generate_key
from api.models import ApiClient, ApiKey
api = NinjaAPI()
@api.post("", auth=[ApiKeyAuth(), SessionAuth()])
def post_method(*args, **kwargs) -> None:
"""Dummy POST route authenticated by either api key or session cookie."""
pass
urlpatterns = [path("", api.urls)]
@pytest.mark.django_db
@pytest.mark.urls(__name__)
@pytest.mark.parametrize("user_logged_in", [False, True])
def test_csrf_token(user_logged_in):
"""Test that CSRF check happens only when no api key is used."""
client = Client(enforce_csrf_checks=True)
key, hashed = generate_key()
api_client = baker.make(ApiClient)
baker.make(ApiKey, client=api_client, hashed_key=hashed)
if user_logged_in:
client.force_login(api_client.owner)
response = client.post("")
assert response.status_code == 403
assert response.json()["detail"] == "CSRF check Failed"
# if using a valid API key, CSRF check should not occur
response = client.post("", headers={"X-APIKey": key})
assert response.status_code == 200
# if using a wrong API key, ApiKeyAuth should fail,
# leading to a fallback into SessionAuth and a CSRF check
response = client.post("", headers={"X-APIKey": generate_key()[0]})
assert response.status_code == 403
assert response.json()["detail"] == "CSRF check Failed"

View File

@@ -1,3 +1,4 @@
from ninja.security import SessionAuth
from ninja_extra import NinjaExtraAPI
api = NinjaExtraAPI(
@@ -5,6 +6,6 @@ api = NinjaExtraAPI(
description="Portail Interactif de Communication avec les Outils Numériques",
version="0.2.0",
urls_namespace="api",
csrf=True,
auth=[SessionAuth()],
)
api.auto_discover_controllers()

View File

@@ -16,7 +16,7 @@ class ClubController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[SimpleClubSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
url_name="search_club",
)
@@ -27,7 +27,7 @@ class ClubController(ControllerBase):
@route.get(
"/{int:club_id}",
response=ClubSchema,
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[HasPerm("club.view_club")],
url_name="fetch_club",
)

View File

@@ -5,7 +5,6 @@ from django.utils.cache import add_never_cache_headers
from ninja import Query
from ninja_extra import ControllerBase, api_controller, paginate, route
from ninja_extra.pagination import PageNumberPaginationExtra
from ninja_extra.permissions import IsAuthenticated
from ninja_extra.schemas import PaginatedResponseSchema
from api.permissions import HasPerm
@@ -17,17 +16,13 @@ from core.views.files import send_raw_file
@api_controller("/calendar")
class CalendarController(ControllerBase):
@route.get("/internal.ics", url_name="calendar_internal")
@route.get("/internal.ics", auth=None, url_name="calendar_internal")
def calendar_internal(self):
response = send_raw_file(IcsCalendar.get_internal())
add_never_cache_headers(response)
return response
@route.get(
"/unpublished.ics",
permissions=[IsAuthenticated],
url_name="calendar_unpublished",
)
@route.get("/unpublished.ics", url_name="calendar_unpublished")
def calendar_unpublished(self):
response = HttpResponse(
IcsCalendar.get_unpublished(self.context.request.user),
@@ -74,6 +69,7 @@ class NewsController(ControllerBase):
@route.get(
"/date",
auth=None,
url_name="fetch_news_dates",
response=PaginatedResponseSchema[NewsDateSchema],
)

View File

@@ -99,7 +99,7 @@ class SithFileController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[SithFileSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
)
@paginate(PageNumberPaginationExtra, page_size=50)
@@ -112,7 +112,7 @@ class GroupController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[GroupSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
)
@paginate(PageNumberPaginationExtra, page_size=50)

View File

@@ -46,7 +46,7 @@ class TestFetchFamilyApi(TestCase):
response = self.client.get(
reverse("api:family_graph", args=[self.main_user.id])
)
assert response.status_code == 403
assert response.status_code == 401
self.client.force_login(baker.make(User)) # unsubscribed user
response = self.client.get(

View File

@@ -269,7 +269,7 @@ def test_apply_rights_recursively():
SimpleUploadedFile(
"test.jpg", content=RED_PIXEL_PNG, content_type="image/jpg"
),
403,
401,
),
(
lambda: baker.make(User),

View File

@@ -64,7 +64,7 @@ class CounterController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[SimplifiedCounterSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
)
@paginate(PageNumberPaginationExtra, page_size=50)
@@ -77,7 +77,7 @@ class ProductController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[SimpleProductSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
)
@paginate(PageNumberPaginationExtra, page_size=50)
@@ -117,7 +117,7 @@ class ProductTypeController(ControllerBase):
def fetch_all(self):
return ProductType.objects.order_by("order")
@route.patch("/{type_id}/move")
@route.patch("/{type_id}/move", url_name="reorder_product_type")
def reorder(self, type_id: int, other_id: Query[ReorderProductTypeSchema]):
"""Change the order of a product type.

View File

@@ -3,11 +3,9 @@ from django.conf import settings
from django.test import Client
from django.urls import reverse
from model_bakery import baker, seq
from ninja_extra.testing import TestClient
from core.baker_recipes import board_user, subscriber_user
from core.models import Group, User
from counter.api import ProductTypeController
from counter.models import ProductType
@@ -19,24 +17,43 @@ def product_types(db) -> list[ProductType]:
return baker.make(ProductType, _quantity=5, order=seq(0))
@pytest.fixture()
def counter_admin_client(db, client: Client) -> Client:
client.force_login(
baker.make(
User, groups=[Group.objects.get(id=settings.SITH_GROUP_COUNTER_ADMIN_ID)]
)
)
return client
@pytest.mark.django_db
def test_fetch_product_types(product_types: list[ProductType]):
def test_fetch_product_types(
counter_admin_client: Client, product_types: list[ProductType]
):
"""Test that the API returns the right products in the right order"""
client = TestClient(ProductTypeController)
response = client.get("")
response = counter_admin_client.get(reverse("api:fetch_product_types"))
assert response.status_code == 200
assert [i["id"] for i in response.json()] == [t.id for t in product_types]
@pytest.mark.django_db
def test_move_below_product_type(product_types: list[ProductType]):
def test_move_below_product_type(
counter_admin_client: Client, product_types: list[ProductType]
):
"""Test that moving a product below another works"""
client = TestClient(ProductTypeController)
response = client.patch(
f"/{product_types[-1].id}/move", query={"below": product_types[0].id}
response = counter_admin_client.patch(
reverse(
"api:reorder_product_type",
kwargs={"type_id": product_types[-1].id},
query={"below": product_types[0].id},
),
)
assert response.status_code == 200
new_order = [i["id"] for i in client.get("").json()]
new_order = [
i["id"]
for i in counter_admin_client.get(reverse("api:fetch_product_types")).json()
]
assert new_order == [
product_types[0].id,
product_types[-1].id,
@@ -45,14 +62,22 @@ def test_move_below_product_type(product_types: list[ProductType]):
@pytest.mark.django_db
def test_move_above_product_type(product_types: list[ProductType]):
def test_move_above_product_type(
counter_admin_client: Client, product_types: list[ProductType]
):
"""Test that moving a product above another works"""
client = TestClient(ProductTypeController)
response = client.patch(
f"/{product_types[1].id}/move", query={"above": product_types[0].id}
response = counter_admin_client.patch(
reverse(
"api:reorder_product_type",
kwargs={"type_id": product_types[1].id},
query={"above": product_types[0].id},
),
)
assert response.status_code == 200
new_order = [i["id"] for i in client.get("").json()]
new_order = [
i["id"]
for i in counter_admin_client.get(reverse("api:fetch_product_types")).json()
]
assert new_order == [
product_types[1].id,
product_types[0].id,

View File

@@ -1,4 +1,3 @@
Pour l'API, nous utilisons `django-ninja` et sa surcouche `django-ninja-extra`.
Ce sont des librairies relativement simples et qui présentent
l'immense avantage d'offrir des mécanismes de validation et de sérialisation
@@ -49,8 +48,9 @@ Notre API offre deux moyens d'authentification :
- par clef d'API
La plus grande partie des routes de l'API utilisent la méthode par cookie de session.
Cette dernière est donc activée par défaut.
Pour placer une route d'API derrière l'une de ces méthodes (ou bien les deux),
Pour changer la méthode d'authentification,
utilisez l'attribut `auth` et les classes `SessionAuth` et
[`ApiKeyAuth`][api.auth.ApiKeyAuth].
@@ -60,13 +60,17 @@ utilisez l'attribut `auth` et les classes `SessionAuth` et
@api_controller("/foo")
class FooController(ControllerBase):
# Cette route sera accessible uniquement avec l'authentification
# par cookie de session
@route.get("", auth=[SessionAuth()])
# par clef d'API
@route.get("", auth=[ApiKeyAuth()])
def fetch_foo(self, club_id: int): ...
# Et celle-ci sera accessible peut importe la méthode d'authentification
@route.get("/bar", auth=[SessionAuth(), ApiKeyAuth()])
# Celle-ci sera accessible avec les deux méthodes d'authentification
@route.get("/bar", auth=[ApiKeyAuth(), SessionAuth()])
def fetch_bar(self, club_id: int): ...
# Et celle-ci sera accessible aussi aux utilisateurs non-connectés
@route.get("/public", auth=None)
def fetch_public(self, club_id: int): ...
```
### Permissions
@@ -79,9 +83,7 @@ par-dessus `django-ninja`, le système de permissions de django
et notre propre système.
Cette dernière est documentée [ici](../perms.md).
### Limites des clefs d'API
#### Incompatibilité avec certaines permissions
### Incompatibilité avec certaines permissions
Le système des clefs d'API est apparu très tard dans l'histoire du site
(en P25, 10 ans après le début du développement).
@@ -112,10 +114,33 @@ Les principaux points de friction sont :
- `IsLoggedInCounter`, qui utilise encore un autre système
d'authentification maison et qui n'est pas fait pour être utilisé en dehors du site.
#### Incompatibilité avec les tokens csrf
### CSRF
Le [CSRF (*cross-site request forgery*)](https://fr.wikipedia.org/wiki/Cross-site_request_forgery)
est un des multiples facteurs d'attaque sur le web.
!!!info "A propos du csrf"
Le [CSRF (*cross-site request forgery*)](https://fr.wikipedia.org/wiki/Cross-site_request_forgery)
est un vecteur d'attaque sur le web consistant
à soumettre des données au serveur à l'insu
de l'utilisateur, en profitant de sa session.
C'est une attaque qui peut se produire lorsque l'utilisateur
est authentifié par cookie de session.
En effet, les cookies sont joints automatiquement à
toutes les requêtes ;
en l'absence de protection contre le CSRF,
un attaquant parvenant à insérer un formulaire
dans la page de l'utilisateur serait en mesure
de faire presque n'importe quoi en son nom,
et ce sans même que l'utilisateur ni les administrateurs
ne s'en rendent compte avant qu'il ne soit largement trop tard !
Sur le CSRF et les moyens de s'en prémunir, voir :
- [https://owasp.org/www-community/attacks/csrf]()
- [https://security.stackexchange.com/questions/166724/should-i-use-csrf-protection-on-rest-api-endpoints]()
- [https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html]()
Le CSRF, c'est dangereux.
Heureusement, Django vient encore une fois à notre aide,
avec des mécanismes intégrés pour s'en protéger.
Ceux-ci incluent notamment un système de
@@ -123,16 +148,39 @@ Ceux-ci incluent notamment un système de
à fournir dans les requêtes POST/PUT/PATCH.
Ceux-ci sont bien adaptés au cycle requêtes/réponses
typique de l'expérience utilisateur sur un navigateur,
typiques de l'expérience utilisateur sur un navigateur,
où les requêtes POST sont toujours effectuées après une requête
GET au cours de laquelle on a pu récupérer un token csrf.
Cependant, le flux des requêtes sur une API est bien différent ;
de ce fait, il est à attendre que les requêtes POST envoyées à l'API
par un client externe n'aient pas de token CSRF et se retrouvent
donc bloquées.
Cependant, ils sont également gênants et moins utiles
dans le cadre d'une API REST, étant donné
que l'authentification cesse d'être implicite :
la clef d'API doit être explicitement jointe aux headers,
pour chaque requête.
Pour ces raisons, l'accès aux requêtes POST/PUT/PATCH de l'API
par un client externe ne marche pas.
Pour ces raisons, la vérification CSRF ne prend place
que pour la vérification de l'authentification
par cookie de session.
!!!warning "L'ordre est important"
Si vous écrivez le code suivant, l'authentification par clef d'API
ne marchera plus :
```python
@api_controller("/foo")
class FooController(ControllerBase):
@route.post("/bar", auth=[SessionAuth(), ApiKeyAuth()])
def post_bar(self, club_id: int): ...
```
En effet, la vérification du cookie de session intègrera
toujours la vérification CSRF.
Or, un échec de cette dernière est traduit par django en un code HTTP 403
au lieu d'un HTTP 401.
L'authentification se retrouve alors court-circuitée,
faisant que la vérification de la clef d'API ne sera jamais appelée.
`SessionAuth` doit donc être déclaré **après** `ApiKeyAuth`.
## Créer un client et une clef d'API
@@ -171,5 +219,3 @@ qui en a besoin.
Dites-lui bien de garder cette clef en lieu sûr !
Si la clef est perdue, il n'y a pas moyen de la récupérer,
vous devrez en recréer une.

View File

@@ -19,7 +19,7 @@ from pedagogy.utbm_api import UtbmApiClient
class UvController(ControllerBase):
@route.get(
"/{code}",
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[
# this route will almost always be called in the context
# of a UV creation/edition
@@ -45,7 +45,7 @@ class UvController(ControllerBase):
"",
response=PaginatedResponseSchema[SimpleUvSchema],
url_name="fetch_uvs",
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[HasPerm("pedagogy.view_uv")],
)
@paginate(PageNumberPaginationExtra, page_size=100)

View File

@@ -8,7 +8,6 @@ from ninja.security import SessionAuth
from ninja_extra import ControllerBase, api_controller, paginate, route
from ninja_extra.exceptions import NotFound, PermissionDenied
from ninja_extra.pagination import PageNumberPaginationExtra
from ninja_extra.permissions import IsAuthenticated
from ninja_extra.schemas import PaginatedResponseSchema
from pydantic import NonNegativeInt
@@ -41,7 +40,6 @@ class AlbumController(ControllerBase):
@route.get(
"/search",
response=PaginatedResponseSchema[AlbumSchema],
permissions=[IsAuthenticated],
url_name="search-album",
)
@paginate(PageNumberPaginationExtra, page_size=50)
@@ -54,7 +52,7 @@ class AlbumController(ControllerBase):
@route.get(
"/autocomplete-search",
response=PaginatedResponseSchema[AlbumAutocompleteSchema],
auth=[SessionAuth(), ApiKeyAuth()],
auth=[ApiKeyAuth(), SessionAuth()],
permissions=[CanAccessLookup],
)
@paginate(PageNumberPaginationExtra, page_size=50)
@@ -74,12 +72,7 @@ class AlbumController(ControllerBase):
@api_controller("/sas/picture")
class PicturesController(ControllerBase):
@route.get(
"",
response=PaginatedResponseSchema[PictureSchema],
permissions=[IsAuthenticated],
url_name="pictures",
)
@route.get("", response=PaginatedResponseSchema[PictureSchema], url_name="pictures")
@paginate(PageNumberPaginationExtra, page_size=100)
def fetch_pictures(self, filters: Query[PictureFilterSchema]):
"""Find pictures viewable by the user corresponding to the given filters.
@@ -141,7 +134,7 @@ class PicturesController(ControllerBase):
@route.get(
"/{picture_id}/identified",
permissions=[IsAuthenticated, CanView],
permissions=[CanView],
response=list[IdentifiedUserSchema],
)
def fetch_identifications(self, picture_id: int):
@@ -149,7 +142,7 @@ class PicturesController(ControllerBase):
picture = self.get_object_or_exception(Picture, pk=picture_id)
return picture.people.select_related("user")
@route.put("/{picture_id}/identified", permissions=[IsAuthenticated, CanView])
@route.put("/{picture_id}/identified", permissions=[CanView])
def identify_users(self, picture_id: NonNegativeInt, users: set[NonNegativeInt]):
picture = self.get_object_or_exception(
Picture.objects.select_related("parent"), pk=picture_id
@@ -209,7 +202,7 @@ class PicturesController(ControllerBase):
@api_controller("/sas/relation", tags="User identification on SAS pictures")
class UsersIdentifiedController(ControllerBase):
@route.delete("/{relation_id}", permissions=[IsAuthenticated])
@route.delete("/{relation_id}")
def delete_relation(self, relation_id: NonNegativeInt):
"""Untag a user from a SAS picture.

View File

@@ -55,7 +55,7 @@ class TestPictureSearch(TestSas):
def test_anonymous_user_forbidden(self):
res = self.client.get(self.url)
assert res.status_code == 403
assert res.status_code == 401
def test_filter_by_album(self):
self.client.force_login(self.user_b)
@@ -148,7 +148,7 @@ class TestPictureRelation(TestSas):
relation = PeoplePictureRelation.objects.exclude(user=self.user_a).first()
res = self.client.delete(f"/api/sas/relation/{relation.id}")
assert res.status_code == 403
assert res.status_code == 401
for user in baker.make(User), self.user_a:
self.client.force_login(user)