Files
Dispatch/Dispatch_V0.1.1/services/service_manager.py
2026-04-29 08:18:54 +04:00

178 lines
6.5 KiB
Python

# -*- coding: utf-8 -*-
# hub/ticket/services/service_manager.py
"""Единый hardware gateway Ticket поверх serial/mock transport-сервисов."""
from __future__ import annotations
from PySide6.QtCore import QObject, QTimer, Signal
from domain import TicketHardwareStatus
from .base_service import BaseService
from .hardware_gateway import TicketHardwareObserver
from .mock_service import MockService
from .serial_service import SERIAL_BAUDRATE, SERIAL_PORT, SerialService, probe_serial_port
# Default period, in seconds, of the serial-port probe timer. ServiceManager
# multiplies it by 1000 to obtain the QTimer interval in milliseconds.
PROBE_INTERVAL_SEC = 3
class ServiceManager(QObject):
    """Canonical Ticket hardware gateway.

    Owns at most one active transport service (``SerialService`` or
    ``MockService``), re-emits that service's signals as its own, mirrors
    status, actions and errors to an optional observer, and caches button
    states so they can be replayed onto every newly activated service.
    """

    # (service_name, is_connected): emitted after a service has been activated.
    service_changed = Signal(str, bool)
    # The signals below are re-emitted from the currently connected service.
    action_triggered = Signal(object)
    error_occurred = Signal(str)
    com_status_changed = Signal(bool, str)
    buttons_initialized = Signal(bool, int)
    port_disconnected = Signal()

    def __init__(
        self,
        serial_port: str = SERIAL_PORT,
        serial_baudrate: int = SERIAL_BAUDRATE,
        probe_interval_sec: int = PROBE_INTERVAL_SEC,
        parent: QObject | None = None,
    ):
        """Create the gateway without starting any service.

        Args:
            serial_port: Port name passed to the probe and to ``SerialService``.
            serial_baudrate: Baud rate passed to ``SerialService``.
            probe_interval_sec: Period of the port probe timer, in seconds.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self._serial_port = serial_port
        self._serial_baudrate = serial_baudrate
        self._observer: TicketHardwareObserver | None = None
        self._current_service: BaseService | None = None
        self._current_service_name = "none"
        # button_id -> state_code; replayed onto each newly activated service.
        self._button_states: dict[int, int] = {}
        self._status = TicketHardwareStatus()
        self._probe_timer = QTimer(self)
        # int() keeps the millisecond interval integral even when a fractional
        # number of seconds is supplied.
        self._probe_timer.setInterval(int(probe_interval_sec * 1000))
        self._probe_timer.timeout.connect(self._on_probe_timer)

    def start(self) -> None:
        """Activate a service unless one is already active.

        The serial service is chosen when the port probe succeeds, the mock
        service otherwise.
        """
        if self._current_service is not None:
            return
        if probe_serial_port(self._serial_port):
            self._start_serial_service()
        else:
            self._start_mock_service()

    def stop(self) -> None:
        """Stop the probe timer and shut down the active service, if any."""
        self._probe_timer.stop()
        self._shutdown_current_service()

    def get_status(self) -> TicketHardwareStatus:
        """Return the most recently cached hardware status."""
        return self._status

    def set_observer(self, observer: TicketHardwareObserver | None) -> None:
        """Install (or clear, with ``None``) the observer.

        A newly installed observer immediately receives the cached status.
        """
        self._observer = observer
        if observer is not None:
            observer.on_gateway_status(self._status)

    def set_button_state(self, button_id: int, state_code: int) -> None:
        """Cache a button state and forward it to the active service.

        Both values are normalized with ``int()`` once, so the live forward
        and the later replay in ``_activate_service`` hand the service
        exactly the same values.
        """
        normalized_id = int(button_id)
        normalized_state = int(state_code)
        self._button_states[normalized_id] = normalized_state
        if self._current_service is not None:
            self._current_service.set_button_state(normalized_id, normalized_state)

    def remove_button_state(self, button_id: int) -> None:
        """Drop a cached button state and forward the removal to the service.

        The id is normalized with ``int()`` so the cache key and the value
        forwarded to the service agree. Unknown ids are ignored by the cache.
        """
        normalized_id = int(button_id)
        self._button_states.pop(normalized_id, None)
        if self._current_service is not None:
            self._current_service.remove_button_state(normalized_id)

    def reset_button_states(self) -> None:
        """Clear every cached button state and reset the active service."""
        self._button_states.clear()
        if self._current_service is not None:
            self._current_service.reset_button_states()

    def _start_serial_service(self) -> None:
        """Try to activate the serial service; fall back to mock on failure."""
        service = SerialService(
            port=self._serial_port,
            baudrate=self._serial_baudrate,
            parent=self,
        )
        if not self._activate_service(service, "serial", True):
            self._start_mock_service()
            return
        # Probing is only needed while the mock service is active.
        self._probe_timer.stop()

    def _start_mock_service(self) -> None:
        """Activate the mock service and start the port probe timer."""
        service = MockService(parent=self)
        self._activate_service(service, "mock", False)
        self._probe_timer.start()

    def _activate_service(
        self,
        service: BaseService,
        service_name: str,
        is_connected: bool,
    ) -> bool:
        """Replace the active service with ``service``.

        The previous service is shut down first. The new one then gets its
        signals connected and the cached button states replayed, and is
        started.

        Args:
            service: Service instance to activate.
            service_name: Name recorded for the service ("serial" or "mock").
            is_connected: Value emitted with ``service_changed``.

        Returns:
            ``False`` when a "serial" service is not running after
            ``start()``; the service is then scheduled for deletion and no
            service is left active. ``True`` otherwise.
        """
        self._shutdown_current_service()
        self._connect_service_signals(service)
        for button_id, state_code in self._button_states.items():
            service.set_button_state(button_id, state_code)
        service.start()
        if not service.is_running() and service_name == "serial":
            service.deleteLater()
            return False
        self._current_service = service
        self._current_service_name = service_name
        self._refresh_status()
        self.service_changed.emit(service_name, is_connected)
        return True

    def _connect_service_signals(self, service: BaseService) -> None:
        """Route every signal of ``service`` to the matching handler."""
        service.action_triggered.connect(self._on_action_triggered)
        service.error_occurred.connect(self._on_error_occurred)
        service.com_status_changed.connect(self._on_com_status_changed)
        service.buttons_initialized.connect(self._on_buttons_initialized)
        service.port_disconnected.connect(self._on_port_disconnected)

    def _shutdown_current_service(self) -> None:
        """Stop and discard the active service, then publish a default status.

        Does nothing when no service is active.
        """
        if self._current_service is None:
            return
        current_service = self._current_service
        # Clear the references before stop() so any handler that runs while
        # stop() executes already sees no active service.
        self._current_service = None
        self._current_service_name = "none"
        current_service.stop()
        current_service.deleteLater()
        self._status = TicketHardwareStatus()
        self._notify_status_observer()

    def _refresh_status(self) -> None:
        """Re-read the status from the active service and notify the observer.

        A default ``TicketHardwareStatus`` is used when no service is active.
        """
        if self._current_service is None:
            self._status = TicketHardwareStatus()
        else:
            self._status = self._current_service.get_status()
        self._notify_status_observer()

    def _notify_status_observer(self) -> None:
        """Push the cached status to the observer, if one is installed."""
        if self._observer is not None:
            self._observer.on_gateway_status(self._status)

    def _on_action_triggered(self, raw_action: object) -> None:
        """Re-emit a service action; pass it to the observer only if a dict."""
        self.action_triggered.emit(raw_action)
        if self._observer is not None and isinstance(raw_action, dict):
            self._observer.on_task_action(raw_action)

    def _on_error_occurred(self, message: str) -> None:
        """Re-emit a service error and pass it to the observer."""
        self.error_occurred.emit(message)
        if self._observer is not None:
            self._observer.on_gateway_error(message)

    def _on_com_status_changed(self, is_connected: bool, message: str) -> None:
        """Refresh the cached status, then re-emit the COM status change."""
        self._refresh_status()
        self.com_status_changed.emit(is_connected, message)

    def _on_buttons_initialized(self, is_initialized: bool, button_count: int) -> None:
        """Refresh the cached status, then re-emit the initialization result."""
        self._refresh_status()
        self.buttons_initialized.emit(is_initialized, button_count)

    def _on_port_disconnected(self) -> None:
        """Re-emit the disconnect and fall back to mock if serial was active."""
        self.port_disconnected.emit()
        if self._current_service_name == "serial":
            self._start_mock_service()

    def _on_probe_timer(self) -> None:
        """Switch services when port availability disagrees with the active one."""
        port_available = probe_serial_port(self._serial_port)
        if port_available and self._current_service_name == "mock":
            self._start_serial_service()
        elif not port_available and self._current_service_name == "serial":
            self._start_mock_service()