mirror of
https://github.com/ae-utbm/sith.git
synced 2024-11-24 10:04:34 +00:00
70 lines
2.2 KiB
Python
70 lines
2.2 KiB
Python
|
import requests
|
||
|
from django.conf import settings
|
||
|
from django.core.management import BaseCommand
|
||
|
from django.db.models import Max
|
||
|
from django.utils import timezone
|
||
|
|
||
|
from antispam.models import ToxicDomain
|
||
|
|
||
|
|
||
|
class Command(BaseCommand):
|
||
|
"""Update blocked ips/mails database"""
|
||
|
|
||
|
help = "Update blocked ips/mails database"
|
||
|
|
||
|
def add_arguments(self, parser):
|
||
|
parser.add_argument(
|
||
|
"--force", action="store_true", help="Force re-creation even if up to date"
|
||
|
)
|
||
|
|
||
|
def _should_update(self, *, force: bool = False) -> bool:
|
||
|
if force:
|
||
|
return True
|
||
|
oldest = ToxicDomain.objects.filter(is_externally_managed=True).aggregate(
|
||
|
res=Max("created")
|
||
|
)["res"]
|
||
|
return not (oldest and timezone.now() < (oldest + timezone.timedelta(days=1)))
|
||
|
|
||
|
def _download_domains(self, providers: list[str]) -> set[str]:
|
||
|
domains = set()
|
||
|
for provider in providers:
|
||
|
res = requests.get(provider)
|
||
|
if not res.ok:
|
||
|
self.stderr.write(
|
||
|
f"Source {provider} responded with code {res.status_code}"
|
||
|
)
|
||
|
continue
|
||
|
domains |= set(res.content.decode().splitlines())
|
||
|
return domains
|
||
|
|
||
|
def _update_domains(self, domains: set[str]):
|
||
|
# Cleanup database
|
||
|
ToxicDomain.objects.filter(is_externally_managed=True).delete()
|
||
|
|
||
|
# Create database
|
||
|
ToxicDomain.objects.bulk_create(
|
||
|
[
|
||
|
ToxicDomain(domain=domain, is_externally_managed=True)
|
||
|
for domain in domains
|
||
|
],
|
||
|
ignore_conflicts=True,
|
||
|
)
|
||
|
self.stdout.write("Domain database updated")
|
||
|
|
||
|
def handle(self, *args, **options):
|
||
|
if not self._should_update(force=options["force"]):
|
||
|
self.stdout.write("Domain database is up to date")
|
||
|
return
|
||
|
self.stdout.write("Updating domain database")
|
||
|
|
||
|
domains = self._download_domains(settings.TOXIC_DOMAINS_PROVIDERS)
|
||
|
|
||
|
if not domains:
|
||
|
self.stderr.write(
|
||
|
"No domains could be fetched from settings.TOXIC_DOMAINS_PROVIDERS. "
|
||
|
"Please, have a look at your settings."
|
||
|
)
|
||
|
return
|
||
|
|
||
|
self._update_domains(domains)
|