317 lines
12 KiB
Python
317 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
||
# hub/ticket/services/serial_service.py
|
||
|
||
"""Serial transport-сервис Ticket с подтверждением состояний кнопок."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import threading
|
||
import time
|
||
from typing import Any
|
||
|
||
from PySide6.QtCore import QObject
|
||
|
||
from error_logger import log_exception
|
||
|
||
from domain import TicketConnectionStatus
|
||
from domain.ticket_constants import HARDWARE_SIGNAL_INITIALIZE
|
||
from .base_service import BaseService
|
||
|
||
try:
|
||
import serial
|
||
from serial.tools import list_ports
|
||
except ImportError:
|
||
serial = None
|
||
list_ports = None
|
||
# Default COM port name; used as the default `port` argument of SerialService.
SERIAL_PORT = "COM21"
# Default baud rate; used by SerialService and by probe_serial_port() when it
# test-opens a port.
SERIAL_BAUDRATE = 9600
# Default value passed as the `timeout` argument of serial.Serial.
SERIAL_TIMEOUT = 0.1
|
||
def probe_serial_port(port: str) -> bool:
    """Check that a COM port can be opened, without starting the transport thread.

    Args:
        port: Device name to probe; compared against the ``device`` attribute
            of the entries returned by ``list_ports.comports()``.

    Returns:
        ``True`` when the port is listed and a test open succeeded.
        ``False`` when pyserial is unavailable, the port enumeration failed,
        the port is not listed, or opening it raised an exception.
    """
    # Without pyserial (import fallback set both names to None) nothing can be probed.
    if serial is None or list_ports is None:
        return False

    try:
        known_devices = {entry.device for entry in list_ports.comports()}
    except Exception as exc:
        log_exception(__name__, "probe_serial_port", exc)
        return False

    if port not in known_devices:
        return False

    handle = None
    try:
        handle = serial.Serial(port=port, baudrate=SERIAL_BAUDRATE, timeout=SERIAL_TIMEOUT)
    except Exception as exc:
        log_exception(__name__, "probe_serial_port.serial_open", exc)
        opened = False
    else:
        opened = True
    finally:
        # The probe must never keep the port: release it if the open succeeded.
        if handle is not None and handle.is_open:
            handle.close()
    return opened
|
||
class SerialService(BaseService):
    """Transport service that reads and writes button packets over a COM port.

    A background thread (``_run_loop``) opens the port, parses incoming data
    and re-sends states whose retry deadline has passed.  Both directions use
    4-byte packets: ``[button_id, state, 0x0D, 0x0A]``.

    The signals ``error_occurred``, ``port_disconnected`` and
    ``action_triggered`` as well as ``_set_connection_status``,
    ``_set_button_initialization``, ``_get_button_state`` and
    ``_button_states`` are not defined in this class; presumably they come
    from ``BaseService`` -- confirm there.
    """

    def __init__(
        self,
        port: str = SERIAL_PORT,
        baudrate: int = SERIAL_BAUDRATE,
        timeout: float = SERIAL_TIMEOUT,
        resend_interval_sec: float = 1.0,
        reconnect_delay_sec: float = 1.0,
        max_connect_attempts: int = 3,
        parent: QObject | None = None,
    ) -> None:
        """Store the connection settings; the port is not opened here.

        Args:
            port: Name of the serial device to open.
            baudrate: Baud rate passed to ``serial.Serial``.
            timeout: ``timeout`` value passed to ``serial.Serial``.
            resend_interval_sec: Delay before a sent state becomes due for
                re-sending.
            reconnect_delay_sec: Pause between connection attempts and after
                a failure inside the worker loop.
            max_connect_attempts: Number of consecutive failed connection
                attempts after which the worker thread gives up.
            parent: Forwarded to ``BaseService.__init__``.
        """
        super().__init__(parent)
        self._port_name = port
        self._baudrate = baudrate
        self._timeout = timeout
        self._resend_interval_sec = resend_interval_sec
        self._reconnect_delay_sec = reconnect_delay_sec
        self._max_connect_attempts = max_connect_attempts
        # Guards the port handle, the receive buffer and the bookkeeping
        # containers below.  It is a plain (non-reentrant) Lock, so no method
        # may call another locking method while holding it.
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._serial_port: Any | None = None
        # Bytes received but not yet split into packets.
        self._buffer = bytearray()
        # Buttons that have sent an initialization request since the port was opened.
        self._initialized_buttons: set[int] = set()
        # Last state handed over for sending, per button.
        self._pending_states: dict[int, int] = {}
        # time.monotonic() moment after which a button's state is sent again.
        self._retry_deadlines: dict[int, float] = {}

    def start(self) -> None:
        """Start the worker thread unless it is already running.

        When pyserial could not be imported, an error is reported through
        ``error_occurred`` and the connection status, and no thread is started.
        """
        if self.is_running():
            return
        if serial is None:
            message = "pyserial недоступен, serial transport не может быть запущен."
            self.error_occurred.emit(message)
            self._set_connection_status(TicketConnectionStatus.ERROR, message)
            self._set_button_initialization(False, 0)
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run_loop, name="TicketSerialService", daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the worker thread to stop, close the port and wait up to 1 s."""
        self._stop_event.set()
        self._close_port("Порт закрыт")
        worker = self._thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from the worker itself.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=1.0)
        self._thread = None
        # The worker only re-checks the stop flag between iterations, so it
        # may have finished opening the port after the close above.  Close
        # again so that no handle stays open once stop() has returned.
        if self._is_port_open():
            self._close_port("Порт закрыт")

    def is_running(self) -> bool:
        """Return ``True`` while the worker thread exists and is alive."""
        return self._thread is not None and self._thread.is_alive()

    def set_button_state(self, button_id: int, state_code: int) -> None:
        """Record a new button state and send it at once if the port is open.

        A retry is scheduled only for buttons that have already sent an
        initialization request.
        """
        super().set_button_state(button_id, state_code)
        with self._lock:
            button_id = int(button_id)
            state_code = int(state_code)
            self._pending_states[button_id] = state_code
            serial_port = self._serial_port
            port_open = serial_port is not None and serial_port.is_open
            is_initialized = button_id in self._initialized_buttons
        # _send_state takes the lock itself, so it must run after the lock is released.
        if port_open:
            self._send_state(button_id, state_code, schedule_retry=is_initialized)

    def remove_button_state(self, button_id: int) -> None:
        """Forget a button's state and cancel its pending retry."""
        super().remove_button_state(button_id)
        with self._lock:
            button_id = int(button_id)
            self._pending_states.pop(button_id, None)
            self._retry_deadlines.pop(button_id, None)

    def reset_button_states(self) -> None:
        """Forget all button states and cancel all pending retries."""
        super().reset_button_states()
        with self._lock:
            self._pending_states.clear()
            self._retry_deadlines.clear()

    def _run_loop(self) -> None:
        """Worker-thread body: keep the port connected, read packets, retry sends.

        The loop ends when the stop event is set, when
        ``max_connect_attempts`` consecutive connection attempts have failed,
        or when emitting ``port_disconnected`` raises ``RuntimeError``.
        """
        attempts = 0
        while not self._stop_event.is_set():
            if not self._is_port_open():
                if self._connect_port():
                    attempts = 0
                    self._flush_known_states()
                else:
                    attempts += 1
                    if attempts >= self._max_connect_attempts:
                        try:
                            self.error_occurred.emit(
                                f"Не удалось подключиться к {self._port_name}."
                            )
                            self.port_disconnected.emit()
                        except RuntimeError:
                            # Deliberate best effort: presumably the Qt object
                            # is already deleted and nobody can be notified.
                            pass
                        return
                    self._stop_event.wait(self._reconnect_delay_sec)
                continue
            try:
                self._read_available_packets()
                self._retry_pending_states_if_needed()
            except Exception as exc:
                log_exception(__name__, "SerialService._run_loop", exc)
                try:
                    self.port_disconnected.emit()
                except RuntimeError:
                    return
                self._close_port("Порт закрыт")
                self._stop_event.wait(self._reconnect_delay_sec)
                continue
            # Short pause between polls; wakes up early when stop() is called.
            self._stop_event.wait(0.05)

    def _connect_port(self) -> bool:
        """Open the serial port and publish it as the active handle.

        Returns:
            ``True`` when the port was opened and its buffers were reset,
            ``False`` otherwise (the failure is logged and reported through
            the connection status).
        """
        serial_port = None
        try:
            serial_port = serial.Serial(
                port=self._port_name,
                baudrate=self._baudrate,
                timeout=self._timeout,
                write_timeout=0.5,
                inter_byte_timeout=0.005,
            )
            serial_port.reset_input_buffer()
            serial_port.reset_output_buffer()
        except Exception as exc:
            log_exception(__name__, "SerialService._connect_port", exc)
            # The open itself may have succeeded and only a buffer reset
            # failed; release the handle so the next attempt can reopen it.
            if serial_port is not None:
                try:
                    serial_port.close()
                except Exception as close_exc:
                    log_exception(__name__, "SerialService._connect_port.close", close_exc)
            self._set_connection_status(
                TicketConnectionStatus.ERROR,
                f"Ошибка подключения к {self._port_name}: {exc}",
            )
            self._set_button_initialization(False, 0)
            return False
        with self._lock:
            self._serial_port = serial_port
            self._buffer.clear()
        self._set_connection_status(
            TicketConnectionStatus.CONNECTED,
            f"{self._port_name} ({self._baudrate} бод)",
        )
        self._set_button_initialization(False, 0)
        return True

    def _read_available_packets(self) -> None:
        """Read all bytes currently waiting on the port and process them."""
        with self._lock:
            serial_port = self._serial_port
        if serial_port is None or not serial_port.is_open:
            return
        data_available = getattr(serial_port, "in_waiting", 0)
        if data_available <= 0:
            return
        payload = serial_port.read(data_available)
        if not payload:
            return
        with self._lock:
            self._buffer.extend(payload)
        self._process_buffer()

    def _process_buffer(self) -> None:
        """Split the receive buffer into packets and handle each of them.

        Bytes in front of a located packet are dropped together with it.
        If no packet is found and more than 100 bytes have accumulated, the
        buffer is discarded.
        """
        while True:
            with self._lock:
                if len(self._buffer) < 4:
                    return
                packet_index = self._find_packet_start()
                if packet_index is None:
                    if len(self._buffer) > 100:
                        self._buffer.clear()
                    return
                button_id = self._buffer[packet_index]
                hardware_state = self._buffer[packet_index + 1]
                del self._buffer[: packet_index + 4]
            # Handlers take the lock themselves, so dispatch after releasing it.
            self._handle_packet(button_id, hardware_state)

    def _find_packet_start(self) -> int | None:
        """Return the index of the first packet in the buffer, or ``None``.

        A packet is two payload bytes followed by ``0x0D 0x0A``, so the
        terminator is searched from offset 2 and the packet starts two bytes
        before it.  Called by ``_process_buffer`` while it holds the lock.
        """
        terminator_at = self._buffer.find(b"\r\n", 2)
        if terminator_at < 0:
            return None
        return terminator_at - 2

    def _handle_packet(self, button_id: int, hardware_state: int) -> None:
        """Dispatch one received packet.

        Packets whose ``button_id`` is outside 1..8 are ignored.  An
        initialization request and a confirmation (``0xAA``) are routed to
        their handlers; states 0..3 are published through
        ``action_triggered``; any other value is reported as an error.
        """
        if not 1 <= button_id <= 8:
            return
        if hardware_state == HARDWARE_SIGNAL_INITIALIZE:
            self._handle_initialization_request(button_id)
            return
        if hardware_state == 0xAA:
            self._handle_confirmation(button_id)
            return
        if hardware_state not in (0, 1, 2, 3):
            self.error_occurred.emit(
                f"Неизвестное аппаратное состояние: {hardware_state:02X}"
            )
            return
        self.action_triggered.emit(
            {
                "event": "advance",
                "button_id": button_id,
                "hardware_state": hardware_state,
                "current_state_code": self._get_button_state(button_id),
            }
        )

    def _handle_initialization_request(self, button_id: int) -> None:
        """Mark the button as initialized and send it its current state."""
        with self._lock:
            self._initialized_buttons.add(button_id)
            button_count = len(self._initialized_buttons)
        self._set_button_initialization(True, button_count)
        self._send_state(button_id, self._get_button_state(button_id))

    def _handle_confirmation(self, button_id: int) -> None:
        """Drop the pending state and retry deadline of a confirmed button."""
        with self._lock:
            self._pending_states.pop(button_id, None)
            self._retry_deadlines.pop(button_id, None)

    def _retry_pending_states_if_needed(self) -> None:
        """Re-send the current state of every button whose retry deadline has passed."""
        now = time.monotonic()
        # Work on a snapshot: _send_state updates the deadlines under the lock.
        with self._lock:
            pending_items = list(self._retry_deadlines.items())
        for button_id, retry_deadline in pending_items:
            if retry_deadline <= now:
                self._send_state(button_id, self._get_button_state(button_id))

    def _flush_known_states(self) -> None:
        """Send every known button state once (without scheduling a retry)."""
        for button_id, state_code in list(self._button_states.items()):
            self._send_state(button_id, state_code, schedule_retry=False)

    def _send_state(self, button_id: int, state_code: int, *, schedule_retry: bool = True) -> bool:
        """Write one state packet to the port.

        Args:
            button_id: Button the packet is addressed to.
            state_code: State to send; only its low byte is transmitted.
            schedule_retry: When true, remember the state and set a retry
                deadline ``resend_interval_sec`` from now.

        Returns:
            ``True`` when the packet was written and flushed, ``False`` when
            the port is not open or the write failed.  A failed write is
            logged, ``port_disconnected`` is emitted and the port is closed.
        """
        packet = bytes([int(button_id), int(state_code) & 0xFF, 0x0D, 0x0A])
        try:
            with self._lock:
                serial_port = self._serial_port
                if serial_port is None or not serial_port.is_open:
                    return False
                serial_port.write(packet)
                serial_port.flush()
                if schedule_retry:
                    self._pending_states[int(button_id)] = int(state_code)
                    self._retry_deadlines[int(button_id)] = time.monotonic() + self._resend_interval_sec
        except Exception as exc:
            log_exception(__name__, "SerialService._send_state", exc)
            try:
                self.port_disconnected.emit()
            except RuntimeError:
                return False
            self._close_port("Порт закрыт")
            return False
        return True

    def _close_port(self, message: str) -> None:
        """Close the port, reset all per-connection state and report it.

        Args:
            message: Text passed along with the ``DISCONNECTED`` status.
        """
        with self._lock:
            serial_port = self._serial_port
            self._serial_port = None
            self._buffer.clear()
            self._initialized_buttons.clear()
            self._pending_states.clear()
            self._retry_deadlines.clear()
        if serial_port is not None and serial_port.is_open:
            try:
                serial_port.close()
            except Exception as exc:
                log_exception(__name__, "SerialService._close_port", exc)
        self._set_button_initialization(False, 0)
        self._set_connection_status(TicketConnectionStatus.DISCONNECTED, message)

    def _is_port_open(self) -> bool:
        """Return ``True`` when a port handle exists and reports itself open."""
        with self._lock:
            serial_port = self._serial_port
            return serial_port is not None and serial_port.is_open
|