The artifact-upsert helper was duplicated four times (scanner.py, table_processor.py, figure_processor.py, pipeline.py) with slightly different signatures. Consolidates into a single keyword-only function keyed on (document_id, storage_key) - the identity the schema already enforces - so re-running the pipeline never creates duplicate rows. scanner / table_processor / figure_processor now import the shared helper directly. pipeline.py keeps a thin local wrapper to preserve the positional call sites at three artifact upsert points (OCR_PDF, MARKDOWN, DOCLING_JSON). Tests: 24 passed (5 health + 19 original). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
"""Folder scanner: discovers PDFs, deduplicates by SHA256, persists discovery rows.
|
|
|
|
The scanner does NOT trigger OCR or extraction. It only:
  - enumerates PDF files,
  - hashes each file,
  - creates / reuses a ``Document`` row,
  - uploads the original PDF to MinIO,
  - emits ``DISCOVERED`` / ``STORED_ORIGINAL`` events.

Heavy work (OCR, Docling, indexing) is performed by the Celery worker pipeline.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
from collections.abc import Iterator
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.db.models import (
|
|
ArtifactType,
|
|
Document,
|
|
DocumentStatus,
|
|
ProcessingEvent,
|
|
)
|
|
from app.db.session import session_scope
|
|
from app.logging_config import get_logger
|
|
from app.storage.artifacts import ensure_artifact
|
|
from app.storage.local_paths import key_original_pdf
|
|
from app.storage.minio_client import get_storage
|
|
from app.utils.hashing import sha256_file
|
|
from app.utils.pdf import is_pdf
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@dataclass
class DiscoveryRecord:
    """Outcome of scanning a single candidate file.

    One record is produced per file visited by ``discover_documents``,
    whether the file was newly registered, already known, or could not be
    processed.
    """

    # Filesystem location of the scanned file.
    path: Path
    # Hex digest of the file content; ``None`` when hashing failed.
    sha256: str | None
    # Identifier of the matching ``Document`` row; ``None`` for failed files.
    document_id: uuid.UUID | None
    # True when a ``Document`` with the same hash already existed.
    duplicate: bool
    # True when the file could not be read/hashed or its original not stored.
    invalid: bool = False
|
|
|
|
|
|
def iter_pdf_files(root: Path, recursive: bool = True) -> Iterator[Path]:
    """Yield every path under ``root`` that ``is_pdf`` accepts.

    Args:
        root: A single file or a directory to scan.
        recursive: When ``root`` is a directory, descend into sub-directories
            (via ``os.walk``) if true; otherwise inspect only its direct
            entries (via ``Path.iterdir``).

    Yields:
        Paths for which ``is_pdf`` returned a truthy value. When ``root`` is
        itself a file, at most that one path is yielded.
    """
    # A single file is either the only result or produces nothing.
    if root.is_file():
        if is_pdf(root):
            yield root
        return

    # Flat scan: only the immediate children of ``root``.
    if not recursive:
        yield from (entry for entry in root.iterdir() if is_pdf(entry))
        return

    # Deep scan: every file name reported anywhere below ``root``.
    for folder, _subdirs, names in os.walk(root):
        for filename in names:
            candidate = Path(folder) / filename
            if is_pdf(candidate):
                yield candidate
|
|
|
|
|
|
def _persist_discovery(db, storage, *, path: Path, sha: str, size: int, force: bool) -> DiscoveryRecord:
    """Create or reuse the ``Document`` for one hashed file and store its original PDF.

    Args:
        db: Open database session obtained from ``session_scope``.
        storage: Storage client obtained from ``get_storage``.
        path: Location of the PDF on disk.
        sha: SHA256 digest of the file content.
        size: File size in bytes, recorded on new ``Document`` rows.
        force: When true, an already-known document is re-processed instead
            of being reported as a duplicate.

    Returns:
        The ``DiscoveryRecord`` describing what happened to this file. No
        ORM object is referenced by the record, only plain values.
    """
    existing = db.execute(
        select(Document).where(Document.sha256 == sha)
    ).scalar_one_or_none()

    if existing and not force:
        logger.debug("scan.duplicate", path=str(path), sha256=sha, document_id=str(existing.id))
        return DiscoveryRecord(path=path, sha256=sha, document_id=existing.id, duplicate=True)

    doc = existing or Document(
        id=uuid.uuid4(),
        source_path=str(path),
        original_file_name=path.name,
        sha256=sha,
        file_size_bytes=size,
        mime_type="application/pdf",
        status=DocumentStatus.DISCOVERED,
    )
    if not existing:
        db.add(doc)
        # Flush so the document row exists before events/artifacts reference it.
        db.flush()
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage=DocumentStatus.DISCOVERED,
                level="INFO",
                message="Document discovered",
                data={"sha256": sha, "size": size, "path": str(path)},
            )
        )

    # Upload original (idempotent) and record artifact if missing.
    key = key_original_pdf(doc.id, sha)
    try:
        if not storage.exists(storage.originals_bucket, key):
            storage.put_file(
                bucket=storage.originals_bucket,
                key=key,
                path=path,
                content_type="application/pdf",
                metadata={"sha256": sha, "original-name": path.name[:255]},
            )
        ensure_artifact(
            db,
            document_id=doc.id,
            artifact_type=ArtifactType.ORIGINAL_PDF,
            bucket=storage.originals_bucket,
            key=key,
            checksum=sha,
        )
        # Only advance documents that are still at the discovery stage.
        if doc.status == DocumentStatus.DISCOVERED:
            doc.status = DocumentStatus.STORED_ORIGINAL
            db.add(
                ProcessingEvent(
                    document_id=doc.id,
                    stage=DocumentStatus.STORED_ORIGINAL,
                    level="INFO",
                    message="Original stored to MinIO",
                    data={"bucket": storage.originals_bucket, "key": key},
                )
            )
    except Exception as exc:  # noqa: BLE001
        # Best-effort: mark the document as failed and report the file as
        # invalid rather than aborting the whole scan.
        logger.error("scan.store_failed", path=str(path), error=str(exc))
        doc.status = DocumentStatus.FAILED
        doc.error_message = f"store_original: {exc}"
        db.add(
            ProcessingEvent(
                document_id=doc.id,
                stage="STORE_FAILED",
                level="ERROR",
                message=str(exc),
                data={"path": str(path)},
            )
        )
        return DiscoveryRecord(path=path, sha256=sha, document_id=None, duplicate=False, invalid=True)

    return DiscoveryRecord(
        path=path, sha256=sha, document_id=doc.id, duplicate=bool(existing)
    )


def discover_documents(
    root: Path, recursive: bool = True, force: bool = False
) -> Iterator[DiscoveryRecord]:
    """Scan ``root`` for PDFs and yield one ``DiscoveryRecord`` per file.

    Each file is stat'ed and hashed, matched against existing ``Document``
    rows by SHA256, and - unless it is a duplicate and ``force`` is false -
    its original is uploaded to the originals bucket and registered as an
    ``ORIGINAL_PDF`` artifact.

    Every record is yielded only after the ``session_scope`` block for that
    file has exited, so the session is never held open while the consumer
    runs and the consumer never sees a ``document_id`` from a still-open
    session. NOTE(review): this presumes ``session_scope`` commits on a clean
    exit - confirm against ``app.db.session``.

    Args:
        root: File or directory to scan.
        recursive: Passed through to ``iter_pdf_files``.
        force: Re-process files whose hash is already known instead of
            reporting them as duplicates.

    Yields:
        A ``DiscoveryRecord`` per visited file. Files that cannot be
        stat'ed/hashed, or whose original cannot be stored, are reported
        with ``invalid=True`` and ``document_id=None``.
    """
    storage = get_storage()
    storage.ensure_buckets()

    for path in iter_pdf_files(root, recursive=recursive):
        try:
            stat = path.stat()
            sha = sha256_file(path)
        except Exception as exc:  # noqa: BLE001
            logger.warning("scan.invalid_file", path=str(path), error=str(exc))
            yield DiscoveryRecord(path=path, sha256=None, document_id=None, duplicate=False, invalid=True)
            continue

        # Do all database work first; hand the record out only once the
        # session block has been left.
        with session_scope() as db:
            record = _persist_discovery(
                db, storage, path=path, sha=sha, size=stat.st_size, force=force
            )
        yield record
|
|
|
|
|