scripts/generate_synthetic_pdfs.py builds real PDF/1.4 documents with a hand-written xref so we can generate tens of thousands of ~2 KB PDFs locally. Helvetica only covers Latin-1, which is fine for a load generator (it measures throughput, not retrieval relevance); the docstring calls this out so no one mistakes the output for a quality corpus. scripts/load_ingest.py drives POST /ingest/folder, then polls a hypothetical /documents/stats endpoint every --poll-interval seconds to track terminal-state progression. It writes a JSON history report so results can be diffed between runs. scripts/locustfile_search.py defines a SearchUser profile mixing hybrid / lexical / semantic queries against POST /search plus a health-check sampler. It asserts non-empty results so a "200 with zero hits" regression surfaces as a failure rather than a green percentile graph. The RUNBOOK gains a Load testing section with CPU/GPU SLO tables for both axes (sustained docs/min, search latency p50/p95/p99). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
109 lines
3.6 KiB
Python
109 lines
3.6 KiB
Python
"""Drive ingest at scale and report per-stage throughput.
|
|
|
|
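This script does NOT itself run OCR/Docling - it triggers
``POST /api/v1/ingest/folder`` and then polls ``GET /documents/stats``
for per-status document counts. It never queries the ``documents`` /
``processing_events`` tables itself; inspect those directly when the
stats endpoint is unavailable.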
|
|
|
|
Usage:
|
|
|
|
# 1. Generate synthetic PDFs
|
|
python scripts/generate_synthetic_pdfs.py --count 1000 --out /data/input/load
|
|
|
|
# 2. Trigger ingest + watch
|
|
python scripts/load_ingest.py \
|
|
--path /data/input/load \
|
|
--api-url http://localhost:8000/api/v1 \
|
|
--watch-seconds 600 \
|
|
--report-file load_report.json
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
|
|
def trigger_ingest(api_url: str, folder: str, force: bool = False) -> dict:
    """Ask the API to enqueue the documents found under *folder*.

    Sends ``POST {api_url}/ingest/folder`` with a recursive scan request and
    hands back the decoded JSON body of the response.

    Args:
        api_url: Base URL of the API; ``/ingest/folder`` is appended to it.
        folder: Path sent as the ``path`` field of the request body.
        force: Value sent as the ``force`` field of the request body.

    Returns:
        The JSON-decoded response body.

    Raises:
        httpx.HTTPStatusError: If the response carries a 4xx/5xx status.
    """
    endpoint = f"{api_url}/ingest/folder"
    payload = {"path": folder, "recursive": True, "force": force}

    # The request is allowed up to 600 seconds before httpx gives up.
    response = httpx.post(endpoint, json=payload, timeout=600)
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def sample_status(api_url: str) -> dict[str, int]:
    """Fetch per-status document counts from the API, best effort.

    Issues ``GET {api_url}/documents/stats`` and returns the ``by_status``
    mapping found in its JSON body. That endpoint is hypothetical - the
    current API does not expose it - so this function never raises for an
    unreachable, failing, or oddly shaped endpoint: it returns an empty dict
    and the caller is expected to inspect Postgres for real counts.

    Args:
        api_url: Base URL of the API; ``/documents/stats`` is appended to it.

    Returns:
        The ``by_status`` mapping from the response, or ``{}`` when the
        request fails, the status code is not 200, the body is not valid
        JSON, or the body does not contain a ``by_status`` object.
    """
    try:
        res = httpx.get(f"{api_url}/documents/stats", timeout=10)
        if res.status_code != 200:
            # Expected while the endpoint does not exist; stay quiet.
            return {}
        payload = res.json()
    except (httpx.HTTPError, httpx.InvalidURL, ValueError) as exc:
        # Only transport failures and undecodable bodies are tolerated here;
        # anything else is a programming error and should surface. The
        # warning goes to stderr so stdout keeps the regular progress lines.
        print(f"[load] status sample failed: {exc!r}", file=sys.stderr)
        return {}

    # Guard against bodies such as ``[]`` or ``{"by_status": null}`` so the
    # caller always receives a mapping.
    by_status = payload.get("by_status") if isinstance(payload, dict) else None
    return by_status if isinstance(by_status, dict) else {}
|
|
|
|
|
|
def main() -> int:
    """Trigger a folder ingest, poll status counts, and emit a JSON report.

    Parses the command line, calls ``trigger_ingest`` once, then calls
    ``sample_status`` every ``--poll-interval`` seconds until either
    ``--watch-seconds`` have elapsed or the number of documents in a terminal
    status reaches the ``queued`` count of the enqueue response. The report
    (enqueue response, measured watch duration, snapshot history, final
    snapshot) is printed to stdout and, when ``--report-file`` is given,
    also written to that file as UTF-8 JSON.

    Returns:
        ``0``, used as the process exit code.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--path", required=True, help="Folder mounted in the api container")
    parser.add_argument("--api-url", default="http://localhost:8000/api/v1")
    parser.add_argument("--watch-seconds", type=int, default=600)
    parser.add_argument("--poll-interval", type=int, default=10)
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--report-file", type=Path, default=None)
    args = parser.parse_args()

    print(f"[load] trigger {args.path}")
    enqueue = trigger_ingest(args.api_url, args.path, force=args.force)
    print(f"[load] enqueue response: {json.dumps(enqueue)}")

    # Resolve the target once. A response without a usable integer "queued"
    # field yields 0, which disables the early stop instead of crashing.
    queued = enqueue.get("queued", 0) if isinstance(enqueue, dict) else 0
    if not isinstance(queued, int):
        queued = 0

    terminal_statuses = ("INDEXING_COMPLETED", "FAILED", "OCR_FAILED", "EXTRACTION_FAILED")

    # Monotonic clock: elapsed-time math must not be affected by wall-clock
    # adjustments (NTP steps, manual changes) during a long watch.
    started = time.monotonic()
    deadline = started + args.watch_seconds
    history: list[dict] = []
    last_status: Counter[str] = Counter()

    while time.monotonic() < deadline:
        snap = Counter(sample_status(args.api_url))
        # Counter subtraction keeps only positive differences, so ``delta``
        # lists the statuses whose count grew since the previous poll.
        delta = snap - last_status
        elapsed = round(time.monotonic() - started, 1)
        print(f"[load] t+{elapsed:>6}s {dict(snap)} delta={dict(delta)}")
        history.append({"t": elapsed, "snapshot": dict(snap)})
        last_status = snap

        # Heuristic stop: queued count from enqueue all reached terminal status.
        terminal = sum(snap[status] for status in terminal_statuses)
        if terminal >= queued > 0:
            print("[load] all queued docs reached terminal status")
            break

        # Never sleep past the deadline, so the total run time honours
        # --watch-seconds instead of overshooting by up to one poll interval.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(args.poll_interval, remaining))

    report = {
        "enqueue": enqueue,
        "watch_seconds": time.monotonic() - started,
        "history": history,
        "final": dict(last_status),
    }
    # Serialize once and reuse the text for both stdout and the report file.
    report_text = json.dumps(report, indent=2)
    print(report_text)
    if args.report_file:
        args.report_file.write_text(report_text, encoding="utf-8")
        print(f"[load] wrote {args.report_file}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())
|