Files
LegacyHUB/app/ingestion/scanner.py
Vadim Malanov 7f72171572 chore: bootstrap repository with governance docs
Initialize git, add Apache-2.0 LICENSE, .gitattributes (LF line
endings), AGENTS.md (entry points, stack, discovery order, baseline
checks), RUNBOOK.md (dev boot, prod deploy with overlay, ingestion,
failures, rollback, scaling notes), .env.prod.example with rotated
credential placeholders, and dev-only warnings on .env.example.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:41:50 +03:00

185 lines
6.1 KiB
Python

"""Folder scanner: discovers PDFs, deduplicates by SHA256, persists discovery rows.
The scanner does NOT trigger OCR or extraction. It only:
- enumerates PDF files,
- hashes each file,
- creates / reuses a ``Document`` row,
- uploads the original PDF to MinIO,
- emits ``DISCOVERED`` / ``STORED_ORIGINAL`` events.
Heavy work (OCR, Docling, indexing) is performed by the Celery worker pipeline.
"""
from __future__ import annotations
import os
import uuid
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from sqlalchemy import select
from app.db.models import (
ArtifactType,
Document,
DocumentArtifact,
DocumentStatus,
ProcessingEvent,
)
from app.db.session import session_scope
from app.logging_config import get_logger
from app.storage.local_paths import key_original_pdf
from app.storage.minio_client import get_storage
from app.utils.hashing import sha256_file
from app.utils.pdf import is_pdf
# Module-level logger; call sites in this module pass an event name plus keyword fields.
logger = get_logger(__name__)
@dataclass
class DiscoveryRecord:
    """Outcome of scanning one candidate PDF.

    One record is produced per file visited by ``discover_documents``,
    whether the file was registered, recognised as a duplicate, or rejected.
    """

    # Location of the scanned file.
    path: Path
    # Hex digest of the file, or None when the file could not be read/hashed.
    sha256: str | None
    # Id of the matching ``Document`` row; None when the record is invalid.
    document_id: uuid.UUID | None
    # True when a ``Document`` with the same SHA256 already existed.
    duplicate: bool
    # True when the file could not be hashed or its original could not be stored.
    invalid: bool = False
def iter_pdf_files(root: Path, recursive: bool = True) -> Iterator[Path]:
    """Yield the PDF files found at or under ``root``.

    Args:
        root: Either a single file or a directory to scan.
        recursive: When true, descend into sub-directories with ``os.walk``;
            otherwise only the direct children of ``root`` are inspected.

    Yields:
        Path: Each path for which ``is_pdf`` returns a truthy value. When
        ``root`` is itself a file, at most that one path is yielded.
    """
    if root.is_file():
        if is_pdf(root):
            yield root
        return

    if recursive:
        for dirpath, _dirnames, filenames in os.walk(root):
            for name in filenames:
                candidate = Path(dirpath) / name
                if is_pdf(candidate):
                    yield candidate
        return

    for candidate in root.iterdir():
        # ``iterdir`` also returns sub-directories. The recursive branch only
        # ever tests file entries, so skip non-files here as well; otherwise a
        # directory (e.g. one named ``scans.pdf``) would be handed to ``is_pdf``
        # and could be reported as a document.
        if candidate.is_file() and is_pdf(candidate):
            yield candidate
def discover_documents(
    root: Path, recursive: bool = True, force: bool = False
) -> Iterator[DiscoveryRecord]:
    """Scan ``root`` for PDFs and yield one ``DiscoveryRecord`` per file.

    Each file returned by ``iter_pdf_files`` is stat'ed and hashed, matched
    against existing ``Document`` rows by SHA256 (a new row is created when
    none exists), uploaded to the originals bucket if its key is not present
    yet, and annotated with ``ProcessingEvent`` rows for the stages reached.

    Every file is handled inside its own ``session_scope()`` block and its
    record is yielded only after that block has exited. The consumer therefore
    never holds a record whose session is still open, and abandoning the
    generator between records cannot interrupt a session mid-flight.
    NOTE(review): this presumes ``session_scope`` commits on a clean exit --
    confirm against ``app.db.session``.

    Args:
        root: A PDF file or a directory to scan.
        recursive: Forwarded to ``iter_pdf_files``.
        force: When true, a file whose SHA256 already has a ``Document`` row
            is processed again instead of being reported and skipped.

    Yields:
        DiscoveryRecord: ``invalid=True`` without a hash when the file could
        not be stat'ed or hashed; ``duplicate=True`` when a row with the same
        SHA256 already existed; ``invalid=True`` with ``document_id=None``
        when storing the original failed.
    """
    storage = get_storage()
    storage.ensure_buckets()
    for path in iter_pdf_files(root, recursive=recursive):
        try:
            stat = path.stat()
            sha = sha256_file(path)
        except Exception as exc:  # noqa: BLE001
            # Best effort: an unreadable file is reported and the scan goes on.
            logger.warning("scan.invalid_file", path=str(path), error=str(exc))
            yield DiscoveryRecord(path=path, sha256=None, document_id=None, duplicate=False, invalid=True)
            continue
        with session_scope() as db:
            record = _discover_one(db, storage, path, sha, stat.st_size, force)
        # Yield only once the session block is closed (see docstring).
        yield record


def _discover_one(db, storage, path: Path, sha: str, size: int, force: bool) -> DiscoveryRecord:
    """Register one hashed PDF using the open session ``db`` and return its record."""
    existing = db.execute(
        select(Document).where(Document.sha256 == sha)
    ).scalar_one_or_none()
    if existing and not force:
        logger.debug("scan.duplicate", path=str(path), sha256=sha, document_id=str(existing.id))
        return DiscoveryRecord(path=path, sha256=sha, document_id=existing.id, duplicate=True)

    doc = existing or Document(
        id=uuid.uuid4(),
        source_path=str(path),
        original_file_name=path.name,
        sha256=sha,
        file_size_bytes=size,
        mime_type="application/pdf",
        status=DocumentStatus.DISCOVERED,
    )
    if not existing:
        db.add(doc)
        # Flush before adding rows that reference ``doc.id`` -- presumably so
        # the document row is written ahead of its dependants.
        db.flush()
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage=DocumentStatus.DISCOVERED,
                level="INFO",
                message="Document discovered",
                data={"sha256": sha, "size": size, "path": str(path)},
            )
        )

    # Upload original (idempotent) and record artifact if missing.
    key = key_original_pdf(doc.id, sha)
    try:
        if not storage.exists(storage.originals_bucket, key):
            # NOTE(review): object-store user metadata is commonly limited to
            # US-ASCII; confirm ``put_file`` copes with non-ASCII file names.
            storage.put_file(
                bucket=storage.originals_bucket,
                key=key,
                path=path,
                content_type="application/pdf",
                metadata={"sha256": sha, "original-name": path.name[:255]},
            )
        _ensure_artifact(
            db,
            doc.id,
            ArtifactType.ORIGINAL_PDF,
            storage.originals_bucket,
            key,
            sha,
        )
        # Only advance documents still in DISCOVERED; any other status
        # (e.g. on a forced re-scan) is left untouched.
        if doc.status == DocumentStatus.DISCOVERED:
            doc.status = DocumentStatus.STORED_ORIGINAL
            db.add(
                ProcessingEvent(
                    document_id=doc.id,
                    stage=DocumentStatus.STORED_ORIGINAL,
                    level="INFO",
                    message="Original stored to MinIO",
                    data={"bucket": storage.originals_bucket, "key": key},
                )
            )
    except Exception as exc:  # noqa: BLE001
        # Record the failure on the document and return normally, so the
        # caller's session block exits cleanly with the FAILED status set.
        logger.error("scan.store_failed", path=str(path), error=str(exc))
        doc.status = DocumentStatus.FAILED
        doc.error_message = f"store_original: {exc}"
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage="STORE_FAILED",
                level="ERROR",
                message=str(exc),
                data={"path": str(path)},
            )
        )
        return DiscoveryRecord(path=path, sha256=sha, document_id=None, duplicate=False, invalid=True)

    return DiscoveryRecord(
        path=path, sha256=sha, document_id=doc.id, duplicate=bool(existing)
    )
def _ensure_artifact(
    db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, checksum: str | None
) -> None:
    """Add a ``DocumentArtifact`` row unless a matching one is already present.

    A row matches when document id, artifact type and storage key all agree;
    ``bucket`` and ``checksum`` are stored on a new row but are not part of
    the lookup.
    """
    lookup = select(DocumentArtifact).where(
        DocumentArtifact.document_id == document_id,
        DocumentArtifact.artifact_type == artifact_type,
        DocumentArtifact.storage_key == key,
    )
    already_recorded = db.execute(lookup).scalar_one_or_none()
    if not already_recorded:
        artifact = DocumentArtifact(
            document_id=document_id,
            artifact_type=artifact_type,
            storage_bucket=bucket,
            storage_key=key,
            checksum=checksum,
        )
        db.add(artifact)