# -*- coding: utf-8 -*- # hub/ticket/application/archive_service.py """Application-сервис архивации задач Ticket.""" from __future__ import annotations from error_logger import log_exception from domain import ArchiveRecordSnapshot, TicketStateService from domain.task import TicketTask from state import ArchiveRecordRepository, TicketStateApi from .document_flow_service import DocumentFlowService class ArchiveService: """Команда архивации поверх доменного сервиса и state API.""" def __init__( self, state: TicketStateApi, state_service: TicketStateService, archive_repository: ArchiveRecordRepository | None = None, document_service: DocumentFlowService | None = None, ): self._state = state self._state_service = state_service self._archive_repository = archive_repository or ArchiveRecordRepository() self._document_service = document_service def archive_task(self, task_id: int): """Перевести задачу в архив и сохранить архивную запись.""" snapshot = self._state.get_task(task_id) if snapshot is None: return None self.ensure_archive_record(snapshot) result = self._state_service.move_task_to_archive( task_id, TicketTask.from_snapshot(snapshot), ) if result.task is None: return None archived_snapshot = result.task.to_snapshot() self._state.upsert_task(archived_snapshot) return archived_snapshot def list_archive_records(self) -> list[ArchiveRecordSnapshot]: """Вернуть все архивные записи из файлового хранилища.""" return self._archive_repository.list_records() def ensure_archive_record(self, snapshot) -> None: """Сохранить архивную запись, если её ещё нет для данного цикла задачи.""" try: cycle_token = self._build_cycle_token(snapshot) if self._archive_repository.has_record(snapshot.task_id, cycle_token): return documents = self._collect_cycle_documents(snapshot) self._archive_repository.save_record(snapshot, documents) except Exception as exc: log_exception(__name__, "ArchiveService.ensure_archive_record", exc) def _collect_cycle_documents(self, snapshot) -> list: """Вернуть документы только текущего цикла задачи.""" if self._document_service is None: return [] cycle_token = self._build_cycle_token(snapshot) all_docs = self._document_service.list_documents() return [ d for d in all_docs if d.task_id == snapshot.task_id and cycle_token in d.document_id ] @staticmethod def _build_cycle_token(snapshot) -> str: from datetime import datetime as _dt if snapshot.created_at is not None: return snapshot.created_at.strftime("%Y%m%d_%H%M%S") return _dt.now().strftime("%Y%m%d_%H%M%S")