perf: add ingest and search load-test harnesses

scripts/generate_synthetic_pdfs.py builds real PDF/1.4 documents with
a hand-written xref so we can generate tens of thousands of ~2 KB
PDFs locally. Helvetica only covers latin-1, which is fine for a
load generator (throughput, not retrieval relevance); the docstring
calls this out so no one mistakes the output for a quality corpus.

scripts/load_ingest.py drives POST /ingest/folder, then polls a
hypothetical /documents/stats endpoint every poll-interval seconds
to track terminal-state progression. Writes a JSON history report so
results can be diffed between runs.

scripts/locustfile_search.py defines a SearchUser profile mixing
hybrid / lexical / semantic queries against POST /search plus a
health-check sampler. Asserts non-empty results so a "200 with
zero hits" regression surfaces as a failure rather than a green
percentile graph.

RUNBOOK gains a Load testing section with CPU/GPU SLO tables for
both axes (sustained docs/min, search latency p50/p95/p99).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Vadim Malanov
2026-05-13 17:11:08 +03:00
parent 349f4ea838
commit a97d0bbcfd
4 changed files with 379 additions and 0 deletions

108
scripts/load_ingest.py Normal file
View File

@@ -0,0 +1,108 @@
"""Drive ingest at scale and report per-stage throughput.
This script does NOT itself run OCR/Docling - it triggers
``POST /api/v1/ingest/folder`` and then polls ``GET /documents/stats``
(best-effort; that endpoint may not exist yet) to record per-status
document counts over time.
Usage:
# 1. Generate synthetic PDFs
python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load
# 2. Trigger ingest + watch
python scripts/load_ingest.py \
--path /data/input/load \
--api-url http://localhost:8000/api/v1 \
--watch-seconds 600 \
--report-file load_report.json
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path
import httpx
def trigger_ingest(api_url: str, folder: str, force: bool = False) -> dict:
    """Ask the API to enqueue every document found under ``folder``.

    Sends ``POST {api_url}/ingest/folder`` requesting a recursive scan and
    returns the decoded JSON body of the response.

    Args:
        api_url: Base URL of the API, without a trailing slash.
        folder: Folder path to ingest, sent as the ``path`` field.
        force: Forwarded verbatim as the ``force`` field of the request.

    Returns:
        The parsed JSON response.

    Raises:
        Whatever ``httpx`` raises for transport failures, plus the error
        raised by ``raise_for_status()`` on a 4xx/5xx response.
    """
    endpoint = f"{api_url}/ingest/folder"
    payload = {"path": folder, "recursive": True, "force": force}
    # Long timeout: presumably enqueueing a large folder can be slow - confirm.
    response = httpx.post(endpoint, json=payload, timeout=600)
    response.raise_for_status()
    return response.json()
def sample_status(api_url: str) -> dict[str, int]:
    """Fetch per-status document counts, best-effort.

    Issues ``GET {api_url}/documents/stats`` and returns the ``by_status``
    mapping from the JSON body. The endpoint is hypothetical (the current API
    may not expose it), so this function never raises: on any failure it
    returns an empty dict and writes a one-line reason to stderr so the
    operator can tell "no documents yet" apart from "stats unavailable".

    Args:
        api_url: Base URL of the API, without a trailing slash.

    Returns:
        Mapping of status name to document count, or ``{}`` when the
        endpoint is missing, unreachable, or returns an unexpected payload.
    """
    url = f"{api_url}/documents/stats"
    try:
        res = httpx.get(url, timeout=10)
        if res.status_code != 200:
            print(
                f"[load] warn: GET {url} returned HTTP {res.status_code}",
                file=sys.stderr,
            )
            return {}
        payload = res.json()
        by_status = payload.get("by_status") if isinstance(payload, dict) else None
        if not isinstance(by_status, dict):
            print(
                f"[load] warn: GET {url} payload has no 'by_status' mapping",
                file=sys.stderr,
            )
            return {}
        # Coerce so the caller can always do Counter arithmetic on the result.
        return {str(name): int(count) for name, count in by_status.items()}
    except Exception as exc:  # noqa: BLE001 - deliberate best-effort probe
        print(f"[load] warn: status sample failed: {exc!r}", file=sys.stderr)
        return {}
def main() -> int:
    """Trigger a folder ingest, watch status counts, and emit a JSON report.

    Parses the CLI arguments, calls ``trigger_ingest`` once, then polls
    ``sample_status`` every ``--poll-interval`` seconds until either the
    ``--watch-seconds`` budget is spent or the number of documents in a
    terminal status reaches the ``queued`` count from the enqueue response.
    The report (enqueue response, elapsed seconds, per-poll history, final
    snapshot) is printed to stdout and, if ``--report-file`` is given, also
    written there as UTF-8 JSON.

    Returns:
        ``0`` when the run completes. Errors raised by ``trigger_ingest``
        propagate to the caller.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", required=True, help="Folder mounted in the api container")
    parser.add_argument("--api-url", default="http://localhost:8000/api/v1")
    parser.add_argument("--watch-seconds", type=int, default=600)
    parser.add_argument("--poll-interval", type=int, default=10)
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--report-file", type=Path, default=None)
    args = parser.parse_args()

    print(f"[load] trigger {args.path}")
    enqueue = trigger_ingest(args.api_url, args.path, force=args.force)
    print(f"[load] enqueue response: {json.dumps(enqueue)}")

    terminal_statuses = ("INDEXING_COMPLETED", "FAILED", "OCR_FAILED", "EXTRACTION_FAILED")
    expected = enqueue.get("queued", 0)

    # Monotonic clock: elapsed-time math must not be skewed by wall-clock jumps.
    started = time.monotonic()
    history: list[dict] = []
    last_status: Counter[str] = Counter()
    while (time.monotonic() - started) < args.watch_seconds:
        snap = Counter(sample_status(args.api_url))
        # Signed per-status change. Plain Counter subtraction would drop every
        # status whose count went down, hiding documents leaving a stage.
        delta = {}
        for status in sorted(snap.keys() | last_status.keys()):
            change = snap[status] - last_status[status]
            if change:
                delta[status] = change
        elapsed = round(time.monotonic() - started, 1)
        print(f"[load] t+{elapsed:>6}s {dict(snap)} delta={delta}")
        history.append({"t": elapsed, "snapshot": dict(snap)})
        last_status = snap

        # Heuristic stop: queued count from enqueue all reached terminal status.
        terminal = sum(snap.get(s, 0) for s in terminal_statuses)
        if terminal >= expected > 0:
            print("[load] all queued docs reached terminal status")
            break

        # Never sleep past the watch budget.
        remaining = args.watch_seconds - (time.monotonic() - started)
        if remaining <= 0:
            break
        time.sleep(min(args.poll_interval, remaining))

    report = {
        "enqueue": enqueue,
        "watch_seconds": time.monotonic() - started,
        "history": history,
        "final": dict(last_status),
    }
    serialized = json.dumps(report, indent=2)
    print(serialized)
    if args.report_file:
        args.report_file.write_text(serialized, encoding="utf-8")
        print(f"[load] wrote {args.report_file}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())