diff --git a/app/ingestion/figure_processor.py b/app/ingestion/figure_processor.py index 6e570f9..60ae5d6 100644 --- a/app/ingestion/figure_processor.py +++ b/app/ingestion/figure_processor.py @@ -6,9 +6,10 @@ import uuid from sqlalchemy import select -from app.db.models import ArtifactType, DocumentArtifact, Figure +from app.db.models import ArtifactType, Figure from app.ingestion.docling_extractor import ExtractedFigure from app.logging_config import get_logger +from app.storage.artifacts import ensure_artifact from app.storage.local_paths import key_figure_crop from app.storage.minio_client import MinioStorage @@ -52,27 +53,14 @@ def persist_figures( ) existing.storage_bucket = storage.derived_bucket existing.storage_key = key - _ensure_artifact(db, document_id, ArtifactType.FIGURE_CROP, storage.derived_bucket, key, f.page_number) + ensure_artifact( + db, + document_id=document_id, + artifact_type=ArtifactType.FIGURE_CROP, + bucket=storage.derived_bucket, + key=key, + page_number=f.page_number, + ) count += 1 return count - - -def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, page: int | None) -> None: - existing = db.execute( - select(DocumentArtifact).where( - DocumentArtifact.document_id == document_id, - DocumentArtifact.storage_key == key, - ) - ).scalar_one_or_none() - if existing: - return - db.add( - DocumentArtifact( - document_id=document_id, - artifact_type=artifact_type, - storage_bucket=bucket, - storage_key=key, - page_number=page, - ) - ) diff --git a/app/ingestion/pipeline.py b/app/ingestion/pipeline.py index 4e61237..b11abbe 100644 --- a/app/ingestion/pipeline.py +++ b/app/ingestion/pipeline.py @@ -25,6 +25,7 @@ from app.db.models import ( Page, ProcessingEvent, ) +from app.storage.artifacts import ensure_artifact from app.db.session import session_scope from app.indexing import opensearch_client, qdrant_client from app.indexing.embeddings import get_embedder @@ -330,21 +331,14 @@ def _build_index_payloads( def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str) -> None: - existing = db.execute( - select(DocumentArtifact).where( - DocumentArtifact.document_id == document_id, - DocumentArtifact.storage_key == key, - ) - ).scalar_one_or_none() - if existing: - return - db.add( - DocumentArtifact( - document_id=document_id, - artifact_type=artifact_type, - storage_bucket=bucket, - storage_key=key, - ) + """Thin wrapper preserving the local positional signature used inside this + module while delegating to the shared helper.""" + ensure_artifact( + db, + document_id=document_id, + artifact_type=artifact_type, + bucket=bucket, + key=key, ) diff --git a/app/ingestion/scanner.py b/app/ingestion/scanner.py index d734af7..e18a137 100644 --- a/app/ingestion/scanner.py +++ b/app/ingestion/scanner.py @@ -23,12 +23,12 @@ from sqlalchemy import select from app.db.models import ( ArtifactType, Document, - DocumentArtifact, DocumentStatus, ProcessingEvent, ) from app.db.session import session_scope from app.logging_config import get_logger +from app.storage.artifacts import ensure_artifact from app.storage.local_paths import key_original_pdf from app.storage.minio_client import get_storage from app.utils.hashing import sha256_file @@ -121,13 +121,13 @@ def discover_documents( content_type="application/pdf", metadata={"sha256": sha, "original-name": path.name[:255]}, ) - _ensure_artifact( + ensure_artifact( db, - doc.id, - ArtifactType.ORIGINAL_PDF, - storage.originals_bucket, - key, - sha, + document_id=doc.id, + artifact_type=ArtifactType.ORIGINAL_PDF, + bucket=storage.originals_bucket, + key=key, + checksum=sha, ) if doc.status == DocumentStatus.DISCOVERED: doc.status = DocumentStatus.STORED_ORIGINAL @@ -161,24 +161,3 @@ def discover_documents( ) -def _ensure_artifact( - db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, checksum: str | None -) -> None: - existing = db.execute( - select(DocumentArtifact).where( - DocumentArtifact.document_id == document_id, - DocumentArtifact.artifact_type == artifact_type, - DocumentArtifact.storage_key == key, - ) - ).scalar_one_or_none() - if existing: - return - db.add( - DocumentArtifact( - document_id=document_id, - artifact_type=artifact_type, - storage_bucket=bucket, - storage_key=key, - checksum=checksum, - ) - ) diff --git a/app/ingestion/table_processor.py b/app/ingestion/table_processor.py index 04135cc..bcb689c 100644 --- a/app/ingestion/table_processor.py +++ b/app/ingestion/table_processor.py @@ -7,9 +7,10 @@ import uuid from sqlalchemy import select -from app.db.models import ArtifactType, DocumentArtifact, Table +from app.db.models import ArtifactType, Table from app.ingestion.docling_extractor import ExtractedTable from app.logging_config import get_logger +from app.storage.artifacts import ensure_artifact from app.storage.local_paths import key_table_json from app.storage.minio_client import MinioStorage @@ -52,7 +53,14 @@ def persist_tables( data=json.dumps(t.json_data, ensure_ascii=False).encode("utf-8"), content_type="application/json", ) - _ensure_artifact(db, document_id, ArtifactType.TABLE_JSON, storage.derived_bucket, key, t.page_number) + ensure_artifact( + db, + document_id=document_id, + artifact_type=ArtifactType.TABLE_JSON, + bucket=storage.derived_bucket, + key=key, + page_number=t.page_number, + ) count += 1 return count @@ -62,23 +70,3 @@ def _summary(t: ExtractedTable) -> str: md = t.markdown or "" n_rows = max(0, sum(1 for ln in md.splitlines() if ln.startswith("|")) - 2) return f"Table {t.table_index} on page {t.page_number} ({n_rows} rows)." - - -def _ensure_artifact(db, document_id: uuid.UUID, artifact_type: str, bucket: str, key: str, page: int | None) -> None: - existing = db.execute( - select(DocumentArtifact).where( - DocumentArtifact.document_id == document_id, - DocumentArtifact.storage_key == key, - ) - ).scalar_one_or_none() - if existing: - return - db.add( - DocumentArtifact( - document_id=document_id, - artifact_type=artifact_type, - storage_bucket=bucket, - storage_key=key, - page_number=page, - ) - ) diff --git a/app/storage/__init__.py b/app/storage/__init__.py index 17d5dbb..b2aff82 100644 --- a/app/storage/__init__.py +++ b/app/storage/__init__.py @@ -1,3 +1,4 @@ +from app.storage.artifacts import ensure_artifact from app.storage.minio_client import MinioStorage, get_storage -__all__ = ["MinioStorage", "get_storage"] +__all__ = ["MinioStorage", "ensure_artifact", "get_storage"] diff --git a/app/storage/artifacts.py b/app/storage/artifacts.py new file mode 100644 index 0000000..13d11f4 --- /dev/null +++ b/app/storage/artifacts.py @@ -0,0 +1,53 @@ +"""Shared ``document_artifacts`` upsert helper. + +Single source of truth used by the scanner, the per-document pipeline, and the +table / figure processors. Each caller previously carried its own copy with +slightly different signatures; this module replaces them so that the artifact +row schema is enforced in one place. +""" + +from __future__ import annotations + +import uuid + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.db.models import DocumentArtifact + + +def ensure_artifact( + db: Session, + *, + document_id: uuid.UUID, + artifact_type: str, + bucket: str, + key: str, + page_number: int | None = None, + checksum: str | None = None, +) -> DocumentArtifact: + """Insert a ``DocumentArtifact`` row if none exists with the same key. + + Identity is ``(document_id, storage_key)``. Re-running the pipeline never + duplicates artifact rows; metadata fields are not updated in place because + derived bytes are versioned by their storage key. + """ + existing = db.execute( + select(DocumentArtifact).where( + DocumentArtifact.document_id == document_id, + DocumentArtifact.storage_key == key, + ) + ).scalar_one_or_none() + if existing is not None: + return existing + + artifact = DocumentArtifact( + document_id=document_id, + artifact_type=artifact_type, + storage_bucket=bucket, + storage_key=key, + page_number=page_number, + checksum=checksum, + ) + db.add(artifact) + return artifact