Compare commits

..

10 Commits

Author SHA1 Message Date
Vadim Malanov
0a407f1f09 perf(frontend): route-based code splitting + vendor chunking
Some checks failed
CI / Backend (lint + tests + compose) (push) Has been cancelled
CI / Frontend (lint + type-check + build) (push) Has been cancelled
Splits each page into its own lazily-imported chunk via React.lazy
with Suspense fallback (a skeleton matching the dashboard layout
shape). Adds a vite manualChunks function that pushes heavy third-
party libraries into long-lived vendor chunks so page chunks stay
small and the vendor cache survives release cycles.

Vendor groupings: vendor-react, vendor-router, vendor-tanstack,
vendor-radix (+ cmdk), vendor-motion, vendor-recharts (+ d3 deps),
vendor-axios, vendor-state (zustand), vendor-toast (sonner),
vendor-lucide, vendor (everything else).

Build output (before -> after, gzipped):
  initial entry      348.65 kB -> 8.75 kB
  largest chunk      1163.97 kB -> 81.65 kB (vendor-recharts, only
                                 loaded on Dashboard + SystemHealth)
  build warning      "chunks > 500 kB" -> gone

DocumentsPage, SettingsPage, etc. no longer pull recharts into their
critical path; the dashboard pays the chart cost once, cached.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:19:06 +03:00
Vadim Malanov
24282d1279 feat(api): optional API-key auth middleware
Adds defence-in-depth shared-secret auth that activates when API_KEY
is set. Behaviour:

- empty API_KEY (dev default): every request allowed, middleware is
  not even installed;
- non-empty API_KEY: every request under APP_API_PREFIX except
  /health must carry X-API-Key: <value> or
  Authorization: Bearer <value>. /, /docs, /redoc, /openapi.json and
  CORS preflight stay open. hmac.compare_digest is used for the
  constant-time comparison.

The middleware resolves settings lazily so test fixtures can reload
app.config and have the new API_KEY take effect on the next install.

Tests (tests/test_api_security.py, 5 cases):
- /health remains open;
- protected route rejects missing key (401);
- protected route accepts X-API-Key header;
- protected route accepts Authorization: Bearer header;
- protected route rejects a wrong key.

Frontend:
- VITE_API_KEY env reads the key and Axios injects it on every
  request, falling back to no header when empty so SSO/reverse-proxy
  deployments stay unchanged.
- vite-env.d.ts adds the new env entry.

Docs/ops:
- .env.example documents the dev-default empty key;
- .env.prod.example marks API_KEY as a required rotation point;
- docker-compose.yml forwards API_KEY (defaults to empty);
- docker-compose.prod.yml fails the stack with ?:required when API_KEY
  is missing;
- RUNBOOK gains an API authentication section with header examples
  and the reverse-proxy + key layering recommendation.

pytest -q: 33 passed (5 new security + 28 prior).
npx tsc --noEmit: clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:17:27 +03:00
Vadim Malanov
463622c644 deps: tighten version ranges, pin Docling to <2.15
Docling's DocumentConverter shape (text_items, prov[0].page_no,
export_to_markdown signature) still moves between 2.x minor releases.
Cap docling to >=2.0.0,<2.15 so a wheel bump cannot silently break
the defensive walkers in app/ingestion/docling_extractor.py until a
staging smoke test has run against the new minor.

Every other runtime dep gets the same major/minor upper bound:
- web/api: fastapi <0.117, uvicorn <0.33, pydantic <3
- db: sqlalchemy <2.1, psycopg <3.3, alembic <1.14
- search: opensearch-py <3, qdrant-client <1.13
- ingest: ocrmypdf <17, pikepdf <10, pypdf <6
- ml: FlagEmbedding <2, sentence-transformers <4, transformers <5,
      torch <3, numpy <3
- ops/utils: structlog <26, orjson <4, httpx <0.29, click <9

Lift any specific upper bound only after the corresponding regression
test passes on a staging upgrade.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:12:15 +03:00
Vadim Malanov
a97d0bbcfd perf: add ingest and search load-test harnesses
scripts/generate_synthetic_pdfs.py builds real PDF/1.4 documents with
a hand-written xref so we can generate tens of thousands of ~2 KB
PDFs locally. Helvetica only covers latin-1, which is fine for a
load generator (throughput, not retrieval relevance); the docstring
calls this out so no one mistakes the output for a quality corpus.

scripts/load_ingest.py drives POST /ingest/folder, then polls a
hypothetical /documents/stats endpoint every poll-interval seconds
to track terminal-state progression. Writes a JSON history report so
results can be diffed between runs.

scripts/locustfile_search.py defines a SearchUser profile mixing
hybrid / lexical / semantic queries against POST /search plus a
health-check sampler. Asserts non-empty results so a "200 with
zero hits" regression surfaces as a failure rather than a green
percentile graph.

RUNBOOK gains a Load testing section with CPU/GPU SLO tables for
both axes (sustained docs/min, search latency p50/p95/p99).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:11:08 +03:00
Vadim Malanov
349f4ea838 perf(reranker): add benchmark harness and passage clipping
- scripts/benchmark_reranker.py exercises the configured reranker
  with synthetic queries or live OpenSearch samples and prints
  p50/p95/p99 latency, mean latency, and pairs/sec throughput.
  Supports --warmup, --candidates, --passage-length, --source, and a
  --json-only mode for CI.
- app/indexing/reranker.py clips passages to 2048 characters before
  scoring so a runaway chunk cannot starve the cross-encoder beyond
  bge-reranker-v2-m3's training window.
- RUNBOOK.md gains a Reranker benchmark section with CPU/GPU SLO
  targets and a remediation ladder (lower top-K, raise batch size,
  switch device, disable reranker) when measured p95 exceeds budget.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:08:04 +03:00
Vadim Malanov
f42fb978a8 chore: drop dead _qid helper and surface ocr_confidence on SearchHit
- app/indexing/qdrant_client.py: remove the identity-only _qid()
  helper and pass chunk_id straight to PointStruct (Qdrant accepts
  the UUID string directly).
- services/types.ts: SearchHit gets an explicit, optional
  ocr_confidence field so consumers can type the value instead of
  casting through metadata.
- widgets/SearchResultCard.tsx: replaces the
  (hit.metadata as { ocr_confidence? }) cast with the new field. No
  behavior change when the backend omits it.

tsc --noEmit: clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:55:32 +03:00
Vadim Malanov
785d3be970 test: add Alembic migration smoke and /search contract tests
tests/test_alembic.py points Alembic at an in-process SQLite database
in --sql mode so the migration files are validated end to end without
needing the real Postgres compose service. Asserts the documents,
chunks, and processing_events tables plus the unique constraints
appear in the generated DDL, and that the revision graph stays
linear at 0001_initial.

tests/test_routes_search.py monkeypatches
app.indexing.hybrid_search.run_search so the FastAPI route can be
exercised with the real SearchRequest/SearchResponse schemas. Covers
the happy path (rank, citation, reranked flag) and that empty queries
are rejected at schema validation before the backend is called.

pytest tests/test_alembic.py tests/test_routes_search.py -q: 4 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:54:15 +03:00
Vadim Malanov
d3c96161b0 ops: add docker-compose.prod.yml overlay
Production overlay narrows the dev defaults:
- removes published ports from postgres, minio, opensearch, qdrant,
  redis - only the api container stays externally reachable;
- enables the OpenSearch security plugin and requires
  OPENSEARCH_ADMIN_PASSWORD via ?:required interpolation;
- requires Qdrant API key, MinIO root credentials, postgres password,
  and CORS_ALLOWED_ORIGINS to be set (no localhost fallback);
- doubles OpenSearch heap (-Xms2g -Xmx2g) and worker concurrency to 4;
- drops the MinIO management console.

Validated with:
  set -a; . .env.prod.example; CORS_ALLOWED_ORIGINS=https://example.com
  docker compose -f docker-compose.yml -f docker-compose.prod.yml config

The RUNBOOK was updated in the initial commit and already documents
the overlay invocation and credential rotation workflow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:52:57 +03:00
Vadim Malanov
a375ca55b9 refactor: extract ensure_artifact into app/storage/artifacts.py
The artifact-upsert helper was duplicated four times (scanner.py,
table_processor.py, figure_processor.py, pipeline.py) with slightly
different signatures. Consolidates into a single keyword-only function
keyed on (document_id, storage_key) - the identity the schema already
enforces - so re-running the pipeline never creates duplicate rows.

scanner / table_processor / figure_processor now import the shared
helper directly. pipeline.py keeps a thin local wrapper to preserve
the positional call sites at three artifact upsert points (OCR_PDF,
MARKDOWN, DOCLING_JSON).

Tests: 24 passed (5 health + 19 original).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:51:54 +03:00
Vadim Malanov
cd9977f8c3 feat(api): add CORS middleware and /health contract test
CORS:
- New setting CORS_ALLOWED_ORIGINS (comma separated). Defaults cover
  the three local Vite ports (5173, 5273, 4173); production overlay
  expects the real origin in .env.prod.
- main.py wires CORSMiddleware from settings.cors_origins. No * in
  production - see RUNBOOK and .env.prod.example.
- docker-compose.yml forwards the variable to both api and worker.

Tests:
- tests/test_api_health.py uses FastAPI TestClient and monkeypatches
  the five probe functions (postgres/minio/opensearch/qdrant/redis).
  Verifies the all-ok, any-error, and degraded paths, that the root
  endpoint reports the configured api prefix, and that the CORS
  preflight echoes the allowed origin.
- pytest tests/test_api_health.py -q: 5 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:48:49 +03:00
32 changed files with 1567 additions and 149 deletions

View File

@@ -80,3 +80,8 @@ APP_API_PREFIX=/api/v1
# Comma-separated list of allowed origins for the browser. Use specific origins
# in production; * is accepted only for local development.
CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:5273,http://localhost:4173
# Optional shared-secret API key. When empty, the API is open (dev default).
# When set, every request under APP_API_PREFIX except /health requires
# X-API-Key: <value> or Authorization: Bearer <value>.
API_KEY=

View File

@@ -72,3 +72,6 @@ APP_API_PREFIX=/api/v1
# Comma-separated list of allowed origins. NEVER use * in production.
CORS_ALLOWED_ORIGINS=https://legacyhub.teamhub.example
# Mandatory in production. Use a long random value (e.g. `openssl rand -hex 32`).
API_KEY=__ROTATE_ME__

View File

@@ -95,6 +95,36 @@ docker compose exec postgres psql -U legacyhub -d legacyhub -c \
| Indexing stuck | OpenSearch + Qdrant health | `scripts/init_opensearch.py`, `scripts/init_qdrant.py` |
| Reranker disabled | API logs → `reranker.disabled` | Ensure `RERANKER_ENABLED=true`; HF cache mounted |
## API authentication
Two mechanisms layered together:
1. **Reverse proxy / SSO** (preferred). Front the API with nginx, Traefik, or
an OAuth gateway. The reverse proxy terminates TLS and authenticates the
caller; LegacyHUB never sees a raw user identity.
2. **Shared-secret API key** (defence in depth). Set `API_KEY` to a long
random value (`openssl rand -hex 32`). Every request to `APP_API_PREFIX`
except `/health` must then carry either:
```http
X-API-Key: <key>
```
or:
```http
Authorization: Bearer <key>
```
`/health` is intentionally exempt so external probes do not need the
secret.
In production this is required (`docker-compose.prod.yml` fails the
stack if `API_KEY` is empty). In development the key is optional and
the default empty value disables the middleware entirely.
The frontend reads `VITE_API_KEY` and injects the header on every Axios
request. For SSO deployments leave `VITE_API_KEY` empty and let the
reverse proxy inject the header server-side.
## Verification gates (per change)
1. `python -m pytest tests/ -q` — full unit suite (19+ tests).
@@ -117,6 +147,89 @@ docker compose exec postgres psql -U legacyhub -d legacyhub -c \
should not be rolled back casually. Restore from backup via the standard
TeamHUB Suite backup runbook.
## Reranker benchmark
The reranker is the latency-defining stage of the hybrid search path. Run the
benchmark on every hardware change (CPU vs GPU, instance type, batch size)
before promoting the configuration.
```bash
# synthetic warmup + 32 queries x 40 candidates, ~700-char passages
docker compose exec api python scripts/benchmark_reranker.py \
--queries 32 --candidates 40 --warmup 4
# real corpus sample (after some documents are indexed)
docker compose exec api python scripts/benchmark_reranker.py \
--source opensearch --query "ГОСТ 21.501-93" --candidates 40
```
Target SLOs (subject to revision once staging numbers land):
| Metric | CPU target | GPU target |
|---------------------|-----------:|-----------:|
| p95 latency / query | < 700 ms | < 120 ms |
| Throughput | > 60 pair/s | > 600 pair/s |
If the measured p95 exceeds the budget, options in order of preference:
1. Lower `RERANK_CANDIDATES` (default 40 — reducing to 20 roughly halves work).
2. Increase `RERANKER_BATCH_SIZE` (memory permitting).
3. Switch `RERANKER_DEVICE=cuda` and use a GPU-capable image.
4. Disable reranker (`RERANKER_ENABLED=false`) and accept raw RRF order — the
API still returns useful results; the `reranked` field reports the truth.
Passages are clipped to 2048 chars before being fed to the cross-encoder so a
runaway chunk cannot starve the budget.
## Load testing
Two complementary harnesses live under `scripts/`:
### Ingest load
```bash
# Generate synthetic PDFs (~3 KB each, real PDF/1.4 with embedded text)
docker compose exec api python scripts/generate_synthetic_pdfs.py \
--count 10000 --out /data/input/load
# Trigger ingest, sample status every 10 s, dump JSON history
docker compose exec api python scripts/load_ingest.py \
--path /data/input/load \
--api-url http://localhost:8000/api/v1 \
--watch-seconds 1800 \
--report-file /data/work/load_report.json
```
Target SLOs at the 70k-document scale (subject to refinement once measured):
| Metric | CPU target | GPU target |
|---------------------------------|-----------:|-----------:|
| Sustained throughput (docs/min) | > 30 | > 200 |
| Failure rate | < 1 % | < 0.5 % |
| p95 per-document wall time | < 90 s | < 25 s |
### Search load
```bash
pip install locust # one-time
locust -f scripts/locustfile_search.py \
--host http://localhost:8000 \
--headless --users 100 --spawn-rate 10 --run-time 10m \
--html load_search.html
```
Target SLOs for hybrid mode with the reranker enabled:
| Percentile | CPU | GPU |
|------------|-------:|------:|
| p50 | 600 ms | 120 ms |
| p95 | 1500 ms | 300 ms |
| p99 | 3500 ms | 700 ms |
If staging numbers miss the budget, walk the reranker remediation ladder above
before chasing index sharding.
## Scaling notes (~70k PDFs)
- Workers horizontally scale: `docker compose up -d --scale worker=8`.

83
app/api/security.py Normal file
View File

@@ -0,0 +1,83 @@
"""Optional API-key auth.
Behaviour:
- If ``API_KEY`` is empty (default) every request is allowed - matches the
original dev configuration.
- If ``API_KEY`` is set, every request to a route under ``app_api_prefix``
must carry either ``X-API-Key: <value>`` or ``Authorization: Bearer <value>``.
- ``/health`` is intentionally exempt so external probes (compose healthcheck,
reverse proxy, monitoring) keep working without leaking the key.
- The root ``/`` page stays open so the OpenAPI banner and docs links remain
reachable.
This is a defence-in-depth layer behind whatever reverse proxy / OAuth gateway
runs in production - not a replacement.
"""
from __future__ import annotations
import hmac
from typing import Awaitable, Callable
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from starlette.types import ASGIApp
from app.config import settings as _module_settings
EXEMPT_PATHS: tuple[str, ...] = ("/", "/docs", "/redoc", "/openapi.json")
EXEMPT_SUFFIXES: tuple[str, ...] = ("/health",)
def _extract_token(request: Request) -> str | None:
header = request.headers.get("x-api-key")
if header:
return header.strip()
auth = request.headers.get("authorization") or ""
if auth.lower().startswith("bearer "):
return auth[7:].strip()
return None
def install_api_key_auth(app: FastAPI) -> None:
"""Attach the middleware. Always safe to call; becomes a no-op when no key
is configured.
Reads ``app.config.settings`` lazily so test fixtures can reload the config
module and have the new ``API_KEY`` value take effect on the next install.
"""
from app.config import settings as fresh_settings # re-resolve after reloads
settings = fresh_settings
expected = settings.api_key.strip() if settings.api_key else ""
if not expected:
return
@app.middleware("http")
async def _api_key_middleware( # type: ignore[no-redef]
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
path = request.url.path
if request.method == "OPTIONS":
return await call_next(request)
if path in EXEMPT_PATHS:
return await call_next(request)
if any(path.endswith(s) for s in EXEMPT_SUFFIXES):
return await call_next(request)
if not path.startswith(settings.app_api_prefix):
return await call_next(request)
token = _extract_token(request)
if not token or not hmac.compare_digest(token, expected):
return JSONResponse(
status_code=401,
content={"detail": "invalid or missing api key"},
headers={"WWW-Authenticate": "Bearer"},
)
return await call_next(request)
__all__ = ["install_api_key_auth"]
_ = ASGIApp # re-export hint to keep mypy happy on older Starlette versions

View File

@@ -27,6 +27,15 @@ class Settings(BaseSettings):
app_input_dir: str = Field("/data/input", alias="APP_INPUT_DIR")
app_work_dir: str = Field("/data/work", alias="APP_WORK_DIR")
app_api_prefix: str = Field("/api/v1", alias="APP_API_PREFIX")
cors_allowed_origins: str = Field(
"http://localhost:5173,http://localhost:5273,http://localhost:4173",
alias="CORS_ALLOWED_ORIGINS",
)
api_key: str = Field("", alias="API_KEY")
@property
def cors_origins(self) -> list[str]:
return [o.strip() for o in self.cors_allowed_origins.split(",") if o.strip()]
# ---------------- Postgres ----------------
postgres_host: str = Field("postgres", alias="POSTGRES_HOST")

View File

@@ -74,7 +74,7 @@ def upsert_chunks(
return 0
qpoints = [
qm.PointStruct(
id=_qid(chunk_id),
id=chunk_id,
vector={DENSE_VECTOR_NAME: vector},
payload={**payload, "chunk_id": chunk_id},
)
@@ -96,8 +96,3 @@ def delete_by_document(document_id: str, collection: str | None = None) -> int:
),
)
return 1
def _qid(chunk_id: str) -> str:
"""Qdrant accepts UUID strings or unsigned ints. Chunks are UUIDs already."""
return chunk_id

View File

@@ -50,10 +50,16 @@ class Reranker:
def available(self) -> bool:
return self._impl is not None and self._model is not None
# bge-reranker-v2-m3 is trained at 512 tokens; we truncate by chars so the
# reranker stays inside its budget even when callers forget to limit the
# candidate text length.
_MAX_PASSAGE_CHARS = 2048
def score(self, query: str, passages: Sequence[str]) -> list[float]:
if not self.available or not passages:
return [0.0] * len(passages)
pairs = [(query, p) for p in passages]
clipped = [p[: self._MAX_PASSAGE_CHARS] for p in passages]
pairs = [(query, p) for p in clipped]
if self._impl == "flagembedding":
scores = self._model.compute_score(pairs, batch_size=self.batch_size, normalize=True) # type: ignore[union-attr]
else:

View File

@@ -6,9 +6,10 @@ import uuid
from sqlalchemy import select
from app.db.models import ArtifactType, DocumentArtifact, Figure
from app.db.models import ArtifactType, Figure
from app.ingestion.docling_extractor import ExtractedFigure
from app.logging_config import get_logger
from app.storage.artifacts import ensure_artifact
from app.storage.local_paths import key_figure_crop
from app.storage.minio_client import MinioStorage
@@ -52,27 +53,14 @@ def persist_figures(
)
existing.storage_bucket = storage.derived_bucket
existing.storage_key = key
_ensure_artifact(db, document_id, ArtifactType.FIGURE_CROP, storage.derived_bucket, key, f.page_number)
ensure_artifact(
db,
document_id=document_id,
artifact_type=ArtifactType.FIGURE_CROP,
bucket=storage.derived_bucket,
key=key,
page_number=f.page_number,
)
count += 1
return count
def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, page: int | None) -> None:
existing = db.execute(
select(DocumentArtifact).where(
DocumentArtifact.document_id == document_id,
DocumentArtifact.storage_key == key,
)
).scalar_one_or_none()
if existing:
return
db.add(
DocumentArtifact(
document_id=document_id,
artifact_type=artifact_type,
storage_bucket=bucket,
storage_key=key,
page_number=page,
)
)

View File

@@ -25,6 +25,7 @@ from app.db.models import (
Page,
ProcessingEvent,
)
from app.storage.artifacts import ensure_artifact
from app.db.session import session_scope
from app.indexing import opensearch_client, qdrant_client
from app.indexing.embeddings import get_embedder
@@ -330,21 +331,14 @@ def _build_index_payloads(
def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str) -> None:
existing = db.execute(
select(DocumentArtifact).where(
DocumentArtifact.document_id == document_id,
DocumentArtifact.storage_key == key,
)
).scalar_one_or_none()
if existing:
return
db.add(
DocumentArtifact(
document_id=document_id,
artifact_type=artifact_type,
storage_bucket=bucket,
storage_key=key,
)
"""Thin wrapper preserving the local positional signature used inside this
module while delegating to the shared helper."""
ensure_artifact(
db,
document_id=document_id,
artifact_type=artifact_type,
bucket=bucket,
key=key,
)

View File

@@ -23,12 +23,12 @@ from sqlalchemy import select
from app.db.models import (
ArtifactType,
Document,
DocumentArtifact,
DocumentStatus,
ProcessingEvent,
)
from app.db.session import session_scope
from app.logging_config import get_logger
from app.storage.artifacts import ensure_artifact
from app.storage.local_paths import key_original_pdf
from app.storage.minio_client import get_storage
from app.utils.hashing import sha256_file
@@ -121,13 +121,13 @@ def discover_documents(
content_type="application/pdf",
metadata={"sha256": sha, "original-name": path.name[:255]},
)
_ensure_artifact(
ensure_artifact(
db,
doc.id,
ArtifactType.ORIGINAL_PDF,
storage.originals_bucket,
key,
sha,
document_id=doc.id,
artifact_type=ArtifactType.ORIGINAL_PDF,
bucket=storage.originals_bucket,
key=key,
checksum=sha,
)
if doc.status == DocumentStatus.DISCOVERED:
doc.status = DocumentStatus.STORED_ORIGINAL
@@ -161,24 +161,3 @@ def discover_documents(
)
def _ensure_artifact(
db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, checksum: str | None
) -> None:
existing = db.execute(
select(DocumentArtifact).where(
DocumentArtifact.document_id == document_id,
DocumentArtifact.artifact_type == artifact_type,
DocumentArtifact.storage_key == key,
)
).scalar_one_or_none()
if existing:
return
db.add(
DocumentArtifact(
document_id=document_id,
artifact_type=artifact_type,
storage_bucket=bucket,
storage_key=key,
checksum=checksum,
)
)

View File

@@ -7,9 +7,10 @@ import uuid
from sqlalchemy import select
from app.db.models import ArtifactType, DocumentArtifact, Table
from app.db.models import ArtifactType, Table
from app.ingestion.docling_extractor import ExtractedTable
from app.logging_config import get_logger
from app.storage.artifacts import ensure_artifact
from app.storage.local_paths import key_table_json
from app.storage.minio_client import MinioStorage
@@ -52,7 +53,14 @@ def persist_tables(
data=json.dumps(t.json_data, ensure_ascii=False).encode("utf-8"),
content_type="application/json",
)
_ensure_artifact(db, document_id, ArtifactType.TABLE_JSON, storage.derived_bucket, key, t.page_number)
ensure_artifact(
db,
document_id=document_id,
artifact_type=ArtifactType.TABLE_JSON,
bucket=storage.derived_bucket,
key=key,
page_number=t.page_number,
)
count += 1
return count
@@ -62,23 +70,3 @@ def _summary(t: ExtractedTable) -> str:
md = t.markdown or ""
n_rows = max(0, sum(1 for ln in md.splitlines() if ln.startswith("|")) - 2)
return f"Table {t.table_index} on page {t.page_number} ({n_rows} rows)."
def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, page: int | None) -> None:
existing = db.execute(
select(DocumentArtifact).where(
DocumentArtifact.document_id == document_id,
DocumentArtifact.storage_key == key,
)
).scalar_one_or_none()
if existing:
return
db.add(
DocumentArtifact(
document_id=document_id,
artifact_type=artifact_type,
storage_bucket=bucket,
storage_key=key,
page_number=page,
)
)

View File

@@ -6,9 +6,11 @@ from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app import __version__
from app.api import routes_health, routes_ingestion, routes_search
from app.api.security import install_api_key_auth
from app.config import settings
from app.logging_config import configure_logging, get_logger
@@ -37,6 +39,16 @@ app = FastAPI(
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["*", "X-API-Key", "Authorization"],
max_age=3600,
)
install_api_key_auth(app)
app.include_router(routes_health.router, prefix=settings.app_api_prefix)
app.include_router(routes_ingestion.router, prefix=settings.app_api_prefix)
app.include_router(routes_search.router, prefix=settings.app_api_prefix)

View File

@@ -1,3 +1,4 @@
from app.storage.artifacts import ensure_artifact
from app.storage.minio_client import MinioStorage, get_storage
__all__ = ["MinioStorage", "get_storage"]
__all__ = ["MinioStorage", "ensure_artifact", "get_storage"]

53
app/storage/artifacts.py Normal file
View File

@@ -0,0 +1,53 @@
"""Shared ``document_artifacts`` upsert helper.
Single source of truth used by the scanner, the per-document pipeline, and the
table / figure processors. Each caller previously carried its own copy with
slightly different signatures; this module replaces them so that the artifact
row schema is enforced in one place.
"""
from __future__ import annotations
import uuid
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.db.models import DocumentArtifact
def ensure_artifact(
db: Session,
*,
document_id: uuid.UUID,
artifact_type: str,
bucket: str,
key: str,
page_number: int | None = None,
checksum: str | None = None,
) -> DocumentArtifact:
"""Insert a ``DocumentArtifact`` row if none exists with the same key.
Identity is ``(document_id, storage_key)``. Re-running the pipeline never
duplicates artifact rows; metadata fields are not updated in place because
derived bytes are versioned by their storage key.
"""
existing = db.execute(
select(DocumentArtifact).where(
DocumentArtifact.document_id == document_id,
DocumentArtifact.storage_key == key,
)
).scalar_one_or_none()
if existing is not None:
return existing
artifact = DocumentArtifact(
document_id=document_id,
artifact_type=artifact_type,
storage_bucket=bucket,
storage_key=key,
page_number=page_number,
checksum=checksum,
)
db.add(artifact)
return artifact

102
docker-compose.prod.yml Normal file
View File

@@ -0,0 +1,102 @@
# Production overlay for LegacyHUB.
#
# Usage:
# cp .env.prod.example .env.prod
# $EDITOR .env.prod # rotate every credential
# docker compose \
# -f docker-compose.yml -f docker-compose.prod.yml \
# --env-file .env.prod \
# up -d --build --force-recreate
#
# This overlay narrows the dev-friendly defaults:
# - removes published ports from data services (only api stays public);
# - turns on the OpenSearch security plugin and forces an admin password;
# - requires CORS_ALLOWED_ORIGINS to be set (no localhost fallback);
# - bumps Java + worker concurrency for real workloads;
# - drops the MinIO console.
services:
postgres:
ports: !reset []
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}
restart: always
minio:
command: server /data
ports: !reset []
environment:
MINIO_ROOT_USER: ${MINIO_ACCESS_KEY:?MINIO_ACCESS_KEY must be set}
MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY:?MINIO_SECRET_KEY must be set}
restart: always
opensearch:
ports: !reset []
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms2g -Xmx2g"
- DISABLE_INSTALL_DEMO_CONFIG=true
- plugins.security.disabled=false
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_ADMIN_PASSWORD:?OPENSEARCH_ADMIN_PASSWORD must be set}
restart: always
qdrant:
ports: !reset []
environment:
QDRANT__SERVICE__API_KEY: ${QDRANT_API_KEY:?QDRANT_API_KEY must be set}
restart: always
redis:
ports: !reset []
restart: always
api:
environment:
<<: &prod-env
POSTGRES_HOST: ${POSTGRES_HOST}
POSTGRES_PORT: ${POSTGRES_PORT}
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
MINIO_ENDPOINT: ${MINIO_ENDPOINT}
MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY}
MINIO_SECRET_KEY: ${MINIO_SECRET_KEY}
MINIO_BUCKET_ORIGINALS: ${MINIO_BUCKET_ORIGINALS}
MINIO_BUCKET_DERIVED: ${MINIO_BUCKET_DERIVED}
MINIO_SECURE: "true"
OPENSEARCH_HOST: ${OPENSEARCH_HOST}
OPENSEARCH_PORT: ${OPENSEARCH_PORT}
OPENSEARCH_USE_SSL: "true"
OPENSEARCH_VERIFY_CERTS: "true"
OPENSEARCH_USER: ${OPENSEARCH_USER}
OPENSEARCH_PASSWORD: ${OPENSEARCH_PASSWORD}
OPENSEARCH_INDEX_CHUNKS: ${OPENSEARCH_INDEX_CHUNKS}
QDRANT_HOST: ${QDRANT_HOST}
QDRANT_PORT: ${QDRANT_PORT}
QDRANT_API_KEY: ${QDRANT_API_KEY}
QDRANT_COLLECTION_CHUNKS: ${QDRANT_COLLECTION_CHUNKS}
REDIS_URL: ${REDIS_URL}
OCR_LANGUAGES: ${OCR_LANGUAGES}
OCR_ENABLED: ${OCR_ENABLED}
DOCLING_OCR_ENABLED: ${DOCLING_OCR_ENABLED}
MAX_DOCUMENT_TIMEOUT_SECONDS: ${MAX_DOCUMENT_TIMEOUT_SECONDS}
EMBEDDING_MODEL: ${EMBEDDING_MODEL}
EMBEDDING_DEVICE: ${EMBEDDING_DEVICE}
RERANKER_MODEL: ${RERANKER_MODEL}
RERANKER_DEVICE: ${RERANKER_DEVICE}
RERANKER_ENABLED: ${RERANKER_ENABLED}
APP_LOG_LEVEL: ${APP_LOG_LEVEL}
APP_INPUT_DIR: /data/input
APP_WORK_DIR: /data/work
CORS_ALLOWED_ORIGINS: ${CORS_ALLOWED_ORIGINS:?CORS_ALLOWED_ORIGINS must be set (no * in production)}
API_KEY: ${API_KEY:?API_KEY must be set in production}
restart: always
worker:
command: ["celery", "-A", "app.workers.celery_app", "worker", "--loglevel=INFO", "--concurrency=4"]
environment:
<<: *prod-env
restart: always

View File

@@ -32,6 +32,8 @@ x-common-env: &common-env
APP_LOG_LEVEL: ${APP_LOG_LEVEL:-INFO}
APP_INPUT_DIR: /data/input
APP_WORK_DIR: /data/work
CORS_ALLOWED_ORIGINS: ${CORS_ALLOWED_ORIGINS:-http://localhost:5173,http://localhost:5273,http://localhost:4173}
API_KEY: ${API_KEY:-}
services:
postgres:

View File

@@ -2,3 +2,8 @@
VITE_API_BASE_URL=/api/v1
VITE_USE_MOCK=true
VITE_APP_NAME=LegacyHUB
# Optional. When the backend has API_KEY set, the SPA must echo it on every
# request. For SSO/cookie deployments leave this empty and let the reverse
# proxy inject the header server-side.
VITE_API_KEY=

View File

@@ -1,30 +1,73 @@
import { lazy, Suspense } from "react";
import { createBrowserRouter, Navigate } from "react-router-dom";
import { AppShell } from "@/layouts/AppShell";
import { DashboardPage } from "@/pages/DashboardPage";
import { DocumentsPage } from "@/pages/DocumentsPage";
import { IngestionJobsPage } from "@/pages/IngestionJobsPage";
import { SearchPage } from "@/pages/SearchPage";
import { DocumentViewerPage } from "@/pages/DocumentViewerPage";
import { TablesFiguresPage } from "@/pages/TablesFiguresPage";
import { QualityControlPage } from "@/pages/QualityControlPage";
import { SystemHealthPage } from "@/pages/SystemHealthPage";
import { SettingsPage } from "@/pages/SettingsPage";
import { Skeleton } from "@/components/ui/skeleton";
// Each page is split into its own chunk so the initial bundle only ships the
// app shell + the page the user actually opens. Named exports are remapped to
// the `default` slot React.lazy expects.
const DashboardPage = lazy(() =>
import("@/pages/DashboardPage").then((m) => ({ default: m.DashboardPage }))
);
const DocumentsPage = lazy(() =>
import("@/pages/DocumentsPage").then((m) => ({ default: m.DocumentsPage }))
);
const IngestionJobsPage = lazy(() =>
import("@/pages/IngestionJobsPage").then((m) => ({ default: m.IngestionJobsPage }))
);
const SearchPage = lazy(() =>
import("@/pages/SearchPage").then((m) => ({ default: m.SearchPage }))
);
const DocumentViewerPage = lazy(() =>
import("@/pages/DocumentViewerPage").then((m) => ({ default: m.DocumentViewerPage }))
);
const TablesFiguresPage = lazy(() =>
import("@/pages/TablesFiguresPage").then((m) => ({ default: m.TablesFiguresPage }))
);
const QualityControlPage = lazy(() =>
import("@/pages/QualityControlPage").then((m) => ({ default: m.QualityControlPage }))
);
const SystemHealthPage = lazy(() =>
import("@/pages/SystemHealthPage").then((m) => ({ default: m.SystemHealthPage }))
);
const SettingsPage = lazy(() =>
import("@/pages/SettingsPage").then((m) => ({ default: m.SettingsPage }))
);
function RouteFallback() {
return (
<div className="space-y-4 animate-in fade-in-0">
<Skeleton className="h-9 w-1/3" />
<Skeleton className="h-5 w-2/3" />
<div className="grid grid-cols-1 gap-4 md:grid-cols-2 xl:grid-cols-4">
{Array.from({ length: 4 }).map((_, i) => (
<Skeleton key={i} className="h-28" />
))}
</div>
<Skeleton className="h-72" />
</div>
);
}
function withSuspense(node: React.ReactNode) {
return <Suspense fallback={<RouteFallback />}>{node}</Suspense>;
}
export const router = createBrowserRouter([
{
element: <AppShell />,
children: [
{ path: "/", element: <DashboardPage /> },
{ path: "/documents", element: <DocumentsPage /> },
{ path: "/ingestion", element: <IngestionJobsPage /> },
{ path: "/search", element: <SearchPage /> },
{ path: "/viewer", element: <DocumentViewerPage /> },
{ path: "/viewer/:id", element: <DocumentViewerPage /> },
{ path: "/tables-figures", element: <TablesFiguresPage /> },
{ path: "/quality", element: <QualityControlPage /> },
{ path: "/health", element: <SystemHealthPage /> },
{ path: "/settings", element: <SettingsPage /> },
{ path: "/", element: withSuspense(<DashboardPage />) },
{ path: "/documents", element: withSuspense(<DocumentsPage />) },
{ path: "/ingestion", element: withSuspense(<IngestionJobsPage />) },
{ path: "/search", element: withSuspense(<SearchPage />) },
{ path: "/viewer", element: withSuspense(<DocumentViewerPage />) },
{ path: "/viewer/:id", element: withSuspense(<DocumentViewerPage />) },
{ path: "/tables-figures", element: withSuspense(<TablesFiguresPage />) },
{ path: "/quality", element: withSuspense(<QualityControlPage />) },
{ path: "/health", element: withSuspense(<SystemHealthPage />) },
{ path: "/settings", element: withSuspense(<SettingsPage />) },
{ path: "*", element: <Navigate to="/" replace /> },
],
},

View File

@@ -1,11 +1,15 @@
import axios, { type AxiosInstance, type AxiosError } from "axios";
const BASE_URL = import.meta.env.VITE_API_BASE_URL ?? "/api/v1";
const API_KEY = import.meta.env.VITE_API_KEY ?? "";
const defaultHeaders: Record<string, string> = { "Content-Type": "application/json" };
if (API_KEY) defaultHeaders["X-API-Key"] = API_KEY;
export const apiClient: AxiosInstance = axios.create({
baseURL: BASE_URL,
timeout: 60_000,
headers: { "Content-Type": "application/json" },
headers: defaultHeaders,
});
apiClient.interceptors.response.use(

View File

@@ -47,6 +47,7 @@ export interface SearchHit {
citation: Citation;
quality_flags: QualityFlags;
metadata: Record<string, unknown>;
ocr_confidence?: number | null;
}
export interface SearchResponse {

View File

@@ -4,6 +4,7 @@ interface ImportMetaEnv {
readonly VITE_API_BASE_URL?: string;
readonly VITE_USE_MOCK?: string;
readonly VITE_APP_NAME?: string;
readonly VITE_API_KEY?: string;
}
interface ImportMeta {

View File

@@ -20,9 +20,7 @@ interface Props {
}
export function SearchResultCard({ hit, query, active, onSelect, reranked }: Props) {
const ocrConf =
(hit.metadata as { ocr_confidence?: number })?.ocr_confidence ??
null;
const ocrConf = hit.ocr_confidence ?? null;
return (
<motion.button
layout

View File

@@ -22,5 +22,32 @@ export default defineConfig({
build: {
outDir: "dist",
sourcemap: true,
chunkSizeWarningLimit: 800,
rollupOptions: {
output: {
// Split heavy third-party libs into their own long-lived chunks so
// page-level chunks stay small and the vendor cache is reused across
// releases.
manualChunks: (id) => {
if (!id.includes("node_modules")) return undefined;
if (id.includes("recharts") || id.includes("d3-")) return "vendor-recharts";
if (id.includes("framer-motion")) return "vendor-motion";
if (id.includes("@radix-ui") || id.includes("cmdk")) return "vendor-radix";
if (id.includes("@tanstack")) return "vendor-tanstack";
if (id.includes("react-router")) return "vendor-router";
if (id.includes("axios")) return "vendor-axios";
if (id.includes("lucide-react")) return "vendor-lucide";
if (id.includes("zustand")) return "vendor-state";
if (id.includes("sonner")) return "vendor-toast";
if (
id.includes("react/") ||
id.includes("react-dom") ||
id.includes("scheduler")
)
return "vendor-react";
return "vendor";
},
},
},
},
});

View File

@@ -12,54 +12,59 @@ license = { text = "Apache-2.0" }
readme = "README.md"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.30.0",
"pydantic>=2.7.0",
"pydantic-settings>=2.4.0",
"fastapi>=0.115.0,<0.117",
"uvicorn[standard]>=0.30.0,<0.33",
"pydantic>=2.7.0,<3",
"pydantic-settings>=2.4.0,<3",
"python-multipart>=0.0.9",
# DB
"sqlalchemy>=2.0.30",
"psycopg[binary]>=3.2.0",
"alembic>=1.13.0",
"sqlalchemy>=2.0.30,<2.1",
"psycopg[binary]>=3.2.0,<3.3",
"alembic>=1.13.0,<1.14",
# Object storage
"minio>=7.2.7",
"minio>=7.2.7,<8",
# Search/index
"opensearch-py>=2.6.0",
"qdrant-client>=1.10.0",
"opensearch-py>=2.6.0,<3",
"qdrant-client>=1.10.0,<1.13",
# Workers
"celery>=5.4.0",
"redis>=5.0.7",
"celery>=5.4.0,<6",
"redis>=5.0.7,<6",
# Ingestion
"ocrmypdf>=16.4.0",
"pikepdf>=9.0.0",
"pypdf>=4.3.0",
# Ingestion - pin Docling tight since its DocumentConverter API
# still moves between minor releases; lift the upper bound only
# after a smoke test on a staging corpus.
"ocrmypdf>=16.4.0,<17",
"pikepdf>=9.0.0,<10",
"pypdf>=4.3.0,<6",
"pdfminer.six>=20240706",
"docling>=2.0.0",
"docling>=2.0.0,<2.15",
# ML
"FlagEmbedding>=1.3.0",
"sentence-transformers>=3.0.0",
"torch>=2.2.0",
"numpy>=1.26.0",
"transformers>=4.42.0",
# ML - pin Flag/sentence-transformers/transformers within the
# families that have been verified against the reranker contract
# tests. Torch follows the family-major pin to keep CUDA wheels
# discoverable.
"FlagEmbedding>=1.3.0,<2",
"sentence-transformers>=3.0.0,<4",
"torch>=2.2.0,<3",
"numpy>=1.26.0,<3",
"transformers>=4.42.0,<5",
# Misc
"httpx>=0.27.0",
"tenacity>=8.5.0",
"structlog>=24.2.0",
"orjson>=3.10.0",
"httpx>=0.27.0,<0.29",
"tenacity>=8.5.0,<10",
"structlog>=24.2.0,<26",
"orjson>=3.10.0,<4",
"python-magic>=0.4.27; platform_system != 'Windows'",
"python-magic-bin>=0.4.14; platform_system == 'Windows'",
"langdetect>=1.0.9",
"langdetect>=1.0.9,<2",
"regex>=2024.5.15",
"rich>=13.7.1",
"tqdm>=4.66.4",
"click>=8.1.7",
"rich>=13.7.1,<14",
"tqdm>=4.66.4,<5",
"click>=8.1.7,<9",
]
[project.optional-dependencies]

View File

@@ -0,0 +1,195 @@
"""Reranker latency / throughput benchmark.
Measures BGE-reranker-v2-m3 (or whatever ``RERANKER_MODEL`` resolves to)
against synthetic or live corpus passages and prints the standard set of
percentiles plus throughput. Use this on staging hardware to verify whether
the configured device meets the latency budget before committing to a target
top-K.
Usage:
# 1) synthetic warm-up (no DB / OpenSearch needed)
python scripts/benchmark_reranker.py --queries 32 --candidates 40 \
--passage-length 700 --warmup 4
# 2) live corpus pull (samples real chunks from OpenSearch)
python scripts/benchmark_reranker.py --source opensearch \
--query "ГОСТ 21.501-93" \
--candidates 40
Outputs JSON to stdout and a markdown summary table.
"""
from __future__ import annotations
import argparse
import json
import statistics
import sys
import time
from dataclasses import asdict, dataclass
from app.config import settings
from app.indexing.reranker import get_reranker
from app.logging_config import configure_logging, get_logger
configure_logging()
logger = get_logger(__name__)
@dataclass
class BenchResult:
model: str
device: str
queries: int
candidates_per_query: int
passage_chars: int
warmup: int
p50_ms: float
p95_ms: float
p99_ms: float
mean_ms: float
pairs_per_sec: float
wall_seconds: float
def percentile(values: list[float], q: float) -> float:
if not values:
return 0.0
s = sorted(values)
idx = max(0, min(len(s) - 1, int(round((q / 100.0) * (len(s) - 1)))))
return s[idx]
def synthetic_passages(n: int, chars: int) -> list[str]:
seed = "ГОСТ 21.501-93 определяет правила выполнения архитектурно-строительных рабочих чертежей. "
base = (seed * ((chars // len(seed)) + 2))[:chars]
return [f"[{i}] {base}" for i in range(n)]
def synthetic_queries(n: int) -> list[str]:
samples = [
"ГОСТ 21.501-93 рабочие чертежи",
"класс бетона B25",
"журнал ремонтов узлов",
"правила производства земляных работ",
"схема электропитания корпус 3",
"контроль качества сварных соединений",
"регламент технического обслуживания",
]
return [samples[i % len(samples)] for i in range(n)]
def passages_from_opensearch(query: str, top_k: int) -> list[str]:
from app.indexing.opensearch_client import get_opensearch
res = get_opensearch().search(
index=settings.opensearch_index_chunks,
body={
"size": top_k,
"query": {"multi_match": {"query": query, "fields": ["text", "text.ru", "text.en"]}},
"_source": ["text"],
},
request_timeout=30,
)
return [h["_source"]["text"] for h in res["hits"]["hits"] if h["_source"].get("text")]
def run(
queries: list[str],
candidates_per_query: int,
passage_chars: int,
warmup: int,
source: str,
) -> BenchResult:
reranker = get_reranker()
if not reranker.available:
print("ERROR: reranker model failed to load", file=sys.stderr)
sys.exit(2)
# Warmup so JIT / weight loading does not skew p50.
if warmup > 0:
warm = synthetic_passages(candidates_per_query, passage_chars)
for q in queries[:warmup] or [queries[0]]:
reranker.score(q, warm)
latencies_ms: list[float] = []
pair_count = 0
t0 = time.perf_counter()
for q in queries:
if source == "opensearch":
passages = passages_from_opensearch(q, candidates_per_query)
if len(passages) < candidates_per_query:
passages += synthetic_passages(candidates_per_query - len(passages), passage_chars)
else:
passages = synthetic_passages(candidates_per_query, passage_chars)
start = time.perf_counter()
reranker.score(q, passages)
latencies_ms.append((time.perf_counter() - start) * 1000.0)
pair_count += len(passages)
wall = time.perf_counter() - t0
return BenchResult(
model=reranker.model_name,
device=reranker.device,
queries=len(queries),
candidates_per_query=candidates_per_query,
passage_chars=passage_chars,
warmup=warmup,
p50_ms=percentile(latencies_ms, 50),
p95_ms=percentile(latencies_ms, 95),
p99_ms=percentile(latencies_ms, 99),
mean_ms=statistics.fmean(latencies_ms),
pairs_per_sec=pair_count / wall if wall > 0 else 0.0,
wall_seconds=wall,
)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--queries", type=int, default=32)
parser.add_argument("--candidates", type=int, default=settings.rerank_candidates)
parser.add_argument("--passage-length", type=int, default=700,
help="Synthetic passage character length")
parser.add_argument("--warmup", type=int, default=2)
parser.add_argument("--source", choices=["synthetic", "opensearch"], default="synthetic")
parser.add_argument("--query", type=str, default=None,
help="Single query to use against OpenSearch (with --source opensearch)")
parser.add_argument("--json-only", action="store_true")
args = parser.parse_args()
if args.source == "opensearch" and args.query:
queries = [args.query] * args.queries
else:
queries = synthetic_queries(args.queries)
result = run(
queries=queries,
candidates_per_query=args.candidates,
passage_chars=args.passage_length,
warmup=args.warmup,
source=args.source,
)
payload = asdict(result)
print(json.dumps(payload, indent=2))
if not args.json_only:
print()
print("| Metric | Value |")
print("|---------------------|-----------------|")
print(f"| Model | {result.model} |")
print(f"| Device | {result.device} |")
print(f"| Queries | {result.queries} |")
print(f"| Candidates / query | {result.candidates_per_query} |")
print(f"| Passage chars | {result.passage_chars} |")
print(f"| p50 latency | {result.p50_ms:.1f} ms |")
print(f"| p95 latency | {result.p95_ms:.1f} ms |")
print(f"| p99 latency | {result.p99_ms:.1f} ms |")
print(f"| mean latency | {result.mean_ms:.1f} ms |")
print(f"| Throughput | {result.pairs_per_sec:.1f} pairs/s |")
print(f"| Wall time | {result.wall_seconds:.2f} s |")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,150 @@
"""Generate N synthetic single-page PDFs for load testing the ingest pipeline.
Each PDF carries 4-8 paragraphs of seeded English + Cyrillic text. The
generator embeds text via the standard Helvetica font, which only covers
latin-1 - Cyrillic glyphs render as placeholders. That is acceptable for a
*load* generator: the focus is throughput at scale, not retrieval relevance.
For semantic regression tests, use a real corpus sample instead.
Output directory layout::
<out>/2025-LOAD/
legacy_00001.pdf
legacy_00002.pdf
...
Usage:
python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load
python scripts/generate_synthetic_pdfs.py --count 100 --out ./tmp --scanned-every 5
"""
from __future__ import annotations
import argparse
import random
import sys
from pathlib import Path
try:
from pypdf import PdfWriter
except Exception: # noqa: BLE001
PdfWriter = None # type: ignore[assignment]
try:
import pikepdf
except Exception: # noqa: BLE001
pikepdf = None # type: ignore[assignment]
PAGE_W = 595 # A4 @ 72 dpi (close enough)
PAGE_H = 842
SAMPLE_SENTENCES_RU = [
"ГОСТ 21.501-93 определяет правила выполнения архитектурно-строительных чертежей.",
"Класс бетона B25 применяется для несущих конструкций нижних этажей.",
"Все размеры приведены в миллиметрах, если иное не указано.",
"Контроль качества сварных соединений выполняется в соответствии с регламентом.",
"Технологический регламент технического обслуживания пересматривается ежегодно.",
"При производстве работ при пониженных температурах требуется дополнительное обогрев.",
]
SAMPLE_SENTENCES_EN = [
"The drawing follows the conventions established in the project specification.",
"All measurements are reported in SI units and validated against the cited standard.",
"Service intervals are detailed in the maintenance schedule appended at the back.",
"Quality control checkpoints precede each acceptance handoff.",
]
def make_text_pdf(path: Path, doc_id: int, rng: random.Random) -> None:
"""Build a real, structurally valid PDF directly via PDF primitives.
We avoid heavy dependencies (reportlab) for the hot path; pypdf only writes
the container. Text is embedded as a content stream using the built-in
Helvetica font.
"""
if PdfWriter is None:
raise RuntimeError("pypdf is required (pip install pypdf>=4.3)")
n_paragraphs = rng.randint(4, 8)
paragraphs = []
for _ in range(n_paragraphs):
sents = rng.sample(SAMPLE_SENTENCES_RU + SAMPLE_SENTENCES_EN,
k=rng.randint(2, 4))
paragraphs.append(" ".join(sents))
body = f"Legacy archive document #{doc_id}\n\n" + "\n\n".join(paragraphs)
_write_minimal_pdf(path, body)
def _write_minimal_pdf(path: Path, body: str) -> None:
"""Hand-write a 1-page PDF with Helvetica text. Keeps the file under 4 KB
so the load generator scales to tens of thousands of documents on a laptop.
"""
# Escape PDF special chars
body_escaped = (body.replace("\\", "\\\\")
.replace("(", "\\(")
.replace(")", "\\)"))
lines = body_escaped.split("\n")
leading = 14
y_start = PAGE_H - 72
stream_lines = []
for i, line in enumerate(lines[:50]): # cap visible lines
y = y_start - i * leading
stream_lines.append(f"BT /F1 11 Tf 72 {y} Td ({line}) Tj ET")
content_stream = "\n".join(stream_lines) + "\n"
content_bytes = content_stream.encode("latin-1", errors="replace")
objs = []
objs.append(b"<< /Type /Catalog /Pages 2 0 R >>")
objs.append(b"<< /Type /Pages /Count 1 /Kids [3 0 R] >>")
objs.append(
f"<< /Type /Page /Parent 2 0 R /Resources << /Font << /F1 5 0 R >> >>"
f" /MediaBox [0 0 {PAGE_W} {PAGE_H}] /Contents 4 0 R >>".encode("latin-1")
)
objs.append(
b"<< /Length " + str(len(content_bytes)).encode("ascii") + b" >>\nstream\n"
+ content_bytes + b"endstream"
)
objs.append(b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>")
output = bytearray(b"%PDF-1.4\n%\xE2\xE3\xCF\xD3\n")
offsets = [0]
for i, obj in enumerate(objs, start=1):
offsets.append(len(output))
output += f"{i} 0 obj\n".encode("ascii") + obj + b"\nendobj\n"
xref_offset = len(output)
output += b"xref\n"
output += f"0 {len(objs) + 1}\n".encode("ascii")
output += b"0000000000 65535 f \n"
for off in offsets[1:]:
output += f"{off:010d} 00000 n \n".encode("ascii")
output += b"trailer\n"
output += f"<< /Size {len(objs) + 1} /Root 1 0 R >>\n".encode("ascii")
output += b"startxref\n"
output += f"{xref_offset}\n".encode("ascii")
output += b"%%EOF\n"
path.write_bytes(bytes(output))
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--count", type=int, required=True)
parser.add_argument("--out", type=Path, required=True)
parser.add_argument("--seed", type=int, default=20260513)
args = parser.parse_args()
args.out.mkdir(parents=True, exist_ok=True)
rng = random.Random(args.seed)
for i in range(1, args.count + 1):
target = args.out / f"legacy_{i:06d}.pdf"
make_text_pdf(target, i, rng)
if i % 500 == 0:
print(f" generated {i}/{args.count}")
print(f"done: {args.count} files in {args.out}")
return 0
if __name__ == "__main__":
sys.exit(main())

108
scripts/load_ingest.py Normal file
View File

@@ -0,0 +1,108 @@
"""Drive ingest at scale and report per-stage throughput.
This script does NOT itself run OCR/Docling - it triggers
``POST /api/v1/ingest/folder`` and then samples the ``documents`` /
``processing_events`` tables to compute throughput.
Usage:
# 1. Generate synthetic PDFs
python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load
# 2. Trigger ingest + watch
python scripts/load_ingest.py \
--path /data/input/load \
--api-url http://localhost:8000/api/v1 \
--watch-seconds 600 \
--report-file load_report.json
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path
import httpx
def trigger_ingest(api_url: str, folder: str, force: bool = False) -> dict:
res = httpx.post(
f"{api_url}/ingest/folder",
json={"path": folder, "recursive": True, "force": force},
timeout=600,
)
res.raise_for_status()
return res.json()
def sample_status(api_url: str) -> dict[str, int]:
"""Aggregate document statuses from a backend endpoint or the database.
The current API does not expose /documents/stats; we fall back to /health
only as a liveness probe and rely on the caller to inspect Postgres for
real counts. To keep the script self-contained we attempt a hypothetical
``GET /documents/stats`` first and degrade silently.
"""
try:
res = httpx.get(f"{api_url}/documents/stats", timeout=10)
if res.status_code == 200:
return res.json().get("by_status", {})
except Exception: # noqa: BLE001
pass
return {}
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--path", required=True, help="Folder mounted in the api container")
parser.add_argument("--api-url", default="http://localhost:8000/api/v1")
parser.add_argument("--watch-seconds", type=int, default=600)
parser.add_argument("--poll-interval", type=int, default=10)
parser.add_argument("--force", action="store_true")
parser.add_argument("--report-file", type=Path, default=None)
args = parser.parse_args()
print(f"[load] trigger {args.path}")
enqueue = trigger_ingest(args.api_url, args.path, force=args.force)
print(f"[load] enqueue response: {json.dumps(enqueue)}")
started = time.time()
history: list[dict] = []
last_status: Counter[str] = Counter()
while (time.time() - started) < args.watch_seconds:
snap = Counter(sample_status(args.api_url))
delta = snap - last_status
elapsed = round(time.time() - started, 1)
print(f"[load] t+{elapsed:>6}s {dict(snap)} delta={dict(delta)}")
history.append({"t": elapsed, "snapshot": dict(snap)})
last_status = snap
# Heuristic stop: queued count from enqueue all reached terminal status.
terminal = sum(
snap.get(s, 0)
for s in ("INDEXING_COMPLETED", "FAILED", "OCR_FAILED", "EXTRACTION_FAILED")
)
if terminal >= enqueue.get("queued", 0) > 0:
print("[load] all queued docs reached terminal status")
break
time.sleep(args.poll_interval)
report = {
"enqueue": enqueue,
"watch_seconds": time.time() - started,
"history": history,
"final": dict(last_status),
}
print(json.dumps(report, indent=2))
if args.report_file:
args.report_file.write_text(json.dumps(report, indent=2), encoding="utf-8")
print(f"[load] wrote {args.report_file}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,72 @@
"""Locust load profile for the LegacyHUB hybrid search API.
Run:
pip install locust
locust -f scripts/locustfile_search.py \
--host http://localhost:8000 \
--users 50 --spawn-rate 5 --run-time 5m
Or headless with HTML report:
locust -f scripts/locustfile_search.py --host http://localhost:8000 \
--headless --users 100 --spawn-rate 10 --run-time 10m \
--html load_search.html
"""
from __future__ import annotations
import random
from locust import HttpUser, between, task
QUERIES = [
"ГОСТ 21.501-93 рабочие чертежи",
"класс бетона B25",
"регламент технического обслуживания",
"контроль качества сварных соединений",
"схема электропитания корпус 3",
"журнал ремонтов узлов",
"правила производства земляных работ",
"акты приемки скрытых работ",
"fundament concrete grade",
"maintenance schedule appendix",
]
MODES = ["hybrid", "hybrid", "hybrid", "lexical", "semantic"]
class SearchUser(HttpUser):
wait_time = between(0.5, 2.5)
api_prefix = "/api/v1"
@task(8)
def hybrid_search(self):
body = {
"query": random.choice(QUERIES),
"limit": random.choice([5, 10, 20]),
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": random.choice(MODES),
}
with self.client.post(
f"{self.api_prefix}/search",
json=body,
name="POST /search",
catch_response=True,
) as res:
if res.status_code != 200:
res.failure(f"HTTP {res.status_code}: {res.text[:120]}")
return
data = res.json()
if not data.get("results"):
res.failure("empty results")
@task(1)
def health(self):
self.client.get(f"{self.api_prefix}/health", name="GET /health")

75
tests/test_alembic.py Normal file
View File

@@ -0,0 +1,75 @@
"""Alembic migration smoke test.
We do not boot a real Postgres in CI; instead we point Alembic at an in-process
SQLite database and verify:
- ``alembic upgrade head`` succeeds offline (SQL generation) using the real
migration files, exercising every column type and constraint declaration;
- ``downgrade base`` rewinds without errors.
This catches typos and broken migration ordering early without requiring the
full backing-service compose stack to be online.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
import pytest
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
@pytest.fixture
def alembic_cfg(tmp_path, monkeypatch):
"""Configure Alembic against an isolated SQLite file."""
db_file = tmp_path / "legacyhub.db"
monkeypatch.setenv("POSTGRES_HOST", "127.0.0.1")
monkeypatch.setenv("POSTGRES_PORT", "5432")
# Force a fresh Settings + Alembic env that ignores the configured PG.
from alembic.config import Config
cfg = Config(str(ROOT / "alembic.ini"))
cfg.set_main_option("script_location", str(ROOT / "app" / "db" / "migrations"))
cfg.set_main_option("sqlalchemy.url", f"sqlite:///{db_file}")
return cfg
def test_migration_offline_emits_sql(alembic_cfg, tmp_path):
"""Offline mode generates SQL for every table; verify ``documents`` appears
and at least one JSONB-equivalent column is rendered. SQLite has no JSONB
but Alembic's offline mode happily emits the raw DDL for inspection.
"""
from alembic import command
out_file = tmp_path / "upgrade.sql"
# ``--sql`` mode bypasses dialect-specific runtime, perfect for a fast check.
with out_file.open("w", encoding="utf-8") as f:
old_stdout = sys.stdout
sys.stdout = f
try:
command.upgrade(alembic_cfg, "head", sql=True)
finally:
sys.stdout = old_stdout
sql = out_file.read_text(encoding="utf-8")
assert "CREATE TABLE documents" in sql
assert "CREATE TABLE chunks" in sql
assert "CREATE TABLE processing_events" in sql
# Constraint sanity
assert "uq_chunks_doc_idx" in sql
assert "uq_pages_doc_page" in sql
def test_revision_history_is_linear(alembic_cfg):
"""The current project has a single linear history at 0001_initial."""
from alembic.script import ScriptDirectory
script = ScriptDirectory.from_config(alembic_cfg)
heads = script.get_heads()
assert len(heads) == 1, f"expected one head, got: {heads}"
initial = next(iter(script.walk_revisions()))
assert initial.revision == "0001_initial"

104
tests/test_api_health.py Normal file
View File

@@ -0,0 +1,104 @@
"""Contract test for /health.
Patches the individual probe functions so the test does not depend on a live
Postgres / MinIO / OpenSearch / Qdrant / Redis. Verifies:
- the route is mounted under the configured API prefix;
- the response shape conforms to ``HealthResponse``;
- the overall status follows the worst-component-wins rule
(``ok`` -> ``ok``, any ``error`` -> ``error``, ``degraded`` otherwise);
- the CORS preflight responds with the configured allowed origin.
"""
from __future__ import annotations
import pytest
from fastapi.testclient import TestClient
from app.api.schemas import ComponentHealth
from app.config import settings
from app.main import app
@pytest.fixture
def client() -> TestClient:
return TestClient(app)
def _ok(name: str) -> ComponentHealth:
return ComponentHealth(name=name, status="ok", detail={})
def _err(name: str) -> ComponentHealth:
return ComponentHealth(name=name, status="error", detail={"error": "down"})
def _degraded(name: str) -> ComponentHealth:
return ComponentHealth(name=name, status="degraded", detail={"cluster_status": "red"})
def _patch_probes(monkeypatch, **overrides):
from app.api import routes_health
defaults = {
"_check_postgres": lambda: _ok("postgres"),
"_check_minio": lambda: _ok("minio"),
"_check_opensearch": lambda: _ok("opensearch"),
"_check_qdrant": lambda: _ok("qdrant"),
"_check_redis": lambda: _ok("redis"),
}
defaults.update(overrides)
for name, fn in defaults.items():
monkeypatch.setattr(routes_health, name, fn)
def test_health_all_ok(client: TestClient, monkeypatch):
_patch_probes(monkeypatch)
res = client.get(f"{settings.app_api_prefix}/health")
assert res.status_code == 200
body = res.json()
assert body["status"] == "ok"
assert {c["name"] for c in body["components"]} == {
"postgres",
"minio",
"opensearch",
"qdrant",
"redis",
}
def test_health_error_when_any_component_down(client: TestClient, monkeypatch):
_patch_probes(monkeypatch, _check_qdrant=lambda: _err("qdrant"))
res = client.get(f"{settings.app_api_prefix}/health")
assert res.status_code == 200
body = res.json()
assert body["status"] == "error"
qdrant = next(c for c in body["components"] if c["name"] == "qdrant")
assert qdrant["status"] == "error"
def test_health_degraded_when_any_component_degraded(client: TestClient, monkeypatch):
_patch_probes(monkeypatch, _check_opensearch=lambda: _degraded("opensearch"))
res = client.get(f"{settings.app_api_prefix}/health")
body = res.json()
assert body["status"] == "degraded"
def test_root_includes_api_prefix(client: TestClient):
res = client.get("/")
assert res.status_code == 200
assert res.json()["api"] == settings.app_api_prefix
def test_cors_preflight_allows_configured_origin(client: TestClient):
origin = settings.cors_origins[0]
res = client.options(
f"{settings.app_api_prefix}/health",
headers={
"Origin": origin,
"Access-Control-Request-Method": "GET",
},
)
assert res.status_code == 200
assert res.headers.get("access-control-allow-origin") == origin

167
tests/test_api_security.py Normal file
View File

@@ -0,0 +1,167 @@
"""Tests for the optional API-key auth middleware."""
from __future__ import annotations
import importlib
import pytest
from fastapi.testclient import TestClient
KEY = "test-secret-key-DO-NOT-USE-IN-PROD"
@pytest.fixture
def secured_app(monkeypatch):
"""Reload the FastAPI application with API_KEY set so the middleware
installs itself before the lifespan starts. Returns a TestClient bound to
that fresh app instance.
"""
monkeypatch.setenv("API_KEY", KEY)
# Drop cached Settings and main so the new env vars are picked up.
import app.config as cfg
import app.main as main_module
cfg.get_settings.cache_clear()
importlib.reload(cfg)
importlib.reload(main_module)
return main_module.app
def _patch_health(monkeypatch, module):
from app.api.schemas import ComponentHealth
def _ok(name):
return ComponentHealth(name=name, status="ok", detail={})
for name in (
"_check_postgres",
"_check_minio",
"_check_opensearch",
"_check_qdrant",
"_check_redis",
):
monkeypatch.setattr(module, name, lambda n=name: _ok(n.removeprefix("_check_")))
def test_health_remains_open_when_key_required(secured_app, monkeypatch):
from app.api import routes_health
from app.config import settings
_patch_health(monkeypatch, routes_health)
client = TestClient(secured_app)
res = client.get(f"{settings.app_api_prefix}/health")
assert res.status_code == 200
def test_protected_route_rejects_missing_key(secured_app, monkeypatch):
from app.config import settings
from app.indexing import hybrid_search
monkeypatch.setattr(hybrid_search, "run_search", lambda req: pytest.fail("must not run"))
client = TestClient(secured_app)
res = client.post(
f"{settings.app_api_prefix}/search",
json={
"query": "anything",
"limit": 1,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 401
assert res.json()["detail"].startswith("invalid")
def test_protected_route_accepts_x_api_key_header(secured_app, monkeypatch):
from app.config import settings
from app.indexing import hybrid_search
from app.api.schemas import SearchResponse
monkeypatch.setattr(
hybrid_search,
"run_search",
lambda req: SearchResponse(
query=req.query, mode=req.search_mode, total_candidates=0, reranked=False, results=[]
),
)
client = TestClient(secured_app)
res = client.post(
f"{settings.app_api_prefix}/search",
headers={"X-API-Key": KEY},
json={
"query": "x",
"limit": 1,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 200
def test_protected_route_accepts_bearer_token(secured_app, monkeypatch):
from app.config import settings
from app.indexing import hybrid_search
from app.api.schemas import SearchResponse
monkeypatch.setattr(
hybrid_search,
"run_search",
lambda req: SearchResponse(
query=req.query, mode=req.search_mode, total_candidates=0, reranked=False, results=[]
),
)
client = TestClient(secured_app)
res = client.post(
f"{settings.app_api_prefix}/search",
headers={"Authorization": f"Bearer {KEY}"},
json={
"query": "x",
"limit": 1,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 200
def test_protected_route_rejects_wrong_key(secured_app):
from app.config import settings
client = TestClient(secured_app)
res = client.post(
f"{settings.app_api_prefix}/search",
headers={"X-API-Key": "wrong"},
json={
"query": "x",
"limit": 1,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 401

130
tests/test_routes_search.py Normal file
View File

@@ -0,0 +1,130 @@
"""Contract test for POST /search.
The hybrid search backend depends on live OpenSearch + Qdrant + embedder; we
patch :func:`app.indexing.hybrid_search.run_search` so the route can be
exercised with the real request/response schemas without bringing infra up.
"""
from __future__ import annotations
import uuid
import pytest
from fastapi.testclient import TestClient
from app.api.schemas import (
Citation,
SearchHit,
SearchResponse,
)
from app.config import settings
from app.main import app
@pytest.fixture
def client() -> TestClient:
return TestClient(app)
def _stub_response(query: str) -> SearchResponse:
doc_id = uuid.uuid4()
chunk_id = uuid.uuid4()
return SearchResponse(
query=query,
mode="hybrid",
total_candidates=42,
reranked=True,
results=[
SearchHit(
rank=1,
score=0.91,
document_id=doc_id,
chunk_id=chunk_id,
original_file_name="GOST_21.501-93.pdf",
source_path="/data/input/standards/GOST_21.501-93.pdf",
page_number=12,
block_type="paragraph",
text=f"Highlighted text for {query}.",
citation=Citation(
pdf="GOST_21.501-93.pdf",
page=12,
block_id="b-12-0",
table_id=None,
figure_id=None,
),
quality_flags={"low_ocr_confidence": False, "needs_manual_review": False},
metadata={"section_heading": "Глава 2"},
)
],
)
def test_search_returns_hit_with_citation(client: TestClient, monkeypatch):
from app.api import routes_search as routes
from app.indexing import hybrid_search
def fake_run(req):
assert req.query
return _stub_response(req.query)
monkeypatch.setattr(hybrid_search, "run_search", fake_run)
# The route imports run_search lazily; patch the module-level binding too.
monkeypatch.setattr(routes, "run_search", fake_run, raising=False)
res = client.post(
f"{settings.app_api_prefix}/search",
json={
"query": "ГОСТ 21.501-93",
"limit": 10,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 200, res.text
body = res.json()
assert body["query"] == "ГОСТ 21.501-93"
assert body["mode"] == "hybrid"
assert body["reranked"] is True
assert body["total_candidates"] == 42
assert len(body["results"]) == 1
hit = body["results"][0]
assert hit["rank"] == 1
assert hit["page_number"] == 12
assert hit["block_type"] == "paragraph"
assert hit["citation"]["pdf"] == "GOST_21.501-93.pdf"
assert hit["citation"]["page"] == 12
assert hit["citation"]["block_id"] == "b-12-0"
def test_search_rejects_empty_query(client: TestClient, monkeypatch):
"""Schema validation should reject empty query without hitting the backend."""
from app.indexing import hybrid_search
def must_not_run(_req): # noqa: ARG001
raise AssertionError("backend should not be called for invalid input")
monkeypatch.setattr(hybrid_search, "run_search", must_not_run)
res = client.post(
f"{settings.app_api_prefix}/search",
json={
"query": "",
"limit": 10,
"filters": {
"document_id": None,
"source_path": None,
"block_type": None,
"min_ocr_confidence": None,
},
"search_mode": "hybrid",
},
)
assert res.status_code == 422