Files
LegacyHUB/app/ingestion/scanner.py
Vadim Malanov a375ca55b9 refactor: extract ensure_artifact into app/storage/artifacts.py
The artifact-upsert helper was duplicated four times (scanner.py,
table_processor.py, figure_processor.py, pipeline.py) with slightly
different signatures. Consolidates into a single keyword-only function
keyed on (document_id, storage_key) - the identity the schema already
enforces - so re-running the pipeline never creates duplicate rows.

scanner / table_processor / figure_processor now import the shared
helper directly. pipeline.py keeps a thin local wrapper to preserve
the positional call sites at three artifact upsert points (OCR_PDF,
MARKDOWN, DOCLING_JSON).

Tests: 24 passed (5 health + 19 original).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:51:54 +03:00

164 lines
5.6 KiB
Python

"""Folder scanner: discovers PDFs, deduplicates by SHA256, persists discovery rows.
The scanner does NOT trigger OCR or extraction. It only:
- enumerates PDF files,
- hashes each file,
- creates / reuses a ``Document`` row,
- uploads the original PDF to MinIO,
- emits ``DISCOVERED`` / ``STORED_ORIGINAL`` events.
Heavy work (OCR, Docling, indexing) is performed by the Celery worker pipeline.
"""
from __future__ import annotations
import os
import uuid
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from sqlalchemy import select
from app.db.models import (
ArtifactType,
Document,
DocumentStatus,
ProcessingEvent,
)
from app.db.session import session_scope
from app.logging_config import get_logger
from app.storage.artifacts import ensure_artifact
from app.storage.local_paths import key_original_pdf
from app.storage.minio_client import get_storage
from app.utils.hashing import sha256_file
from app.utils.pdf import is_pdf
# Module-level logger; calls in this file pass an event name plus keyword fields.
logger = get_logger(__name__)
@dataclass
class DiscoveryRecord:
    """Outcome of scanning one candidate file, as yielded by ``discover_documents``."""

    # Path of the scanned file on disk.
    path: Path
    # Content hash of the file; ``None`` when the file could not be stat'ed or hashed.
    sha256: str | None
    # Id of the matching ``Document`` row; ``None`` for unreadable files and
    # for files whose original could not be stored.
    document_id: uuid.UUID | None
    # True when a ``Document`` with the same hash already existed.
    duplicate: bool
    # True when the file could not be read/hashed or storing the original failed.
    invalid: bool = False
def iter_pdf_files(root: Path, recursive: bool = True) -> Iterator[Path]:
    """Yield every path under ``root`` that ``is_pdf`` accepts.

    ``root`` may be a single file or a directory:

    - file: yielded only if ``is_pdf(root)`` is true;
    - directory with ``recursive=True``: the whole tree is walked with
      ``os.walk``;
    - directory with ``recursive=False``: only direct children are considered.

    In both directory modes only non-directory entries are handed to
    ``is_pdf``, so a sub-directory is never yielded regardless of its name.

    Args:
        root: File or directory to scan.
        recursive: Descend into sub-directories when ``root`` is a directory.

    Yields:
        Paths for which ``is_pdf`` returned a truthy value.

    Raises:
        OSError: In non-recursive mode, if ``root`` cannot be listed (for
            example because it does not exist). ``os.walk`` ignores listing
            errors by default, so recursive mode yields nothing instead.
    """
    if root.is_file():
        if is_pdf(root):
            yield root
        return

    if recursive:
        for dirpath, _dirnames, filenames in os.walk(root):
            base = Path(dirpath)  # built once per directory, not once per file
            for name in filenames:
                candidate = base / name
                if is_pdf(candidate):
                    yield candidate
        return

    for candidate in root.iterdir():
        # ``os.walk`` reports only non-directory entries in ``filenames``;
        # apply the same filter here so both modes test the same kind of entry.
        if not candidate.is_dir() and is_pdf(candidate):
            yield candidate
def _register_document(
    db, storage, path: Path, sha: str, size: int, force: bool
) -> DiscoveryRecord:
    """Create or reuse the ``Document`` for one hashed file and store its original.

    Must be called with an open session; all ORM attribute access happens
    here so the returned record carries only plain values.

    Args:
        db: Open database session (as provided by ``session_scope``).
        storage: Object-storage client returned by ``get_storage``.
        path: Location of the PDF on disk.
        sha: SHA256 of the file, used as the deduplication key.
        size: File size in bytes.
        force: When true, an already-known document is processed again
            instead of being reported as a duplicate and skipped.

    Returns:
        The ``DiscoveryRecord`` describing what happened to this file.
    """
    existing = db.execute(
        select(Document).where(Document.sha256 == sha)
    ).scalar_one_or_none()

    if existing and not force:
        logger.debug("scan.duplicate", path=str(path), sha256=sha, document_id=str(existing.id))
        return DiscoveryRecord(path=path, sha256=sha, document_id=existing.id, duplicate=True)

    is_new = not existing
    if is_new:
        doc = Document(
            id=uuid.uuid4(),
            source_path=str(path),
            original_file_name=path.name,
            sha256=sha,
            file_size_bytes=size,
            mime_type="application/pdf",
            status=DocumentStatus.DISCOVERED,
        )
        db.add(doc)
        db.flush()
        # NOTE(review): the DISCOVERED event is assumed to be emitted only for
        # newly created documents, not on forced re-scans - confirm.
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage=DocumentStatus.DISCOVERED,
                level="INFO",
                message="Document discovered",
                data={"sha256": sha, "size": size, "path": str(path)},
            )
        )
    else:
        doc = existing

    # Upload original (idempotent) and record artifact if missing.
    key = key_original_pdf(doc.id, sha)
    try:
        bucket = storage.originals_bucket
        if not storage.exists(bucket, key):
            storage.put_file(
                bucket=bucket,
                key=key,
                path=path,
                content_type="application/pdf",
                metadata={"sha256": sha, "original-name": path.name[:255]},
            )
        ensure_artifact(
            db,
            document_id=doc.id,
            artifact_type=ArtifactType.ORIGINAL_PDF,
            bucket=bucket,
            key=key,
            checksum=sha,
        )
        # Only advance documents that are still at the first stage; a forced
        # re-scan leaves any later status untouched.
        if doc.status == DocumentStatus.DISCOVERED:
            doc.status = DocumentStatus.STORED_ORIGINAL
            # NOTE(review): the STORED_ORIGINAL event is assumed to belong to
            # the status transition above - confirm.
            db.add(
                ProcessingEvent(
                    document_id=doc.id,
                    stage=DocumentStatus.STORED_ORIGINAL,
                    level="INFO",
                    message="Original stored to MinIO",
                    data={"bucket": bucket, "key": key},
                )
            )
    except Exception as exc:  # noqa: BLE001
        # Best effort: mark the document as failed and keep scanning the
        # remaining files instead of aborting the whole run.
        logger.error("scan.store_failed", path=str(path), error=str(exc))
        doc.status = DocumentStatus.FAILED
        doc.error_message = f"store_original: {exc}"
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage="STORE_FAILED",
                level="ERROR",
                message=str(exc),
                data={"path": str(path)},
            )
        )
        return DiscoveryRecord(
            path=path, sha256=sha, document_id=None, duplicate=False, invalid=True
        )

    return DiscoveryRecord(path=path, sha256=sha, document_id=doc.id, duplicate=not is_new)


def discover_documents(
    root: Path, recursive: bool = True, force: bool = False
) -> Iterator[DiscoveryRecord]:
    """Scan ``root`` for PDFs and yield one ``DiscoveryRecord`` per file found.

    For every file returned by ``iter_pdf_files`` the scanner hashes it, looks
    up a ``Document`` with the same SHA256, creates the row when it is new,
    uploads the original PDF to object storage if it is not there yet, and
    records the matching artifact and processing events.

    Each file is handled in its own ``session_scope()`` block, and its record
    is yielded only after that block has exited. The consumer therefore never
    keeps a database session open between iterations, and abandoning the
    generator early cannot interrupt a file's session half-way through.
    (``session_scope`` presumably commits on a clean exit - TODO confirm - in
    which case every yielded ``document_id`` refers to an already committed row.)

    Args:
        root: File or directory to scan.
        recursive: Descend into sub-directories of ``root``.
        force: Process files whose hash is already known instead of reporting
            them as duplicates and skipping them.

    Yields:
        A ``DiscoveryRecord`` per candidate file, including unreadable files
        and store failures (both flagged with ``invalid=True``).
    """
    storage = get_storage()
    storage.ensure_buckets()

    for path in iter_pdf_files(root, recursive=recursive):
        try:
            stat = path.stat()
            sha = sha256_file(path)
        except Exception as exc:  # noqa: BLE001
            # An unreadable file must not stop the scan; report it and move on.
            logger.warning("scan.invalid_file", path=str(path), error=str(exc))
            yield DiscoveryRecord(
                path=path, sha256=None, document_id=None, duplicate=False, invalid=True
            )
            continue

        with session_scope() as db:
            record = _register_document(db, storage, path, sha, stat.st_size, force)

        # Deliberately outside the ``with`` block: see the docstring.
        yield record