mirror of
https://github.com/ae-utbm/sith.git
synced 2025-07-10 20:09:25 +00:00
rename apikey to api
This commit is contained in:
0
api/__init__.py
Normal file
0
api/__init__.py
Normal file
55
api/admin.py
Normal file
55
api/admin.py
Normal file
@ -0,0 +1,55 @@
|
||||
from django.contrib import admin, messages
|
||||
from django.db.models import QuerySet
|
||||
from django.http import HttpRequest
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from api.hashers import generate_key
|
||||
from api.models import ApiClient, ApiKey
|
||||
|
||||
|
||||
@admin.register(ApiClient)
|
||||
class ApiClientAdmin(admin.ModelAdmin):
|
||||
list_display = ("name", "owner", "created_at", "updated_at")
|
||||
search_fields = (
|
||||
"name",
|
||||
"owner__first_name",
|
||||
"owner__last_name",
|
||||
"owner__nick_name",
|
||||
)
|
||||
autocomplete_fields = ("owner", "groups", "client_permissions")
|
||||
|
||||
|
||||
@admin.register(ApiKey)
|
||||
class ApiKeyAdmin(admin.ModelAdmin):
|
||||
list_display = ("name", "client", "created_at", "revoked")
|
||||
list_filter = ("revoked",)
|
||||
date_hierarchy = "created_at"
|
||||
|
||||
readonly_fields = ("prefix", "hashed_key")
|
||||
actions = ("revoke_keys",)
|
||||
|
||||
def save_model(self, request: HttpRequest, obj: ApiKey, form, change):
|
||||
if not change:
|
||||
key, hashed = generate_key()
|
||||
obj.prefix = key[: ApiKey.PREFIX_LENGTH]
|
||||
obj.hashed_key = hashed
|
||||
self.message_user(
|
||||
request,
|
||||
_(
|
||||
"The API key for %(name)s is: %(key)s. "
|
||||
"Please store it somewhere safe: "
|
||||
"you will not be able to see it again."
|
||||
)
|
||||
% {"name": obj.name, "key": key},
|
||||
level=messages.WARNING,
|
||||
)
|
||||
return super().save_model(request, obj, form, change)
|
||||
|
||||
def get_readonly_fields(self, request, obj: ApiKey | None = None):
|
||||
if obj is None or obj.revoked:
|
||||
return ["revoked", *self.readonly_fields]
|
||||
return self.readonly_fields
|
||||
|
||||
@admin.action(description=_("Revoke selected API keys"))
|
||||
def revoke_keys(self, _request: HttpRequest, queryset: QuerySet[ApiKey]):
|
||||
queryset.update(revoked=True)
|
6
api/apps.py
Normal file
6
api/apps.py
Normal file
@ -0,0 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class ApiConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "api"
|
20
api/auth.py
Normal file
20
api/auth.py
Normal file
@ -0,0 +1,20 @@
|
||||
from django.http import HttpRequest
|
||||
from ninja.security import APIKeyHeader
|
||||
|
||||
from api.hashers import get_hasher
|
||||
from api.models import ApiClient, ApiKey
|
||||
|
||||
|
||||
class ApiKeyAuth(APIKeyHeader):
|
||||
param_name = "X-APIKey"
|
||||
|
||||
def authenticate(self, request: HttpRequest, key: str | None) -> ApiClient | None:
|
||||
if not key or len(key) != ApiKey.KEY_LENGTH:
|
||||
return None
|
||||
hasher = get_hasher()
|
||||
hashed_key = hasher.encode(key)
|
||||
try:
|
||||
key_obj = ApiKey.objects.get(revoked=False, hashed_key=hashed_key)
|
||||
except ApiKey.DoesNotExist:
|
||||
return None
|
||||
return key_obj.client
|
43
api/hashers.py
Normal file
43
api/hashers.py
Normal file
@ -0,0 +1,43 @@
|
||||
import functools
|
||||
import hashlib
|
||||
import secrets
|
||||
|
||||
from django.contrib.auth.hashers import BasePasswordHasher
|
||||
from django.utils.crypto import constant_time_compare
|
||||
|
||||
|
||||
class Sha512ApiKeyHasher(BasePasswordHasher):
|
||||
"""
|
||||
An API key hasher using the sha256 algorithm.
|
||||
|
||||
This hasher shouldn't be used in Django's `PASSWORD_HASHERS` setting.
|
||||
It is insecure for use in hashing passwords, but is safe for hashing
|
||||
high entropy, randomly generated API keys.
|
||||
"""
|
||||
|
||||
algorithm = "sha512"
|
||||
|
||||
def salt(self) -> str:
|
||||
# No need for a salt on a high entropy key.
|
||||
return ""
|
||||
|
||||
def encode(self, password: str, salt: str = "") -> str:
|
||||
hashed = hashlib.sha512(password.encode()).hexdigest()
|
||||
return f"{self.algorithm}$${hashed}"
|
||||
|
||||
def verify(self, password: str, encoded: str) -> bool:
|
||||
encoded_2 = self.encode(password, "")
|
||||
return constant_time_compare(encoded, encoded_2)
|
||||
|
||||
|
||||
@functools.cache
|
||||
def get_hasher():
|
||||
return Sha512ApiKeyHasher()
|
||||
|
||||
|
||||
def generate_key() -> tuple[str, str]:
|
||||
"""Generate a [key, hash] couple."""
|
||||
# this will result in key with a length of 72
|
||||
key = str(secrets.token_urlsafe(54))
|
||||
hasher = get_hasher()
|
||||
return key, hasher.encode(key)
|
113
api/migrations/0001_initial.py
Normal file
113
api/migrations/0001_initial.py
Normal file
@ -0,0 +1,113 @@
|
||||
# Generated by Django 5.2 on 2025-06-01 08:53
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("auth", "0012_alter_user_first_name_max_length"),
|
||||
("core", "0046_permissionrights"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="ApiClient",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("name", models.CharField(max_length=64, verbose_name="name")),
|
||||
("created_at", models.DateTimeField(auto_now_add=True)),
|
||||
("updated_at", models.DateTimeField(auto_now=True)),
|
||||
(
|
||||
"client_permissions",
|
||||
models.ManyToManyField(
|
||||
blank=True,
|
||||
help_text="Specific permissions for this api client.",
|
||||
related_name="clients",
|
||||
to="auth.permission",
|
||||
verbose_name="client permissions",
|
||||
),
|
||||
),
|
||||
(
|
||||
"groups",
|
||||
models.ManyToManyField(
|
||||
blank=True,
|
||||
related_name="api_clients",
|
||||
to="core.group",
|
||||
verbose_name="groups",
|
||||
),
|
||||
),
|
||||
(
|
||||
"owner",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="api_clients",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
verbose_name="owner",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "api client",
|
||||
"verbose_name_plural": "api clients",
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="ApiKey",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("name", models.CharField(blank=True, default="", verbose_name="name")),
|
||||
(
|
||||
"prefix",
|
||||
models.CharField(
|
||||
editable=False, max_length=5, verbose_name="prefix"
|
||||
),
|
||||
),
|
||||
(
|
||||
"hashed_key",
|
||||
models.CharField(
|
||||
db_index=True,
|
||||
editable=False,
|
||||
max_length=136,
|
||||
verbose_name="hashed key",
|
||||
),
|
||||
),
|
||||
("revoked", models.BooleanField(default=False, verbose_name="revoked")),
|
||||
("created_at", models.DateTimeField(auto_now_add=True)),
|
||||
(
|
||||
"client",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="api_keys",
|
||||
to="api.apiclient",
|
||||
verbose_name="api client",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "api key",
|
||||
"verbose_name_plural": "api keys",
|
||||
"permissions": [("revoke_apikey", "Revoke API keys")],
|
||||
},
|
||||
),
|
||||
]
|
0
api/migrations/__init__.py
Normal file
0
api/migrations/__init__.py
Normal file
94
api/models.py
Normal file
94
api/models.py
Normal file
@ -0,0 +1,94 @@
|
||||
from typing import Iterable
|
||||
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.db import models
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.translation import pgettext_lazy
|
||||
|
||||
from core.models import Group, User
|
||||
|
||||
|
||||
class ApiClient(models.Model):
|
||||
name = models.CharField(_("name"), max_length=64)
|
||||
owner = models.ForeignKey(
|
||||
User,
|
||||
verbose_name=_("owner"),
|
||||
related_name="api_clients",
|
||||
on_delete=models.CASCADE,
|
||||
)
|
||||
groups = models.ManyToManyField(
|
||||
Group, verbose_name=_("groups"), related_name="api_clients", blank=True
|
||||
)
|
||||
client_permissions = models.ManyToManyField(
|
||||
Permission,
|
||||
verbose_name=_("client permissions"),
|
||||
blank=True,
|
||||
help_text=_("Specific permissions for this api client."),
|
||||
related_name="clients",
|
||||
)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
|
||||
_perm_cache: set[str] | None = None
|
||||
|
||||
class Meta:
|
||||
verbose_name = _("api client")
|
||||
verbose_name_plural = _("api clients")
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def has_perm(self, perm: str):
|
||||
"""Return True if the client has the specified permission."""
|
||||
|
||||
if self._perm_cache is None:
|
||||
group_permissions = (
|
||||
Permission.objects.filter(group__group__in=self.groups.all())
|
||||
.values_list("content_type__app_label", "codename")
|
||||
.order_by()
|
||||
)
|
||||
client_permissions = self.client_permissions.values_list(
|
||||
"content_type__app_label", "codename"
|
||||
).order_by()
|
||||
self._perm_cache = {
|
||||
f"{content_type}.{name}"
|
||||
for content_type, name in (*group_permissions, *client_permissions)
|
||||
}
|
||||
return perm in self._perm_cache
|
||||
|
||||
def has_perms(self, perm_list):
|
||||
"""
|
||||
Return True if the client has each of the specified permissions. If
|
||||
object is passed, check if the client has all required perms for it.
|
||||
"""
|
||||
if not isinstance(perm_list, Iterable) or isinstance(perm_list, str):
|
||||
raise ValueError("perm_list must be an iterable of permissions.")
|
||||
return all(self.has_perm(perm) for perm in perm_list)
|
||||
|
||||
|
||||
class ApiKey(models.Model):
|
||||
PREFIX_LENGTH = 5
|
||||
KEY_LENGTH = 72
|
||||
HASHED_KEY_LENGTH = 136
|
||||
|
||||
name = models.CharField(_("name"), blank=True, default="")
|
||||
prefix = models.CharField(_("prefix"), max_length=PREFIX_LENGTH, editable=False)
|
||||
hashed_key = models.CharField(
|
||||
_("hashed key"), max_length=HASHED_KEY_LENGTH, db_index=True, editable=False
|
||||
)
|
||||
client = models.ForeignKey(
|
||||
ApiClient,
|
||||
verbose_name=_("api client"),
|
||||
related_name="api_keys",
|
||||
on_delete=models.CASCADE,
|
||||
)
|
||||
revoked = models.BooleanField(pgettext_lazy("api key", "revoked"), default=False)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
verbose_name = _("api key")
|
||||
verbose_name_plural = _("api keys")
|
||||
permissions = [("revoke_apikey", "Revoke API keys")]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name} ({self.prefix}***)"
|
197
api/permissions.py
Normal file
197
api/permissions.py
Normal file
@ -0,0 +1,197 @@
|
||||
"""Permission classes to be used within ninja-extra controllers.
|
||||
|
||||
Some permissions are global (like `IsInGroup` or `IsRoot`),
|
||||
and some others are per-object (like `CanView` or `CanEdit`).
|
||||
|
||||
Example:
|
||||
```python
|
||||
# restrict all the routes of this controller
|
||||
# to subscribed users
|
||||
@api_controller("/foo", permissions=[IsSubscriber])
|
||||
class FooController(ControllerBase):
|
||||
@route.get("/bar")
|
||||
def bar_get(self):
|
||||
# This route inherits the permissions of the controller
|
||||
# ...
|
||||
|
||||
@route.bar("/bar/{bar_id}", permissions=[CanView])
|
||||
def bar_get_one(self, bar_id: int):
|
||||
# per-object permission resolution happens
|
||||
# when calling either the `get_object_or_exception`
|
||||
# or `get_object_or_none` method.
|
||||
bar = self.get_object_or_exception(Counter, pk=bar_id)
|
||||
|
||||
# you can also call the `check_object_permission` manually
|
||||
other_bar = Counter.objects.first()
|
||||
self.check_object_permissions(other_bar)
|
||||
|
||||
# ...
|
||||
|
||||
# This route is restricted to counter admins and root users
|
||||
@route.delete(
|
||||
"/bar/{bar_id}",
|
||||
permissions=[IsRoot | IsInGroup(settings.SITH_GROUP_COUNTER_ADMIN_ID)
|
||||
]
|
||||
def bar_delete(self, bar_id: int):
|
||||
# ...
|
||||
```
|
||||
"""
|
||||
|
||||
import operator
|
||||
from functools import reduce
|
||||
from typing import Any, Callable
|
||||
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.http import HttpRequest
|
||||
from ninja_extra import ControllerBase
|
||||
from ninja_extra.permissions import BasePermission
|
||||
|
||||
from counter.models import Counter
|
||||
|
||||
|
||||
class IsInGroup(BasePermission):
|
||||
"""Check that the user is in the group whose primary key is given."""
|
||||
|
||||
def __init__(self, group_pk: int):
|
||||
self._group_pk = group_pk
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return request.user.is_in_group(pk=self._group_pk)
|
||||
|
||||
|
||||
class HasPerm(BasePermission):
|
||||
"""Check that the user has the required perm.
|
||||
|
||||
If multiple perms are given, a comparer function can also be passed,
|
||||
in order to change the way perms are checked.
|
||||
|
||||
Example:
|
||||
```python
|
||||
@api_controller("/foo")
|
||||
class FooController(ControllerBase):
|
||||
# this route will require both permissions
|
||||
@route.put("/foo", permissions=[HasPerm(["foo.change_foo", "foo.add_foo"])]
|
||||
def foo(self): ...
|
||||
|
||||
# This route will require at least one of the perm,
|
||||
# but it's not mandatory to have all of them
|
||||
@route.put(
|
||||
"/bar",
|
||||
permissions=[HasPerm(["foo.change_bar", "foo.add_bar"], op=operator.or_)],
|
||||
)
|
||||
def bar(self): ...
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
perms: str | Permission | list[str | Permission],
|
||||
op: Callable[[bool, bool], bool] = operator.and_,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
perms: a permission or a list of permissions the user must have
|
||||
op: An operator to combine multiple permissions (in most cases,
|
||||
it will be either `operator.and_` or `operator.or_`)
|
||||
"""
|
||||
super().__init__()
|
||||
if not isinstance(perms, (list, tuple, set)):
|
||||
perms = [perms]
|
||||
self._operator = op
|
||||
self._perms = perms
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
# if the request has the `auth` property,
|
||||
# it means that the user has been explicitly authenticated
|
||||
# using a django-ninja authentication backend
|
||||
# (whether it is SessionAuth or ApiKeyAuth).
|
||||
# If not, this authentication has not been done, but the user may
|
||||
# still be implicitly authenticated through AuthenticationMiddleware
|
||||
user = request.auth if hasattr(request, "auth") else request.user
|
||||
# `user` may either be a `core.User` or an `api.ApiClient` ;
|
||||
# they are not the same model, but they both implement the `has_perm` method
|
||||
return reduce(self._operator, (user.has_perm(p) for p in self._perms))
|
||||
|
||||
|
||||
class IsRoot(BasePermission):
|
||||
"""Check that the user is root."""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return request.user.is_root
|
||||
|
||||
|
||||
class IsSubscriber(BasePermission):
|
||||
"""Check that the user is currently subscribed."""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return request.user.is_subscribed
|
||||
|
||||
|
||||
class IsOldSubscriber(BasePermission):
|
||||
"""Check that the user has at least one subscription in its history."""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return request.user.was_subscribed
|
||||
|
||||
|
||||
class CanView(BasePermission):
|
||||
"""Check that this user has the permission to view the object of this route.
|
||||
|
||||
Wrap the `user.can_view(obj)` method.
|
||||
To see an example, look at the example in the module docstring.
|
||||
"""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return True
|
||||
|
||||
def has_object_permission(
|
||||
self, request: HttpRequest, controller: ControllerBase, obj: Any
|
||||
) -> bool:
|
||||
return request.user.can_view(obj)
|
||||
|
||||
|
||||
class CanEdit(BasePermission):
|
||||
"""Check that this user has the permission to edit the object of this route.
|
||||
|
||||
Wrap the `user.can_edit(obj)` method.
|
||||
To see an example, look at the example in the module docstring.
|
||||
"""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return True
|
||||
|
||||
def has_object_permission(
|
||||
self, request: HttpRequest, controller: ControllerBase, obj: Any
|
||||
) -> bool:
|
||||
return request.user.can_edit(obj)
|
||||
|
||||
|
||||
class IsOwner(BasePermission):
|
||||
"""Check that this user owns the object of this route.
|
||||
|
||||
Wrap the `user.is_owner(obj)` method.
|
||||
To see an example, look at the example in the module docstring.
|
||||
"""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
return True
|
||||
|
||||
def has_object_permission(
|
||||
self, request: HttpRequest, controller: ControllerBase, obj: Any
|
||||
) -> bool:
|
||||
return request.user.is_owner(obj)
|
||||
|
||||
|
||||
class IsLoggedInCounter(BasePermission):
|
||||
"""Check that a user is logged in a counter."""
|
||||
|
||||
def has_permission(self, request: HttpRequest, controller: ControllerBase) -> bool:
|
||||
if "/counter/" not in request.META.get("HTTP_REFERER", ""):
|
||||
return False
|
||||
token = request.session.get("counter_token")
|
||||
if not token:
|
||||
return False
|
||||
return Counter.objects.filter(token=token).exists()
|
||||
|
||||
|
||||
CanAccessLookup = IsLoggedInCounter | HasPerm("core.access_lookup")
|
0
api/tests/__init__.py
Normal file
0
api/tests/__init__.py
Normal file
29
api/tests/test_api_key.py
Normal file
29
api/tests/test_api_key.py
Normal file
@ -0,0 +1,29 @@
|
||||
import pytest
|
||||
from django.test import RequestFactory
|
||||
from model_bakery import baker
|
||||
|
||||
from api.auth import ApiKeyAuth
|
||||
from api.hashers import generate_key
|
||||
from api.models import ApiClient, ApiKey
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_api_key_auth():
|
||||
key, hashed = generate_key()
|
||||
client = baker.make(ApiClient)
|
||||
baker.make(ApiKey, client=client, hashed_key=hashed)
|
||||
auth = ApiKeyAuth()
|
||||
|
||||
assert auth.authenticate(RequestFactory().get(""), key) == client
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
("key", "hashed"), [(generate_key()[0], generate_key()[1]), (generate_key()[0], "")]
|
||||
)
|
||||
def test_api_key_auth_invalid(key, hashed):
|
||||
client = baker.make(ApiClient)
|
||||
baker.make(ApiKey, client=client, hashed_key=hashed)
|
||||
auth = ApiKeyAuth()
|
||||
|
||||
assert auth.authenticate(RequestFactory().get(""), key) is None
|
Reference in New Issue
Block a user