Files
Dispatch/Dispatch_V0.1.1/auth_service.py
2026-04-29 08:18:54 +04:00

181 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# hub/my_account/auth_service.py
"""Сервис аутентификации Dispatch: проверка логина/пароля и запись сессии.
Назначение модуля:
Полностью повторяет контракт сервиса USMS `hub/my_account/auth_service`,
но обращается к локальному каталогу `DB_dispatch` независимого
приложения Dispatch. Источники данных:
- `DB_dispatch/0_users.py` — список учётных записей диспетчеров
и руководителей сервисной службы;
- `DB_dispatch/1_actual_state.py` — текущая активная сессия.
Архитектурные ограничения:
- Каталог `DB_dispatch` располагается на одном уровне с каталогом
`dispatch`, поэтому путь вычисляется относительно текущего файла.
- Файлы данных читаются через `importlib.util` без подключения
к фреймворку USMS. Это сохраняет независимость дистрибутива.
- Запись сессии перезаписывает файл целиком, без частичных правок.
"""
from __future__ import annotations
import importlib.util
import os
import uuid
from datetime import datetime, timezone
def _resolve_db_dir() -> str:
"""Вернуть абсолютный путь к каталогу `DB_dispatch`.
Структура размещения:
<project_root>/dispatch/hub/my_account/auth_service.py
<project_root>/DB_dispatch/0_users.py
Поэтому переход — четыре уровня вверх от текущего файла, затем
спуск в каталог `DB_dispatch`.
"""
here = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(os.path.dirname(here)))
return os.path.join(project_root, "DB_dispatch")
_DB_DIR = _resolve_db_dir()
def _load_users() -> list[dict]:
"""Загрузить список пользователей из `DB_dispatch/0_users.py`."""
path = os.path.join(_DB_DIR, "0_users.py")
spec = importlib.util.spec_from_file_location("_dispatch_db_users", path)
if spec is None or spec.loader is None:
return []
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return list(getattr(mod, "users", []))
def load_facility_locations() -> dict[str, str]:
"""Загрузить таблицу `facility_id → строка локации` из `DB_dispatch/2_customer_facility_list.py`."""
path = os.path.join(_DB_DIR, "2_customer_facility_list.py")
if not os.path.exists(path):
return {}
spec = importlib.util.spec_from_file_location("_dispatch_db_facilities", path)
if spec is None or spec.loader is None:
return {}
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
raw = getattr(mod, "DEFAULT_BUTTON_LOCATIONS", {})
return {str(k): str(v) for k, v in dict(raw).items()}
def load_software_list() -> dict[str, str]:
"""Загрузить таблицу `software_id → наименование` из `DB_dispatch/3_software_list.py`."""
path = os.path.join(_DB_DIR, "3_software_list.py")
if not os.path.exists(path):
return {}
spec = importlib.util.spec_from_file_location("_dispatch_db_software", path)
if spec is None or spec.loader is None:
return {}
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
raw = getattr(mod, "software_list_ru", {})
return {str(k): str(v) for k, v in dict(raw).items()}
def load_malfunction_list() -> dict[str, list[str]]:
"""Загрузить таблицу `software_id → список заголовков` из `DB_dispatch/4_malfunction_list.py`."""
path = os.path.join(_DB_DIR, "4_malfunction_list.py")
if not os.path.exists(path):
return {}
spec = importlib.util.spec_from_file_location("_dispatch_db_malfunction", path)
if spec is None or spec.loader is None:
return {}
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
raw = getattr(mod, "malfunction_list_ru", {})
return {str(k): [str(item) for item in list(values)] for k, values in dict(raw).items()}
def get_user_facility(user: dict | None) -> str:
"""Вернуть строку локации, привязанную к пользователю.
Источник связи — поле `facility_id` в `DB_dispatch/0_users.py` и
реестр `DEFAULT_BUTTON_LOCATIONS` из `DB_dispatch/2_customer_facility_list.py`.
Возвращает пустую строку, если пользователь не передан или связь
не настроена.
"""
if not user:
return ""
facility_id = str(user.get("facility_id", "")).strip()
if not facility_id:
return ""
locations = load_facility_locations()
return locations.get(facility_id, "")
def authenticate(login: str, password: str) -> tuple[dict | None, str]:
"""Проверить пару логин/пароль.
Возвращает ``(user_dict, "")`` при успехе или ``(None, error_code)``:
* ``"no_user"`` — логин не найден;
* ``"bad_password"`` — логин верный, пароль не совпадает.
"""
users = _load_users()
matched = [u for u in users if u.get("login") == login]
if not matched:
return None, "no_user"
if matched[0].get("password") == password:
return matched[0], ""
return None, "bad_password"
def write_session(user: dict) -> dict:
"""Записать сессию в `DB_dispatch/1_actual_state.py` и вернуть её."""
state = {
"state_id": str(uuid.uuid4()),
"user_id": user["user_id"],
"is_online": True,
"current_module": "dispatch",
"last_activity_ts": datetime.now(timezone.utc).isoformat(),
}
path = os.path.join(_DB_DIR, "1_actual_state.py")
with open(path, "w", encoding="utf-8") as fh:
fh.write("# -*- coding: utf-8 -*-\n")
fh.write("# DB_dispatch/1_actual_state.py\n\n")
fh.write(f"actual_state = [{repr(state)}]\n")
return state
def clear_session() -> None:
"""Сбросить активную сессию (выход из системы)."""
path = os.path.join(_DB_DIR, "1_actual_state.py")
with open(path, "w", encoding="utf-8") as fh:
fh.write("# -*- coding: utf-8 -*-\n")
fh.write("# DB_dispatch/1_actual_state.py\n\n")
fh.write("actual_state = []\n")
def load_session() -> dict | None:
"""Прочитать активную сессию и вернуть user-dict или None."""
path = os.path.join(_DB_DIR, "1_actual_state.py")
if not os.path.exists(path):
return None
spec = importlib.util.spec_from_file_location("_dispatch_db_state", path)
if spec is None or spec.loader is None:
return None
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
states = getattr(mod, "actual_state", [])
if not states:
return None
user_id = states[0].get("user_id")
if not user_id:
return None
users = _load_users()
matched = [u for u in users if u.get("user_id") == user_id]
return matched[0] if matched else None