diff --git a/config/settings/base.py b/config/settings/base.py
index 68b9724..6fc2f4c 100644
--- a/config/settings/base.py
+++ b/config/settings/base.py
@@ -247,7 +247,7 @@
# This can be omitted to allow all files, but note that this may present a security risk
# if untrusted users are allowed to upload files -
# see https://docs.wagtail.org/en/stable/advanced_topics/deploying.html#user-uploaded-files
-WAGTAILDOCS_EXTENSIONS = ['csv', 'docx', 'key', 'odt', 'pdf', 'pptx', 'rtf', 'txt', 'xlsx', 'zip']
+WAGTAILDOCS_EXTENSIONS = ['csv', 'docx', 'json', 'key', 'odt', 'pdf', 'pptx', 'rtf', 'txt', 'xlsx', 'zip']
# https://docs.djangoproject.com/en/dev/ref/settings/#auth-user-model
AUTH_USER_MODEL = 'users.CustomUser'
diff --git a/config/settings/test.py b/config/settings/test.py
new file mode 100644
index 0000000..5eb86ad
--- /dev/null
+++ b/config/settings/test.py
@@ -0,0 +1,30 @@
+import tempfile
+
+from .base import *
+
+DEBUG = False
+TEMPLATE_DEBUG = False
+SECRET_KEY = "test-secret-key-not-for-production"
+ALLOWED_HOSTS = ["localhost", "127.0.0.1", "testserver"]
+WAGTAILADMIN_BASE_URL = "http://testserver"
+
+PASSWORD_HASHERS = [
+ "django.contrib.auth.hashers.MD5PasswordHasher",
+]
+
+EMAIL_BACKEND = "django.core.mail.backends.locmem.EmailBackend"
+
+CACHES = {
+ "default": {
+ "BACKEND": "django.core.cache.backends.locmem.LocMemCache",
+ }
+}
+
+CELERY_TASK_ALWAYS_EAGER = True
+CELERY_TASK_EAGER_PROPAGATES = True
+
+COMPRESS_ENABLED = False
+
+MEDIA_ROOT = tempfile.mkdtemp(prefix="markapi_test_media_")
+
+LLAMA_ENABLED = False
diff --git a/xml_manager/apps.py b/xml_manager/apps.py
index 1292960..ff4d0c4 100644
--- a/xml_manager/apps.py
+++ b/xml_manager/apps.py
@@ -2,5 +2,8 @@
class XmlManagerConfig(AppConfig):
- default_auto_field = 'django.db.models.BigAutoField'
- name = 'xml_manager'
+ default_auto_field = "django.db.models.BigAutoField"
+ name = "xml_manager"
+
+ def ready(self):
+ import xml_manager.signals # noqa: F401
diff --git a/xml_manager/forms.py b/xml_manager/forms.py
new file mode 100644
index 0000000..3ab1b91
--- /dev/null
+++ b/xml_manager/forms.py
@@ -0,0 +1,59 @@
+import os
+
+from django import forms
+from django.core.exceptions import ValidationError
+from django.utils.translation import gettext_lazy as _
+from wagtail.admin.forms import WagtailAdminModelForm
+
+from xml_manager.models import SPSPackageValidation
+
+
+class SPSPackageValidationForm(WagtailAdminModelForm):
+ zip_upload = forms.FileField(
+ label=_("SPS package (.zip)"),
+ required=False,
+ help_text=_(
+ "On edit, leave empty to revalidate the current package without "
+ "replacing the file."
+ ),
+ )
+
+ class Meta:
+ model = SPSPackageValidation
+ fields = []
+
+ def clean_zip_upload(self):
+ zip_upload = self.cleaned_data.get("zip_upload")
+ if not zip_upload:
+ return zip_upload
+ if not zip_upload.name.lower().endswith(".zip"):
+ raise ValidationError(_("Only .zip files are allowed."))
+ if zip_upload.size == 0:
+ raise ValidationError(_("The file is empty."))
+ return zip_upload
+
+ def clean(self):
+ cleaned = super().clean()
+ if not self.instance.pk and not cleaned.get("zip_upload"):
+ raise ValidationError(_("A .zip file is required."))
+ return cleaned
+
+ @staticmethod
+ def save_wagtail_document(zip_upload):
+ from wagtail.documents.models import Document
+
+ document = Document(title=os.path.basename(zip_upload.name))
+ document.file.save(zip_upload.name, zip_upload, save=True)
+ return document
+
+ @staticmethod
+ def save_wagtail_document_from_path(file_path, title=None):
+ from django.core.files import File
+ from wagtail.documents.models import Document
+
+ basename = os.path.basename(file_path)
+ document_title = title or basename
+ with open(file_path, "rb") as fp:
+ document = Document(title=document_title)
+ document.file.save(basename, File(fp), save=True)
+ return document
diff --git a/xml_manager/migrations/0004_spspackagevalidation.py b/xml_manager/migrations/0004_spspackagevalidation.py
new file mode 100644
index 0000000..6181158
--- /dev/null
+++ b/xml_manager/migrations/0004_spspackagevalidation.py
@@ -0,0 +1,105 @@
+import django.db.models.deletion
+from django.conf import settings
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("wagtaildocs", "0014_alter_document_file_size"),
+ migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+ ("xml_manager", "0003_xmldocumentpdf_docx_file"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="SPSPackageValidation",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ (
+ "zip_size_bytes",
+ models.PositiveBigIntegerField(
+ default=0, verbose_name="ZIP size (bytes)"
+ ),
+ ),
+ (
+ "validated_at",
+ models.DateTimeField(
+ blank=True, null=True, verbose_name="Validated at"
+ ),
+ ),
+ (
+ "status",
+ models.CharField(
+ choices=[
+ ("pending", "Pending"),
+ ("running", "Running"),
+ ("done", "Done"),
+ ("error", "Error"),
+ ],
+ default="pending",
+ max_length=16,
+ verbose_name="Status",
+ ),
+ ),
+ (
+ "error_message",
+ models.TextField(blank=True, verbose_name="Error message"),
+ ),
+ (
+ "exceptions_document",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="+",
+ to="wagtaildocs.document",
+ verbose_name="Exceptions file",
+ ),
+ ),
+ (
+ "package_document",
+ models.OneToOneField(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="sps_validation",
+ to="wagtaildocs.document",
+ verbose_name="SPS package document",
+ ),
+ ),
+ (
+ "validated_by",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="sps_package_validations",
+ to=settings.AUTH_USER_MODEL,
+ verbose_name="Validated by",
+ ),
+ ),
+ (
+ "validation_document",
+ models.ForeignKey(
+ blank=True,
+ null=True,
+ on_delete=django.db.models.deletion.SET_NULL,
+ related_name="+",
+ to="wagtaildocs.document",
+ verbose_name="Validation file",
+ ),
+ ),
+ ],
+ options={
+ "verbose_name": "SPS package validation",
+ "verbose_name_plural": "SPS package validations",
+ },
+ ),
+ ]
diff --git a/xml_manager/models.py b/xml_manager/models.py
index 4a1ad7b..6d52748 100644
--- a/xml_manager/models.py
+++ b/xml_manager/models.py
@@ -1,30 +1,38 @@
+from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from wagtail.admin.panels import FieldPanel
+class SPSPackageValidationStatus(models.TextChoices):
+ PENDING = "pending", _("Pending")
+ RUNNING = "running", _("Running")
+ DONE = "done", _("Done")
+ ERROR = "error", _("Error")
+
+
class XMLDocument(models.Model):
xml_file = models.FileField(
- upload_to='xml_manager/xml/',
+ upload_to="xml_manager/xml/",
verbose_name=_("XML File"),
- help_text=_("Upload an XML file for processing.")
+ help_text=_("Upload an XML file for processing."),
)
validation_file = models.FileField(
- upload_to='xml_manager/validation/',
+ upload_to="xml_manager/validation/",
blank=True,
null=True,
- verbose_name=_("Validation File")
+ verbose_name=_("Validation File"),
)
exceptions_file = models.FileField(
- upload_to='xml_manager/validation/',
+ upload_to="xml_manager/validation/",
blank=True,
null=True,
- verbose_name=_("Exceptions File")
+ verbose_name=_("Exceptions File"),
)
uploaded_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("Uploaded At"),
- help_text=_("The date and time when the file was uploaded.")
+ help_text=_("The date and time when the file was uploaded."),
)
panels = [
@@ -33,36 +41,40 @@ class XMLDocument(models.Model):
def __str__(self):
return f"{self.xml_file.name}"
-
+
class Meta:
verbose_name = _("XML Document")
verbose_name_plural = _("XML Documents")
class XMLDocumentPDF(models.Model):
- xml_document = models.ForeignKey(XMLDocument, on_delete=models.CASCADE, related_name="pdfs", verbose_name=_("XML Document"))
+ xml_document = models.ForeignKey(
+ XMLDocument,
+ on_delete=models.CASCADE,
+ related_name="pdfs",
+ verbose_name=_("XML Document"),
+ )
pdf_file = models.FileField(
- upload_to='xml_manager/pdf/',
- verbose_name=_("PDF File")
+ upload_to="xml_manager/pdf/", verbose_name=_("PDF File")
)
docx_file = models.FileField(
- upload_to='xml_manager/docx/',
+ upload_to="xml_manager/docx/",
verbose_name=_("DOCX File"),
null=True,
blank=True,
- help_text=_('Intermediate DOCX file generated during PDF creation')
+ help_text=_("Intermediate DOCX file generated during PDF creation"),
)
language = models.CharField(
max_length=32,
default="pt",
verbose_name=_("Language"),
- help_text=_("Language code or name")
+ help_text=_("Language code or name"),
)
uploaded_at = models.DateTimeField(auto_now_add=True, verbose_name=_("Uploaded At"))
def __str__(self):
return f"PDF for {self.xml_document.xml_file.name} ({self.language})"
-
+
class Meta:
verbose_name = _("XML Document PDF")
verbose_name_plural = _("XML Document PDFs")
@@ -70,31 +82,33 @@ class Meta:
@classmethod
def create(cls, xml_document, pdf_file, language="pt"):
pdf_instance = cls(
- xml_document=xml_document,
- pdf_file=pdf_file,
- language=language
+ xml_document=xml_document, pdf_file=pdf_file, language=language
)
pdf_instance.save()
return pdf_instance
class XMLDocumentHTML(models.Model):
- xml_document = models.ForeignKey(XMLDocument, on_delete=models.CASCADE, related_name="htmls", verbose_name=_("XML Document"))
+ xml_document = models.ForeignKey(
+ XMLDocument,
+ on_delete=models.CASCADE,
+ related_name="htmls",
+ verbose_name=_("XML Document"),
+ )
html_file = models.FileField(
- upload_to='xml_manager/html/',
- verbose_name=_("HTML File")
+ upload_to="xml_manager/html/", verbose_name=_("HTML File")
)
language = models.CharField(
max_length=32,
default="pt",
verbose_name=_("Language"),
- help_text=_("Language code or name")
+ help_text=_("Language code or name"),
)
uploaded_at = models.DateTimeField(auto_now_add=True, verbose_name=_("Uploaded At"))
def __str__(self):
return f"HTML for {self.xml_document.xml_file.name} ({self.language})"
-
+
class Meta:
verbose_name = _("XML Document HTML")
verbose_name_plural = _("XML Document HTMLs")
@@ -102,9 +116,77 @@ class Meta:
@classmethod
def create(cls, xml_document, html_file, language="pt"):
html_instance = cls(
- xml_document=xml_document,
- html_file=html_file,
- language=language
+ xml_document=xml_document, html_file=html_file, language=language
)
html_instance.save()
return html_instance
+
+
+class SPSPackageValidation(models.Model):
+ package_document = models.OneToOneField(
+ "wagtaildocs.Document",
+ on_delete=models.CASCADE,
+ related_name="sps_validation",
+ verbose_name=_("SPS package document"),
+ )
+ validation_document = models.ForeignKey(
+ "wagtaildocs.Document",
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="+",
+ verbose_name=_("Validation file"),
+ )
+ exceptions_document = models.ForeignKey(
+ "wagtaildocs.Document",
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="+",
+ verbose_name=_("Exceptions file"),
+ )
+ zip_size_bytes = models.PositiveBigIntegerField(
+ verbose_name=_("ZIP size (bytes)"),
+ default=0,
+ )
+ validated_at = models.DateTimeField(
+ blank=True,
+ null=True,
+ verbose_name=_("Validated at"),
+ )
+ validated_by = models.ForeignKey(
+ settings.AUTH_USER_MODEL,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ related_name="sps_package_validations",
+ verbose_name=_("Validated by"),
+ )
+ status = models.CharField(
+ max_length=16,
+ choices=SPSPackageValidationStatus.choices,
+ default=SPSPackageValidationStatus.PENDING,
+ verbose_name=_("Status"),
+ )
+ error_message = models.TextField(
+ blank=True,
+ verbose_name=_("Error message"),
+ )
+
+ panels = [
+ FieldPanel("package_document"),
+ FieldPanel("status"),
+ FieldPanel("zip_size_bytes"),
+ FieldPanel("validated_by"),
+ FieldPanel("validated_at"),
+ FieldPanel("validation_document"),
+ FieldPanel("exceptions_document"),
+ FieldPanel("error_message"),
+ ]
+
+ def __str__(self):
+ return self.package_document.title
+
+ class Meta:
+ verbose_name = _("SPS package validation")
+ verbose_name_plural = _("SPS package validations")
diff --git a/xml_manager/signals.py b/xml_manager/signals.py
new file mode 100644
index 0000000..e95baa6
--- /dev/null
+++ b/xml_manager/signals.py
@@ -0,0 +1,19 @@
+from django.db.models.signals import post_delete
+from django.dispatch import receiver
+from wagtail.documents.models import Document
+
+from xml_manager.models import SPSPackageValidation
+
+
+@receiver(post_delete, sender=SPSPackageValidation)
+def delete_linked_wagtail_documents(sender, instance, **kwargs):
+ doc_ids = []
+ for doc_id in (
+ instance.package_document_id,
+ instance.validation_document_id,
+ instance.exceptions_document_id,
+ ):
+ if doc_id and doc_id not in doc_ids:
+ doc_ids.append(doc_id)
+ if doc_ids:
+ Document.objects.filter(pk__in=doc_ids).delete()
diff --git a/xml_manager/tests.py b/xml_manager/tests.py
deleted file mode 100644
index 7ce503c..0000000
--- a/xml_manager/tests.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from django.test import TestCase
-
-# Create your tests here.
diff --git a/xml_manager/tests/__init__.py b/xml_manager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/xml_manager/tests/test_sps_package_validation.py b/xml_manager/tests/test_sps_package_validation.py
new file mode 100644
index 0000000..ab560d2
--- /dev/null
+++ b/xml_manager/tests/test_sps_package_validation.py
@@ -0,0 +1,187 @@
+import io
+import os
+import zipfile
+from unittest.mock import patch
+
+from django.contrib.auth import get_user_model
+from django.contrib.messages.storage.fallback import FallbackStorage
+from django.core.files.uploadedfile import SimpleUploadedFile
+from django.http import Http404
+from django.test import RequestFactory, TestCase
+from django.urls import reverse
+from django.utils import timezone
+from wagtail.documents.models import Document
+
+from xml_manager.exceptions import SPS_Package_Validation_Error
+from xml_manager.forms import SPSPackageValidationForm
+from xml_manager.models import SPSPackageValidation, SPSPackageValidationStatus
+from xml_manager.views import revalidate_sps_package_pk
+
+User = get_user_model()
+
+
+def make_minimal_sps_zip(xml_name="article.xml", xml_body=b""):
+ buffer = io.BytesIO()
+ with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr(xml_name, xml_body)
+ buffer.seek(0)
+ return buffer.read()
+
+
+def make_package_document(title="pkg.zip"):
+ upload = SimpleUploadedFile(
+ title,
+ make_minimal_sps_zip(),
+ content_type="application/zip",
+ )
+ document = Document(title=title)
+ document.file.save(title, upload, save=True)
+ return document, upload.size
+
+
+class SPSPackageValidationFormTests(TestCase):
+ def test_create_requires_zip(self):
+ form = SPSPackageValidationForm(data={})
+ self.assertFalse(form.is_valid())
+
+ def test_rejects_non_zip_extension(self):
+ upload = SimpleUploadedFile("pkg.txt", b"data", content_type="text/plain")
+ form = SPSPackageValidationForm(
+ files={"zip_upload": upload},
+ data={},
+ )
+ self.assertFalse(form.is_valid())
+
+ def test_accepts_zip_upload(self):
+ upload = SimpleUploadedFile(
+ "pkg.zip",
+ make_minimal_sps_zip(),
+ content_type="application/zip",
+ )
+ form = SPSPackageValidationForm(
+ files={"zip_upload": upload},
+ data={},
+ )
+ self.assertTrue(form.is_valid())
+
+
+class SPSPackageValidationDeleteTests(TestCase):
+ def test_delete_removes_all_wagtail_documents(self):
+ package, zip_size = make_package_document()
+ csv_doc = Document(title="report.validation.csv")
+ csv_doc.file.save(
+ "report.validation.csv",
+ SimpleUploadedFile("report.validation.csv", b"a,b\n"),
+ save=True,
+ )
+ exc_doc = Document(title="report.exceptions.json")
+ exc_doc.file.save(
+ "report.exceptions.json",
+ SimpleUploadedFile("report.exceptions.json", b"{}"),
+ save=True,
+ )
+ document_ids = {package.pk, csv_doc.pk, exc_doc.pk}
+
+ validation = SPSPackageValidation.objects.create(
+ package_document=package,
+ validation_document=csv_doc,
+ exceptions_document=exc_doc,
+ zip_size_bytes=zip_size,
+ )
+ validation.delete()
+
+ self.assertFalse(SPSPackageValidation.objects.filter(pk=validation.pk).exists())
+ self.assertFalse(Document.objects.filter(pk__in=document_ids).exists())
+
+ def test_delete_package_only_removes_package_document(self):
+ package, zip_size = make_package_document()
+ validation = SPSPackageValidation.objects.create(
+ package_document=package,
+ zip_size_bytes=zip_size,
+ )
+ package_id = package.pk
+
+ validation.delete()
+
+ self.assertFalse(Document.objects.filter(pk=package_id).exists())
+
+ def test_delete_does_not_raise_recursion_error(self):
+ package, zip_size = make_package_document()
+ csv_doc = Document(title="report.validation.csv")
+ csv_doc.file.save(
+ "report.validation.csv",
+ SimpleUploadedFile("report.validation.csv", b"a,b\n"),
+ save=True,
+ )
+ exc_doc = Document(title="report.exceptions.json")
+ exc_doc.file.save(
+ "report.exceptions.json",
+ SimpleUploadedFile("report.exceptions.json", b"{}"),
+ save=True,
+ )
+ validation = SPSPackageValidation.objects.create(
+ package_document=package,
+ validation_document=csv_doc,
+ exceptions_document=exc_doc,
+ zip_size_bytes=zip_size,
+ status=SPSPackageValidationStatus.DONE,
+ )
+ validation.delete()
+
+ def test_queryset_delete_removes_linked_documents(self):
+ package, zip_size = make_package_document()
+ csv_doc = Document(title="report.validation.csv")
+ csv_doc.file.save(
+ "report.validation.csv",
+ SimpleUploadedFile("report.validation.csv", b"a,b\n"),
+ save=True,
+ )
+ validation = SPSPackageValidation.objects.create(
+ package_document=package,
+ validation_document=csv_doc,
+ zip_size_bytes=zip_size,
+ )
+ document_ids = {package.pk, csv_doc.pk}
+ validation_pk = validation.pk
+
+ SPSPackageValidation.objects.filter(pk=validation_pk).delete()
+
+ self.assertFalse(SPSPackageValidation.objects.filter(pk=validation_pk).exists())
+ self.assertFalse(Document.objects.filter(pk__in=document_ids).exists())
+
+
+def _request_for_user(user):
+ request = RequestFactory().get("/admin/xml-manager/revalidate-sps/1/")
+ request.user = user
+ request.session = {}
+ request._messages = FallbackStorage(request)
+ return request
+
+
+class SPSPackageValidationRevalidateViewTests(TestCase):
+ def setUp(self):
+ self.user = User.objects.create_user(
+ username="staff", password="secret", is_staff=True
+ )
+ self.package, self.zip_size = make_package_document()
+ self.validation = SPSPackageValidation.objects.create(
+ package_document=self.package,
+ zip_size_bytes=self.zip_size,
+ status=SPSPackageValidationStatus.DONE,
+ validated_by=self.user,
+ validated_at=timezone.now(),
+ error_message="old error",
+ )
+
+ def test_revalidate_requires_staff(self):
+ request = _request_for_user(
+ User.objects.create_user(username="regular", password="secret")
+ )
+ response = revalidate_sps_package_pk(request, pk=self.validation.pk)
+ self.assertEqual(response.status_code, 302)
+
+ def test_revalidate_returns_404_for_missing_validation(self):
+ request = _request_for_user(self.user)
+ with self.assertRaises(Http404):
+ revalidate_sps_package_pk(request, pk=99999)
+
diff --git a/xml_manager/urls.py b/xml_manager/urls.py
index 226389b..8afb209 100644
--- a/xml_manager/urls.py
+++ b/xml_manager/urls.py
@@ -4,4 +4,9 @@
urlpatterns = [
path("process//", views.process_xml_pk, name="process_xml_pk"),
-]
\ No newline at end of file
+ path(
+ "revalidate-sps//",
+ views.revalidate_sps_package_pk,
+ name="revalidate_sps_package_pk",
+ ),
+]
diff --git a/xml_manager/views.py b/xml_manager/views.py
index 8f67642..9555af6 100644
--- a/xml_manager/views.py
+++ b/xml_manager/views.py
@@ -1,9 +1,11 @@
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
-from django.shortcuts import redirect, get_object_or_404
+from django.shortcuts import get_object_or_404, redirect
+from django.urls import reverse
+from django.utils.translation import gettext_lazy as _
+
+from .models import SPSPackageValidation, SPSPackageValidationStatus, XMLDocument
-from .tasks import task_process_xml_document
-from .models import XMLDocument
@staff_member_required
@@ -12,3 +14,21 @@ def process_xml_pk(request, pk):
task_process_xml_document.delay(obj.id)
messages.success(request, f"XML {obj.id} enviado para processamento")
return redirect("/admin/snippets/xml_manager/xmldocument/")
+
+
+@staff_member_required
+def revalidate_sps_package_pk(request, pk):
+ validation = get_object_or_404(SPSPackageValidation, pk=pk)
+ validation.status = SPSPackageValidationStatus.PENDING
+ validation.validated_by = request.user
+ validation.validated_at = None
+ validation.error_message = ""
+ validation.save()
+ # TODO: here add the code to validate the package
+
+ messages.success(
+ request,
+ _("Validation started for “%(title)s”.") % {"title": validation},
+ )
+ list_url = reverse(validation.snippet_viewset.get_url_name("list"))
+ return redirect(list_url)
diff --git a/xml_manager/wagtail_hooks.py b/xml_manager/wagtail_hooks.py
index 37c20b9..75f72c6 100644
--- a/xml_manager/wagtail_hooks.py
+++ b/xml_manager/wagtail_hooks.py
@@ -1,24 +1,36 @@
import os
+from django.contrib import messages
+from django.http import HttpResponseRedirect
from django.urls import include, path, reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
from wagtail import hooks
from wagtail.admin.ui.tables import Column
+from wagtail.admin.widgets.button import Button
from wagtail.snippets.models import register_snippet
-from wagtail.snippets.views.snippets import SnippetViewSet, SnippetViewSetGroup
+from wagtail.snippets.views.snippets import (
+ CreateView,
+ EditView,
+ SnippetViewSet,
+ SnippetViewSetGroup,
+)
from config.menu import get_menu_order
from . import urls
-from .models import XMLDocument, XMLDocumentHTML, XMLDocumentPDF
+from .forms import SPSPackageValidationForm
+from .models import (
+ SPSPackageValidation,
+ SPSPackageValidationStatus,
+ XMLDocument,
+ XMLDocumentHTML,
+ XMLDocumentPDF,
+)
-class FileNameColumn(Column):
- """
- Coluna que mostra apenas o nome do arquivo de um FileField.
- """
+class FileNameColumn(Column):
def get_value(self, instance):
val = super().get_value(instance)
if not val:
@@ -27,11 +39,6 @@ def get_value(self, instance):
class LinkColumn(Column):
- """
- Column que recebe um FileField (FieldFile) e renderiza um .
- Cria '-' se não houver arquivo.
- """
-
def get_value(self, instance):
val = super().get_value(instance)
if not val:
@@ -44,6 +51,18 @@ def get_value(self, instance):
return format_html('{}', url, name)
+class WagtailDocumentLinkColumn(Column):
+ def get_value(self, instance):
+ doc = super().get_value(instance)
+ if not doc:
+ return "-"
+ try:
+ url = doc.url
+ except Exception:
+ url = doc.file.url
+ return format_html('{}', url, doc.title)
+
+
class ActionColumn(Column):
def get_value(self, instance):
url = reverse("process_xml_pk", args=[instance.pk])
@@ -52,6 +71,65 @@ def get_value(self, instance):
)
+class SPSPackageValidationCreateView(CreateView):
+ def get_form_class(self):
+ return SPSPackageValidationForm
+
+ def form_valid(self, form):
+ zip_upload = form.cleaned_data["zip_upload"]
+ document = SPSPackageValidationForm.save_wagtail_document(zip_upload)
+ validation = SPSPackageValidation(
+ package_document=document,
+ status=SPSPackageValidationStatus.PENDING,
+ zip_size_bytes=zip_upload.size,
+ validated_by=self.request.user,
+ )
+ validation.save()
+ self.object = validation
+ # TODO: here add the code to validate the package
+
+ messages.success(
+ self.request,
+ _("SPS package uploaded. Validation started for “%(title)s”.")
+ % {"title": document.title},
+ )
+ return HttpResponseRedirect(self.get_success_url())
+
+
+class SPSPackageValidationEditView(EditView):
+ def get_form_class(self):
+ return SPSPackageValidationForm
+
+ def form_valid(self, form):
+ validation = form.instance
+ zip_upload = form.cleaned_data.get("zip_upload")
+ if zip_upload:
+ validation.package_document.file.save(
+ zip_upload.name, zip_upload, save=True
+ )
+ validation.package_document.save()
+ validation.zip_size_bytes = zip_upload.size
+ if validation.validation_document:
+ validation.validation_document.delete()
+ validation.validation_document = None
+ if validation.exceptions_document:
+ validation.exceptions_document.delete()
+ validation.exceptions_document = None
+ validation.status = SPSPackageValidationStatus.PENDING
+ validation.validated_by = self.request.user
+ validation.validated_at = None
+ validation.error_message = ""
+ validation.save()
+ self.object = validation
+ # TODO: here add the code to validate the package
+
+ messages.success(
+ self.request,
+ _("Validation started for “%(title)s”.") % {"title": validation},
+ )
+ return HttpResponseRedirect(self.get_success_url())
+
+
class XMLDocumentSnippetViewSet(SnippetViewSet):
model = XMLDocument
verbose_name = _("XML Document")
@@ -113,6 +191,33 @@ class XMLDocumentHTMLSnippetViewSet(SnippetViewSet):
search_fields = ("html_file",)
+class SPSPackageValidationSnippetViewSet(SnippetViewSet):
+ model = SPSPackageValidation
+ add_view_class = SPSPackageValidationCreateView
+ edit_view_class = SPSPackageValidationEditView
+ copy_view_enabled = False
+ verbose_name = _("SPS package validation")
+ verbose_name_plural = _("Validar pacote SPS")
+ icon = "doc-full-inverse"
+ menu_name = "sps_package_validation"
+ menu_label = _("Validar pacote SPS")
+ add_to_admin_menu = False
+
+ list_display = (
+ "__str__",
+ WagtailDocumentLinkColumn("package_document", label=_("SPS package (ZIP)")),
+ "zip_size_bytes",
+ "validated_by",
+ "validated_at",
+ "status",
+ WagtailDocumentLinkColumn("validation_document", label=_("Validation file")),
+ WagtailDocumentLinkColumn("exceptions_document", label=_("Exceptions file")),
+ )
+
+ list_filter = ("status",)
+ search_fields = ("package_document__title",)
+
+
class XMLDocumentSnippetViewSetGroup(SnippetViewSetGroup):
menu_name = "xml_manager"
menu_label = _("Gestão de XML")
@@ -122,6 +227,7 @@ class XMLDocumentSnippetViewSetGroup(SnippetViewSetGroup):
XMLDocumentSnippetViewSet,
XMLDocumentPDFSnippetViewSet,
XMLDocumentHTMLSnippetViewSet,
+ SPSPackageValidationSnippetViewSet,
)
@@ -133,3 +239,15 @@ def register_admin_urls():
return [
path("xml-manager/", include(urls)),
]
+
+
+@hooks.register("register_snippet_listing_buttons")
+def sps_package_validation_listing_buttons(snippet, user, next_url=None):
+ if not isinstance(snippet, SPSPackageValidation):
+ return
+ yield Button(
+ _("Revalidar"),
+ reverse("revalidate_sps_package_pk", args=[snippet.pk]),
+ icon_name="rotate",
+ priority=25,
+ )