Files
Dispatch/Dispatch_V0.1.1/state/runtime_state.py
2026-04-29 08:18:54 +04:00

252 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# hub/ticket/state/runtime_state.py
"""Runtime-state Ticket с Qt-сигналами и единым путём сохранения."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Mapping
from PySide6.QtCore import QObject, Signal
from error_logger import log_exception
from domain import TicketConnectionStatus, TicketTaskSnapshot
from domain.task import TicketTask, _normalize_color, _normalize_datetime, _normalize_int
from .paths import TASKS_FILE
from .repository import TicketStateRepository
class TicketRuntimeState(QObject):
"""Канонический runtime-state Ticket поверх файлового репозитория."""
task_updated = Signal(object)
task_removed = Signal(int)
connection_changed = Signal(str, str)
active_view_changed = Signal(str)
state_loaded = Signal()
com_connection_changed = Signal(bool, str)
button_initialization_changed = Signal(bool, int)
def __init__(self, repository: TicketStateRepository | None = None):
super().__init__()
self._repository = repository or TicketStateRepository(TASKS_FILE)
self._tasks: dict[int, TicketTask] = {}
self._connection_status = TicketConnectionStatus.DISCONNECTED
self._active_view = "board"
self._com_connected = False
self._buttons_initialized = False
self._button_count = 0
self._sequence_counter = 0
def load(self) -> None:
"""Загрузить задачи из репозитория в память."""
self._tasks = {task.task_id: task for task in self._repository.load_tasks()}
self._sequence_counter = max(
(t.sequence_number for t in self._tasks.values()), default=0,
)
self.state_loaded.emit()
for task in self.list_tasks():
self.task_updated.emit(task)
def list_tasks(self) -> list[TicketTaskSnapshot]:
"""Вернуть снимки всех активных и архивных задач."""
return [task.to_snapshot() for task in self._tasks.values()]
def list_active_tasks(self) -> list[TicketTaskSnapshot]:
"""Вернуть снимки только неархивных задач."""
return [task.to_snapshot() for task in self._tasks.values() if task.state_code != 5]
def list_archived_tasks(self) -> list[TicketTaskSnapshot]:
"""Вернуть снимки архивных задач."""
return [task.to_snapshot() for task in self._tasks.values() if task.state_code == 5]
def get_task(self, task_id: int) -> TicketTaskSnapshot | None:
"""Вернуть снимок задачи по идентификатору."""
task = self._tasks.get(task_id)
return task.to_snapshot() if task else None
def next_sequence_number(self) -> int:
"""Вернуть следующий сквозной номер задачи."""
self._sequence_counter += 1
return self._sequence_counter
def upsert_task(self, task: TicketTaskSnapshot) -> None:
"""Создать или обновить задачу по готовому snapshot."""
self._tasks[task.task_id] = TicketTask.from_snapshot(task)
self._save_and_emit(task.task_id)
def update_task(self, task_data: Mapping[str, Any]) -> TicketTaskSnapshot | None:
"""Обновить задачу по legacy-совместимому словарю полей."""
raw_task_id = task_data.get("button_id")
if raw_task_id is None:
return None
try:
task_id = int(raw_task_id)
except (TypeError, ValueError):
return None
task = self._tasks.get(task_id)
if task is None:
created_task = TicketTask.from_record(task_data)
if created_task is None:
return None
task = created_task
task.created_at = task.created_at or datetime.now()
self._tasks[task_id] = task
else:
self._apply_update(task, task_data)
self._save_and_emit(task_id)
return self.get_task(task_id)
def remove_task(self, task_id: int) -> None:
"""Удалить задачу из runtime-state и хранилища."""
if task_id not in self._tasks:
return
del self._tasks[task_id]
self._persist()
self.task_removed.emit(task_id)
def can_advance_to_confirmation(self, task_id: int) -> bool:
"""Проверить, что обязательные отчёты подписаны."""
task = self._tasks.get(task_id)
if task is None:
return False
return task.diagnostic_report_signed and task.repair_report_signed
def sign_report(self, task_id: int, report_type: str) -> None:
"""Отметить подписание диагностического или ремонтного отчёта."""
task = self._tasks.get(task_id)
if task is None:
return
if report_type == "diagnostic":
task.diagnostic_report_signed = True
elif report_type == "repair":
task.repair_report_signed = True
else:
return
self._save_and_emit(task_id)
def set_error(self, message: str) -> None:
"""Перевести статус подключения в ошибку и уведомить подписчиков."""
self._connection_status = TicketConnectionStatus.ERROR
self.connection_changed.emit(self._connection_status.value, message)
@property
def connection_status(self) -> TicketConnectionStatus:
return self._connection_status
@connection_status.setter
def connection_status(self, value: TicketConnectionStatus) -> None:
if self._connection_status == value:
return
self._connection_status = value
self.connection_changed.emit(value.value, "")
@property
def active_view(self) -> str:
return self._active_view
@active_view.setter
def active_view(self, value: str) -> None:
if self._active_view == value:
return
self._active_view = value
self.active_view_changed.emit(value)
@property
def com_connected(self) -> bool:
return self._com_connected
@com_connected.setter
def com_connected(self, value: bool) -> None:
if self._com_connected == value:
return
self._com_connected = value
self.com_connection_changed.emit(value, "")
@property
def buttons_initialized(self) -> bool:
return self._buttons_initialized
@buttons_initialized.setter
def buttons_initialized(self, value: bool) -> None:
if self._buttons_initialized == value:
return
self._buttons_initialized = value
self.button_initialization_changed.emit(value, self._button_count)
def set_button_initialization(self, is_initialized: bool, button_count: int) -> None:
"""Единая точка обновления статуса инициализации кнопок."""
self._buttons_initialized = is_initialized
self._button_count = button_count
self.button_initialization_changed.emit(is_initialized, button_count)
def set_com_connection(self, is_connected: bool, message: str) -> None:
"""Единая точка обновления статуса COM-подключения."""
self._com_connected = is_connected
self.com_connection_changed.emit(is_connected, message)
def _apply_update(self, task: TicketTask, task_data: Mapping[str, Any]) -> None:
"""Применить обновление к уже существующей задаче."""
new_state = task_data.get("state")
if new_state is not None:
try:
normalized_state = int(new_state)
except (TypeError, ValueError):
normalized_state = task.state_code
else:
normalized_state = task.state_code
if normalized_state == 4 and task.state_code != 4:
task.refused_from_state = task.state_code
elif normalized_state != 4 and "location" in task_data:
task.location = str(task_data.get("location", task.location))
task.state_code = normalized_state
if "action" in task_data:
task.action_text = str(task_data.get("action", task.action_text))
if "state_name" in task_data:
task.state_name = str(task_data.get("state_name", task.state_name))
if "color" in task_data:
task.color_hex = _normalize_color(task_data.get("color"))
if "created_time" in task_data:
task.created_at = _normalize_datetime(task_data.get("created_time")) or task.created_at
if "completed_time" in task_data:
task.completed_at = _normalize_datetime(task_data.get("completed_time"))
if "refused_from_state" in task_data:
task.refused_from_state = _normalize_int(
task_data.get("refused_from_state"),
task.refused_from_state,
)
if "refusal_reason" in task_data and task_data.get("refusal_reason") is not None:
task.refusal_reason = str(task_data.get("refusal_reason", "")).strip()
if "assigned_specialist" in task_data and task_data.get("assigned_specialist") is not None:
task.assigned_specialist = str(task_data.get("assigned_specialist", ""))
if "specialist_photo" in task_data and task_data.get("specialist_photo") is not None:
task.specialist_photo = str(task_data.get("specialist_photo", ""))
if "diagnostic_report_signed" in task_data and task_data.get("diagnostic_report_signed") is not None:
task.diagnostic_report_signed = bool(task_data.get("diagnostic_report_signed"))
if "repair_report_signed" in task_data and task_data.get("repair_report_signed") is not None:
task.repair_report_signed = bool(task_data.get("repair_report_signed"))
if "acceptance_report_signed" in task_data and task_data.get("acceptance_report_signed") is not None:
task.acceptance_report_signed = bool(task_data.get("acceptance_report_signed"))
if normalized_state == 0 and task.completed_at is None:
task.completed_at = datetime.now()
def _save_and_emit(self, task_id: int) -> None:
"""Сохранить состояние и уведомить подписчиков о задаче."""
self._persist()
snapshot = self.get_task(task_id)
if snapshot is not None:
self.task_updated.emit(snapshot)
def _persist(self) -> None:
"""Сохранить весь state в репозиторий."""
try:
self._repository.save_tasks(list(self._tasks.values()))
except Exception as exc:
log_exception(__name__, "TicketRuntimeState._persist", exc)