mirror of
https://github.com/ae-utbm/sith.git
synced 2025-07-10 03:49:24 +00:00
Add antispam app
* update_spam_database command to update suspicious domains from an external provider * Add a AntiSpamEmailField that deny emails from suspicious domains * Update documentation
This commit is contained in:
0
antispam/management/commands/__init__.py
Normal file
0
antispam/management/commands/__init__.py
Normal file
69
antispam/management/commands/update_spam_database.py
Normal file
69
antispam/management/commands/update_spam_database.py
Normal file
@ -0,0 +1,69 @@
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.core.management import BaseCommand
|
||||
from django.db.models import Max
|
||||
from django.utils import timezone
|
||||
|
||||
from antispam.models import ToxicDomain
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""Update blocked ips/mails database"""
|
||||
|
||||
help = "Update blocked ips/mails database"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
"--force", action="store_true", help="Force re-creation even if up to date"
|
||||
)
|
||||
|
||||
def _should_update(self, *, force: bool = False) -> bool:
|
||||
if force:
|
||||
return True
|
||||
oldest = ToxicDomain.objects.filter(is_externally_managed=True).aggregate(
|
||||
res=Max("created")
|
||||
)["res"]
|
||||
return not (oldest and timezone.now() < (oldest + timezone.timedelta(days=1)))
|
||||
|
||||
def _download_domains(self, providers: list[str]) -> set[str]:
|
||||
domains = set()
|
||||
for provider in providers:
|
||||
res = requests.get(provider)
|
||||
if not res.ok:
|
||||
self.stderr.write(
|
||||
f"Source {provider} responded with code {res.status_code}"
|
||||
)
|
||||
continue
|
||||
domains |= set(res.content.decode().splitlines())
|
||||
return domains
|
||||
|
||||
def _update_domains(self, domains: set[str]):
|
||||
# Cleanup database
|
||||
ToxicDomain.objects.filter(is_externally_managed=True).delete()
|
||||
|
||||
# Create database
|
||||
ToxicDomain.objects.bulk_create(
|
||||
[
|
||||
ToxicDomain(domain=domain, is_externally_managed=True)
|
||||
for domain in domains
|
||||
],
|
||||
ignore_conflicts=True,
|
||||
)
|
||||
self.stdout.write("Domain database updated")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if not self._should_update(force=options["force"]):
|
||||
self.stdout.write("Domain database is up to date")
|
||||
return
|
||||
self.stdout.write("Updating domain database")
|
||||
|
||||
domains = self._download_domains(settings.TOXIC_DOMAINS_PROVIDERS)
|
||||
|
||||
if not domains:
|
||||
self.stderr.write(
|
||||
"No domains could be fetched from settings.TOXIC_DOMAINS_PROVIDERS. "
|
||||
"Please, have a look at your settings."
|
||||
)
|
||||
return
|
||||
|
||||
self._update_domains(domains)
|
Reference in New Issue
Block a user