diff --git a/RUNBOOK.md b/RUNBOOK.md index d101d3d..ea6712c 100644 --- a/RUNBOOK.md +++ b/RUNBOOK.md @@ -151,6 +151,55 @@ If the measured p95 exceeds the budget, options in order of preference: Passages are clipped to 2048 chars before being fed to the cross-encoder so a runaway chunk cannot starve the budget. +## Load testing + +Two complementary harnesses live under `scripts/`: + +### Ingest load + +```bash +# Generate synthetic PDFs (~3 KB each, real PDF/1.4 with embedded text) +docker compose exec api python scripts/generate_synthetic_pdfs.py \ + --count 10000 --out /data/input/load + +# Trigger ingest, sample status every 10 s, dump JSON history +docker compose exec api python scripts/load_ingest.py \ + --path /data/input/load \ + --api-url http://localhost:8000/api/v1 \ + --watch-seconds 1800 \ + --report-file /data/work/load_report.json +``` + +Target SLOs at the 70k-document scale (subject to refinement once measured): + +| Metric | CPU target | GPU target | +|---------------------------------|-----------:|-----------:| +| Sustained throughput (docs/min) | > 30 | > 200 | +| Failure rate | < 1 % | < 0.5 % | +| p95 per-document wall time | < 90 s | < 25 s | + +### Search load + +```bash +pip install locust # one-time + +locust -f scripts/locustfile_search.py \ + --host http://localhost:8000 \ + --headless --users 100 --spawn-rate 10 --run-time 10m \ + --html load_search.html +``` + +Target SLOs for hybrid mode with the reranker enabled: + +| Percentile | CPU | GPU | +|------------|-------:|------:| +| p50 | 600 ms | 120 ms | +| p95 | 1500 ms | 300 ms | +| p99 | 3500 ms | 700 ms | + +If staging numbers miss the budget, walk the reranker remediation ladder above +before chasing index sharding. + ## Scaling notes (~70k PDFs) - Workers horizontally scale: `docker compose up -d --scale worker=8`. diff --git a/scripts/generate_synthetic_pdfs.py b/scripts/generate_synthetic_pdfs.py new file mode 100644 index 0000000..df09925 --- /dev/null +++ b/scripts/generate_synthetic_pdfs.py @@ -0,0 +1,150 @@ +"""Generate N synthetic single-page PDFs for load testing the ingest pipeline. + +Each PDF carries 4-8 paragraphs of seeded English + Cyrillic text. The +generator embeds text via the standard Helvetica font, which only covers +latin-1 - Cyrillic glyphs render as placeholders. That is acceptable for a +*load* generator: the focus is throughput at scale, not retrieval relevance. +For semantic regression tests, use a real corpus sample instead. + +Output directory layout:: + + /2025-LOAD/ + legacy_00001.pdf + legacy_00002.pdf + ... + +Usage: + + python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load + python scripts/generate_synthetic_pdfs.py --count 100 --out ./tmp --scanned-every 5 +""" + +from __future__ import annotations + +import argparse +import random +import sys +from pathlib import Path + +try: + from pypdf import PdfWriter +except Exception: # noqa: BLE001 + PdfWriter = None # type: ignore[assignment] + +try: + import pikepdf +except Exception: # noqa: BLE001 + pikepdf = None # type: ignore[assignment] + + +PAGE_W = 595 # A4 @ 72 dpi (close enough) +PAGE_H = 842 + +SAMPLE_SENTENCES_RU = [ + "ГОСТ 21.501-93 определяет правила выполнения архитектурно-строительных чертежей.", + "Класс бетона B25 применяется для несущих конструкций нижних этажей.", + "Все размеры приведены в миллиметрах, если иное не указано.", + "Контроль качества сварных соединений выполняется в соответствии с регламентом.", + "Технологический регламент технического обслуживания пересматривается ежегодно.", + "При производстве работ при пониженных температурах требуется дополнительное обогрев.", +] +SAMPLE_SENTENCES_EN = [ + "The drawing follows the conventions established in the project specification.", + "All measurements are reported in SI units and validated against the cited standard.", + "Service intervals are detailed in the maintenance schedule appended at the back.", + "Quality control checkpoints precede each acceptance handoff.", +] + + +def make_text_pdf(path: Path, doc_id: int, rng: random.Random) -> None: + """Build a real, structurally valid PDF directly via PDF primitives. + + We avoid heavy dependencies (reportlab) for the hot path; pypdf only writes + the container. Text is embedded as a content stream using the built-in + Helvetica font. + """ + if PdfWriter is None: + raise RuntimeError("pypdf is required (pip install pypdf>=4.3)") + + n_paragraphs = rng.randint(4, 8) + paragraphs = [] + for _ in range(n_paragraphs): + sents = rng.sample(SAMPLE_SENTENCES_RU + SAMPLE_SENTENCES_EN, + k=rng.randint(2, 4)) + paragraphs.append(" ".join(sents)) + + body = f"Legacy archive document #{doc_id}\n\n" + "\n\n".join(paragraphs) + _write_minimal_pdf(path, body) + + +def _write_minimal_pdf(path: Path, body: str) -> None: + """Hand-write a 1-page PDF with Helvetica text. Keeps the file under 4 KB + so the load generator scales to tens of thousands of documents on a laptop. + """ + # Escape PDF special chars + body_escaped = (body.replace("\\", "\\\\") + .replace("(", "\\(") + .replace(")", "\\)")) + lines = body_escaped.split("\n") + leading = 14 + y_start = PAGE_H - 72 + stream_lines = [] + for i, line in enumerate(lines[:50]): # cap visible lines + y = y_start - i * leading + stream_lines.append(f"BT /F1 11 Tf 72 {y} Td ({line}) Tj ET") + content_stream = "\n".join(stream_lines) + "\n" + content_bytes = content_stream.encode("latin-1", errors="replace") + + objs = [] + objs.append(b"<< /Type /Catalog /Pages 2 0 R >>") + objs.append(b"<< /Type /Pages /Count 1 /Kids [3 0 R] >>") + objs.append( + f"<< /Type /Page /Parent 2 0 R /Resources << /Font << /F1 5 0 R >> >>" + f" /MediaBox [0 0 {PAGE_W} {PAGE_H}] /Contents 4 0 R >>".encode("latin-1") + ) + objs.append( + b"<< /Length " + str(len(content_bytes)).encode("ascii") + b" >>\nstream\n" + + content_bytes + b"endstream" + ) + objs.append(b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>") + + output = bytearray(b"%PDF-1.4\n%\xE2\xE3\xCF\xD3\n") + offsets = [0] + for i, obj in enumerate(objs, start=1): + offsets.append(len(output)) + output += f"{i} 0 obj\n".encode("ascii") + obj + b"\nendobj\n" + xref_offset = len(output) + output += b"xref\n" + output += f"0 {len(objs) + 1}\n".encode("ascii") + output += b"0000000000 65535 f \n" + for off in offsets[1:]: + output += f"{off:010d} 00000 n \n".encode("ascii") + output += b"trailer\n" + output += f"<< /Size {len(objs) + 1} /Root 1 0 R >>\n".encode("ascii") + output += b"startxref\n" + output += f"{xref_offset}\n".encode("ascii") + output += b"%%EOF\n" + path.write_bytes(bytes(output)) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--count", type=int, required=True) + parser.add_argument("--out", type=Path, required=True) + parser.add_argument("--seed", type=int, default=20260513) + args = parser.parse_args() + + args.out.mkdir(parents=True, exist_ok=True) + rng = random.Random(args.seed) + + for i in range(1, args.count + 1): + target = args.out / f"legacy_{i:06d}.pdf" + make_text_pdf(target, i, rng) + if i % 500 == 0: + print(f" generated {i}/{args.count}") + print(f"done: {args.count} files in {args.out}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/load_ingest.py b/scripts/load_ingest.py new file mode 100644 index 0000000..ae3621f --- /dev/null +++ b/scripts/load_ingest.py @@ -0,0 +1,108 @@ +"""Drive ingest at scale and report per-stage throughput. + +This script does NOT itself run OCR/Docling - it triggers +``POST /api/v1/ingest/folder`` and then samples the ``documents`` / +``processing_events`` tables to compute throughput. + +Usage: + + # 1. Generate synthetic PDFs + python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load + + # 2. Trigger ingest + watch + python scripts/load_ingest.py \ + --path /data/input/load \ + --api-url http://localhost:8000/api/v1 \ + --watch-seconds 600 \ + --report-file load_report.json +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +from collections import Counter +from pathlib import Path + +import httpx + + +def trigger_ingest(api_url: str, folder: str, force: bool = False) -> dict: + res = httpx.post( + f"{api_url}/ingest/folder", + json={"path": folder, "recursive": True, "force": force}, + timeout=600, + ) + res.raise_for_status() + return res.json() + + +def sample_status(api_url: str) -> dict[str, int]: + """Aggregate document statuses from a backend endpoint or the database. + + The current API does not expose /documents/stats; we fall back to /health + only as a liveness probe and rely on the caller to inspect Postgres for + real counts. To keep the script self-contained we attempt a hypothetical + ``GET /documents/stats`` first and degrade silently. + """ + try: + res = httpx.get(f"{api_url}/documents/stats", timeout=10) + if res.status_code == 200: + return res.json().get("by_status", {}) + except Exception: # noqa: BLE001 + pass + return {} + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--path", required=True, help="Folder mounted in the api container") + parser.add_argument("--api-url", default="http://localhost:8000/api/v1") + parser.add_argument("--watch-seconds", type=int, default=600) + parser.add_argument("--poll-interval", type=int, default=10) + parser.add_argument("--force", action="store_true") + parser.add_argument("--report-file", type=Path, default=None) + args = parser.parse_args() + + print(f"[load] trigger {args.path}") + enqueue = trigger_ingest(args.api_url, args.path, force=args.force) + print(f"[load] enqueue response: {json.dumps(enqueue)}") + + started = time.time() + history: list[dict] = [] + last_status: Counter[str] = Counter() + + while (time.time() - started) < args.watch_seconds: + snap = Counter(sample_status(args.api_url)) + delta = snap - last_status + elapsed = round(time.time() - started, 1) + print(f"[load] t+{elapsed:>6}s {dict(snap)} delta={dict(delta)}") + history.append({"t": elapsed, "snapshot": dict(snap)}) + last_status = snap + # Heuristic stop: queued count from enqueue all reached terminal status. + terminal = sum( + snap.get(s, 0) + for s in ("INDEXING_COMPLETED", "FAILED", "OCR_FAILED", "EXTRACTION_FAILED") + ) + if terminal >= enqueue.get("queued", 0) > 0: + print("[load] all queued docs reached terminal status") + break + time.sleep(args.poll_interval) + + report = { + "enqueue": enqueue, + "watch_seconds": time.time() - started, + "history": history, + "final": dict(last_status), + } + print(json.dumps(report, indent=2)) + if args.report_file: + args.report_file.write_text(json.dumps(report, indent=2), encoding="utf-8") + print(f"[load] wrote {args.report_file}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/locustfile_search.py b/scripts/locustfile_search.py new file mode 100644 index 0000000..d4ef22e --- /dev/null +++ b/scripts/locustfile_search.py @@ -0,0 +1,72 @@ +"""Locust load profile for the LegacyHUB hybrid search API. + +Run: + + pip install locust + locust -f scripts/locustfile_search.py \ + --host http://localhost:8000 \ + --users 50 --spawn-rate 5 --run-time 5m + +Or headless with HTML report: + + locust -f scripts/locustfile_search.py --host http://localhost:8000 \ + --headless --users 100 --spawn-rate 10 --run-time 10m \ + --html load_search.html +""" + +from __future__ import annotations + +import random + +from locust import HttpUser, between, task + + +QUERIES = [ + "ГОСТ 21.501-93 рабочие чертежи", + "класс бетона B25", + "регламент технического обслуживания", + "контроль качества сварных соединений", + "схема электропитания корпус 3", + "журнал ремонтов узлов", + "правила производства земляных работ", + "акты приемки скрытых работ", + "fundament concrete grade", + "maintenance schedule appendix", +] + +MODES = ["hybrid", "hybrid", "hybrid", "lexical", "semantic"] + + +class SearchUser(HttpUser): + wait_time = between(0.5, 2.5) + api_prefix = "/api/v1" + + @task(8) + def hybrid_search(self): + body = { + "query": random.choice(QUERIES), + "limit": random.choice([5, 10, 20]), + "filters": { + "document_id": None, + "source_path": None, + "block_type": None, + "min_ocr_confidence": None, + }, + "search_mode": random.choice(MODES), + } + with self.client.post( + f"{self.api_prefix}/search", + json=body, + name="POST /search", + catch_response=True, + ) as res: + if res.status_code != 200: + res.failure(f"HTTP {res.status_code}: {res.text[:120]}") + return + data = res.json() + if not data.get("results"): + res.failure("empty results") + + @task(1) + def health(self): + self.client.get(f"{self.api_prefix}/health", name="GET /health")