# -*- coding: utf-8 -*- # hub/ticket/state/document_repository.py """Файловый репозиторий документов Ticket.""" from __future__ import annotations import json from datetime import datetime from pathlib import Path from typing import Any, Iterable, Mapping from error_logger import log_exception from domain import TicketDocumentSnapshot, TicketTaskSnapshot from .paths import ACTS_DIR, REPORTS_DIR, ensure_storage_directories REPORT_DOCUMENT_TYPES = {"diagnostic", "repair"} class TicketDocumentRepository: """Каноническое файловое хранилище отчётов и актов Ticket.""" def __init__( self, reports_dir: Path = REPORTS_DIR, acts_dir: Path = ACTS_DIR, ): self._reports_dir = reports_dir self._acts_dir = acts_dir ensure_storage_directories() self._reports_dir.mkdir(parents=True, exist_ok=True) self._acts_dir.mkdir(parents=True, exist_ok=True) def save_document( self, task: TicketTaskSnapshot, document_type: str, title: str, summary: str, content: str, payload: Mapping[str, str], ) -> TicketDocumentSnapshot | None: """Сохранить документ в каноническое JSON-хранилище.""" created_at = datetime.now() document_id = self._build_document_id(task, document_type) storage_path = self._directory_for_type(document_type) / f"{document_id}.json" snapshot = TicketDocumentSnapshot( document_id=document_id, task_id=task.task_id, document_type=document_type, title=title, created_at=created_at, location=task.location, specialist_name=task.assigned_specialist, summary=summary, content=content, storage_path=str(storage_path), payload=dict(payload), ) payload_data = self._serialize_snapshot(snapshot) try: with open(storage_path, "w", encoding="utf-8") as handle: json.dump( payload_data, handle, indent=2, ensure_ascii=False, ) except Exception as exc: log_exception(__name__, "TicketDocumentRepository.save_document", exc) return None return snapshot def list_documents( self, document_type: str | None = None, ) -> list[TicketDocumentSnapshot]: """Вернуть список документов с optional-фильтром по типу.""" snapshots: list[TicketDocumentSnapshot] = [] for directory in self._directories_for_filter(document_type): for path in directory.glob("*.json"): snapshot = self._load_snapshot(path) if snapshot is None: continue if document_type == "report" and snapshot.document_type not in REPORT_DOCUMENT_TYPES: continue if document_type not in {None, "report"} and snapshot.document_type != document_type: continue snapshots.append(snapshot) snapshots.sort(key=lambda item: item.created_at, reverse=True) return snapshots def _load_snapshot(self, storage_path: Path) -> TicketDocumentSnapshot | None: try: with open(storage_path, "r", encoding="utf-8") as handle: payload = json.load(handle) except Exception as exc: log_exception(__name__, "TicketDocumentRepository._load_snapshot", exc) return None if not isinstance(payload, dict): return None return self._deserialize_snapshot(payload, storage_path) def _deserialize_snapshot( self, payload: Mapping[str, Any], storage_path: Path, ) -> TicketDocumentSnapshot | None: raw_created_at = payload.get("created_at") if not isinstance(raw_created_at, str): return None try: created_at = datetime.fromisoformat(raw_created_at) except ValueError: return None raw_document_id = payload.get("document_id") raw_task_id = payload.get("task_id") raw_document_type = payload.get("document_type") raw_title = payload.get("title") if not all( isinstance(value, str) for value in (raw_document_id, raw_document_type, raw_title) ): return None try: task_id = int(raw_task_id) except (TypeError, ValueError): return None raw_payload = payload.get("payload") snapshot_payload: dict[str, str] = {} if isinstance(raw_payload, dict): snapshot_payload = { str(key): str(value) for key, value in raw_payload.items() if value is not None } return TicketDocumentSnapshot( document_id=raw_document_id, task_id=task_id, document_type=raw_document_type, title=raw_title, created_at=created_at, location=str(payload.get("location", "")), specialist_name=str(payload.get("specialist_name", "")), summary=str(payload.get("summary", "")), content=str(payload.get("content", "")), storage_path=str(storage_path), payload=snapshot_payload, ) def _serialize_snapshot(self, snapshot: TicketDocumentSnapshot) -> dict[str, Any]: return { "document_id": snapshot.document_id, "task_id": snapshot.task_id, "document_type": snapshot.document_type, "title": snapshot.title, "created_at": snapshot.created_at.isoformat(), "location": snapshot.location, "specialist_name": snapshot.specialist_name, "summary": snapshot.summary, "content": snapshot.content, "storage_path": snapshot.storage_path, "payload": dict(snapshot.payload), } def _directories_for_filter(self, document_type: str | None) -> Iterable[Path]: if document_type == "acceptance": return (self._acts_dir,) if document_type in REPORT_DOCUMENT_TYPES or document_type in {None, "report"}: return (self._reports_dir, self._acts_dir) if document_type is None else (self._reports_dir,) return (self._reports_dir, self._acts_dir) def _directory_for_type(self, document_type: str) -> Path: if document_type == "acceptance": return self._acts_dir return self._reports_dir @staticmethod def _build_document_id(task: TicketTaskSnapshot, document_type: str) -> str: cycle_token = ( task.created_at.strftime("%Y%m%d_%H%M%S") if task.created_at is not None else datetime.now().strftime("%Y%m%d_%H%M%S") ) return f"task_{task.task_id}_{cycle_token}_{document_type}"