Files
Dispatch/Dispatch_V0.1.1/application/archive_service.py
2026-04-29 08:18:54 +04:00

81 lines
3.2 KiB
Python

# -*- coding: utf-8 -*-
# hub/ticket/application/archive_service.py
"""Application-сервис архивации задач Ticket."""
from __future__ import annotations
from error_logger import log_exception
from domain import ArchiveRecordSnapshot, TicketStateService
from domain.task import TicketTask
from state import ArchiveRecordRepository, TicketStateApi
from .document_flow_service import DocumentFlowService
class ArchiveService:
"""Команда архивации поверх доменного сервиса и state API."""
def __init__(
self,
state: TicketStateApi,
state_service: TicketStateService,
archive_repository: ArchiveRecordRepository | None = None,
document_service: DocumentFlowService | None = None,
):
self._state = state
self._state_service = state_service
self._archive_repository = archive_repository or ArchiveRecordRepository()
self._document_service = document_service
def archive_task(self, task_id: int):
"""Перевести задачу в архив и сохранить архивную запись."""
snapshot = self._state.get_task(task_id)
if snapshot is None:
return None
self.ensure_archive_record(snapshot)
result = self._state_service.move_task_to_archive(
task_id,
TicketTask.from_snapshot(snapshot),
)
if result.task is None:
return None
archived_snapshot = result.task.to_snapshot()
self._state.upsert_task(archived_snapshot)
return archived_snapshot
def list_archive_records(self) -> list[ArchiveRecordSnapshot]:
"""Вернуть все архивные записи из файлового хранилища."""
return self._archive_repository.list_records()
def ensure_archive_record(self, snapshot) -> None:
"""Сохранить архивную запись, если её ещё нет для данного цикла задачи."""
try:
cycle_token = self._build_cycle_token(snapshot)
if self._archive_repository.has_record(snapshot.task_id, cycle_token):
return
documents = self._collect_cycle_documents(snapshot)
self._archive_repository.save_record(snapshot, documents)
except Exception as exc:
log_exception(__name__, "ArchiveService.ensure_archive_record", exc)
def _collect_cycle_documents(self, snapshot) -> list:
"""Вернуть документы только текущего цикла задачи."""
if self._document_service is None:
return []
cycle_token = self._build_cycle_token(snapshot)
all_docs = self._document_service.list_documents()
return [
d for d in all_docs
if d.task_id == snapshot.task_id and cycle_token in d.document_id
]
@staticmethod
def _build_cycle_token(snapshot) -> str:
from datetime import datetime as _dt
if snapshot.created_at is not None:
return snapshot.created_at.strftime("%Y%m%d_%H%M%S")
return _dt.now().strftime("%Y%m%d_%H%M%S")