Files
Dispatch/Dispatch_V0.1.1/state/document_repository.py
2026-04-29 08:18:54 +04:00

188 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# hub/ticket/state/document_repository.py
"""Файловый репозиторий документов Ticket."""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Mapping
from error_logger import log_exception
from domain import TicketDocumentSnapshot, TicketTaskSnapshot
from .paths import ACTS_DIR, REPORTS_DIR, ensure_storage_directories
# Concrete document types that the generic "report" filter expands to in
# TicketDocumentRepository.list_documents; these types are looked up in the
# reports directory only.
REPORT_DOCUMENT_TYPES = {"diagnostic", "repair"}
class TicketDocumentRepository:
    """Canonical file-based storage for Ticket reports and acts.

    Every document is one UTF-8 JSON file named ``<document_id>.json``.
    Documents of type ``"acceptance"`` are written to the acts directory;
    every other type is written to the reports directory.
    """

    def __init__(
        self,
        reports_dir: Path = REPORTS_DIR,
        acts_dir: Path = ACTS_DIR,
    ):
        """Remember the storage directories and make sure they exist.

        Args:
            reports_dir: Directory for all non-acceptance documents.
            acts_dir: Directory for ``"acceptance"`` documents.
        """
        self._reports_dir = reports_dir
        self._acts_dir = acts_dir
        # Project-level storage setup (presumably the default directory
        # tree - confirm in .paths); the directories actually passed in are
        # created explicitly below because they may differ from the defaults.
        ensure_storage_directories()
        self._reports_dir.mkdir(parents=True, exist_ok=True)
        self._acts_dir.mkdir(parents=True, exist_ok=True)

    def save_document(
        self,
        task: TicketTaskSnapshot,
        document_type: str,
        title: str,
        summary: str,
        content: str,
        payload: Mapping[str, str],
    ) -> TicketDocumentSnapshot | None:
        """Persist a document as JSON and return its snapshot.

        The document id is built from the task id, the task creation time
        and the document type, so saving the same type again for the same
        task replaces the earlier file. To keep that earlier file intact
        when something goes wrong, the JSON text is rendered completely
        first, written to a temporary sibling file and only then moved over
        the final path.

        Args:
            task: Task the document belongs to; supplies id, location and
                assigned specialist.
            document_type: Type tag; ``"acceptance"`` selects the acts
                directory, anything else the reports directory.
            title: Document title.
            summary: Short summary text.
            content: Full document text.
            payload: Extra key/value data; copied into the snapshot.

        Returns:
            The stored snapshot, or ``None`` when rendering or writing the
            file failed (the error is reported through ``log_exception``).
        """
        created_at = datetime.now()
        document_id = self._build_document_id(task, document_type)
        storage_path = self._directory_for_type(document_type) / f"{document_id}.json"
        snapshot = TicketDocumentSnapshot(
            document_id=document_id,
            task_id=task.task_id,
            document_type=document_type,
            title=title,
            created_at=created_at,
            location=task.location,
            specialist_name=task.assigned_specialist,
            summary=summary,
            content=content,
            storage_path=str(storage_path),
            payload=dict(payload),
        )
        payload_data = self._serialize_snapshot(snapshot)
        # "*.json.tmp" is not matched by the "*.json" glob used for listing,
        # so a leftover temporary file is never mistaken for a document.
        temp_path = storage_path.with_name(f"{storage_path.name}.tmp")
        try:
            # Render before touching the disk: a non-serialisable value must
            # fail here, not after the target file has been truncated.
            serialized = json.dumps(payload_data, indent=2, ensure_ascii=False)
            with open(temp_path, "w", encoding="utf-8") as handle:
                handle.write(serialized)
            # Path.replace overwrites an existing target in a single rename.
            temp_path.replace(storage_path)
        except Exception as exc:
            # Deliberately broad: saving is best-effort and reports failure
            # through the return value instead of raising to the caller.
            log_exception(__name__, "TicketDocumentRepository.save_document", exc)
            self._discard_file(temp_path)
            return None
        return snapshot

    def list_documents(
        self,
        document_type: str | None = None,
    ) -> list[TicketDocumentSnapshot]:
        """Return stored documents, newest first, optionally filtered by type.

        Args:
            document_type: ``None`` returns every document; ``"report"``
                returns documents whose type is in ``REPORT_DOCUMENT_TYPES``;
                any other value returns documents of exactly that type.

        Returns:
            Snapshots sorted by ``created_at`` in descending order. Files
            that cannot be read or fail validation are skipped.
        """
        snapshots: list[TicketDocumentSnapshot] = []
        for directory in self._directories_for_filter(document_type):
            for path in directory.glob("*.json"):
                snapshot = self._load_snapshot(path)
                if snapshot is None:
                    continue
                if self._matches_filter(snapshot.document_type, document_type):
                    snapshots.append(snapshot)
        snapshots.sort(key=self._sort_key, reverse=True)
        return snapshots

    @staticmethod
    def _matches_filter(actual_type: str, requested_type: str | None) -> bool:
        """Tell whether a document of *actual_type* passes the requested filter."""
        if requested_type is None:
            return True
        if requested_type == "report":
            return actual_type in REPORT_DOCUMENT_TYPES
        return actual_type == requested_type

    @staticmethod
    def _sort_key(snapshot: TicketDocumentSnapshot) -> datetime:
        """Return a naive timestamp usable for ordering any stored snapshot.

        ``save_document`` writes naive local timestamps, but
        ``datetime.fromisoformat`` also accepts values carrying a UTC offset.
        Comparing naive with aware datetimes raises ``TypeError``, so aware
        values are converted to naive local time before comparison.
        """
        created_at = snapshot.created_at
        if created_at.utcoffset() is None:
            return created_at
        try:
            return created_at.astimezone().replace(tzinfo=None)
        except (OverflowError, OSError, ValueError):
            # Out-of-range value for the platform's local-time conversion:
            # fall back to the wall-clock reading as written in the file.
            return created_at.replace(tzinfo=None)

    @staticmethod
    def _discard_file(path: Path) -> None:
        """Remove *path* if present; used to clean up after a failed save."""
        try:
            path.unlink()
        except OSError:
            # Nothing to remove, or removal is not possible right now. The
            # save failure itself has already been logged by the caller.
            pass

    def _load_snapshot(self, storage_path: Path) -> TicketDocumentSnapshot | None:
        """Read one JSON file and convert it into a snapshot.

        Returns:
            The snapshot, or ``None`` when the file cannot be read or parsed
            (logged through ``log_exception``), when its top-level JSON value
            is not an object, or when required fields are missing or invalid.
        """
        try:
            with open(storage_path, "r", encoding="utf-8") as handle:
                payload = json.load(handle)
        except Exception as exc:
            # Deliberately broad: one unreadable file must not break listing.
            log_exception(__name__, "TicketDocumentRepository._load_snapshot", exc)
            return None
        if not isinstance(payload, dict):
            return None
        return self._deserialize_snapshot(payload, storage_path)

    def _deserialize_snapshot(
        self,
        payload: Mapping[str, Any],
        storage_path: Path,
    ) -> TicketDocumentSnapshot | None:
        """Validate a decoded JSON object and build a snapshot from it.

        Required fields: ``created_at`` (ISO 8601 string), ``document_id``,
        ``document_type`` and ``title`` (strings) and ``task_id`` (anything
        ``int()`` accepts). The remaining text fields default to ``""``.

        Args:
            payload: Decoded JSON object of one document file.
            storage_path: Path the object was read from; it is stored in the
                snapshot instead of the ``storage_path`` value in the file.

        Returns:
            The snapshot, or ``None`` when a required field is missing or
            has the wrong type or format.
        """
        raw_created_at = payload.get("created_at")
        if not isinstance(raw_created_at, str):
            return None
        try:
            created_at = datetime.fromisoformat(raw_created_at)
        except ValueError:
            return None

        raw_document_id = payload.get("document_id")
        raw_document_type = payload.get("document_type")
        raw_title = payload.get("title")
        required_texts = (raw_document_id, raw_document_type, raw_title)
        if not all(isinstance(value, str) for value in required_texts):
            return None

        try:
            task_id = int(payload.get("task_id"))
        except (TypeError, ValueError):
            return None

        raw_payload = payload.get("payload")
        snapshot_payload: dict[str, str] = {}
        if isinstance(raw_payload, dict):
            # Entries with a null value are dropped; the rest is coerced to
            # text so the result matches the declared dict[str, str] shape.
            snapshot_payload = {
                str(key): str(value)
                for key, value in raw_payload.items()
                if value is not None
            }

        return TicketDocumentSnapshot(
            document_id=raw_document_id,
            task_id=task_id,
            document_type=raw_document_type,
            title=raw_title,
            created_at=created_at,
            location=self._text_field(payload, "location"),
            specialist_name=self._text_field(payload, "specialist_name"),
            summary=self._text_field(payload, "summary"),
            content=self._text_field(payload, "content"),
            storage_path=str(storage_path),
            payload=snapshot_payload,
        )

    @staticmethod
    def _text_field(payload: Mapping[str, Any], key: str) -> str:
        """Return ``payload[key]`` as text, mapping a missing or null value to ``""``.

        A plain ``str(payload.get(key, ""))`` would turn a stored JSON
        ``null`` into the literal string ``"None"``.
        """
        value = payload.get(key)
        return "" if value is None else str(value)

    def _serialize_snapshot(self, snapshot: TicketDocumentSnapshot) -> dict[str, Any]:
        """Convert a snapshot into the dictionary that is written as JSON.

        ``created_at`` is stored as an ISO 8601 string and ``payload`` is
        copied into a plain ``dict``; all other fields are stored unchanged.
        """
        return {
            "document_id": snapshot.document_id,
            "task_id": snapshot.task_id,
            "document_type": snapshot.document_type,
            "title": snapshot.title,
            "created_at": snapshot.created_at.isoformat(),
            "location": snapshot.location,
            "specialist_name": snapshot.specialist_name,
            "summary": snapshot.summary,
            "content": snapshot.content,
            "storage_path": snapshot.storage_path,
            "payload": dict(snapshot.payload),
        }

    def _directories_for_filter(self, document_type: str | None) -> Iterable[Path]:
        """Return the directories that have to be scanned for a type filter.

        ``"acceptance"`` needs only the acts directory; ``"report"`` and the
        types in ``REPORT_DOCUMENT_TYPES`` need only the reports directory;
        ``None`` and any other type scan both directories.
        """
        if document_type == "acceptance":
            candidates = (self._acts_dir,)
        elif document_type == "report" or document_type in REPORT_DOCUMENT_TYPES:
            candidates = (self._reports_dir,)
        else:
            candidates = (self._reports_dir, self._acts_dir)
        # Both roots may be configured to the same path; scanning it twice
        # would list every document twice. dict.fromkeys keeps the order.
        return tuple(dict.fromkeys(candidates))

    def _directory_for_type(self, document_type: str) -> Path:
        """Return the directory a document of *document_type* is written to."""
        if document_type == "acceptance":
            return self._acts_dir
        return self._reports_dir

    @staticmethod
    def _build_document_id(task: TicketTaskSnapshot, document_type: str) -> str:
        """Build the id ``task_<task_id>_<YYYYmmdd_HHMMSS>_<document_type>``.

        The timestamp part is the task's ``created_at``; the current time is
        used only when the task has no creation time.
        """
        cycle_moment = task.created_at if task.created_at is not None else datetime.now()
        cycle_token = cycle_moment.strftime("%Y%m%d_%H%M%S")
        return f"task_{task.task_id}_{cycle_token}_{document_type}"