Files
LegacyHUB/app/ingestion/figure_processor.py
Vadim Malanov a375ca55b9 refactor: extract ensure_artifact into app/storage/artifacts.py
The artifact-upsert helper was duplicated four times (scanner.py,
table_processor.py, figure_processor.py, pipeline.py) with slightly
different signatures. Consolidates into a single keyword-only function
keyed on (document_id, storage_key) - the identity the schema already
enforces - so re-running the pipeline never creates duplicate rows.

scanner / table_processor / figure_processor now import the shared
helper directly. pipeline.py keeps a thin local wrapper to preserve
the positional call sites at three artifact upsert points (OCR_PDF,
MARKDOWN, DOCLING_JSON).

Tests: 24 passed (5 health + 19 original).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 16:51:54 +03:00

67 lines
2.1 KiB
Python

"""Persists Docling figures to PostgreSQL + MinIO (caption + optional crop)."""
from __future__ import annotations
import uuid
from sqlalchemy import select
from app.db.models import ArtifactType, Figure
from app.ingestion.docling_extractor import ExtractedFigure
from app.logging_config import get_logger
from app.storage.artifacts import ensure_artifact
from app.storage.local_paths import key_figure_crop
from app.storage.minio_client import MinioStorage
logger = get_logger(__name__)
def persist_figures(
db,
storage: MinioStorage,
document_id: uuid.UUID,
figures: list[ExtractedFigure],
page_id_by_number: dict[int, uuid.UUID],
) -> int:
count = 0
for f in figures:
existing = db.execute(
select(Figure).where(Figure.document_id == document_id, Figure.figure_index == f.figure_index)
).scalar_one_or_none()
if existing is None:
existing = Figure(
document_id=document_id,
page_id=page_id_by_number.get(f.page_number),
page_number=f.page_number,
figure_index=f.figure_index,
)
db.add(existing)
existing.caption = f.caption
existing.description = (
f"Figure detected on page {f.page_number}." if not f.caption else
f"Figure on page {f.page_number}. Caption: {f.caption}"
)
if f.image_bytes:
key = key_figure_crop(document_id, f.page_number, f.figure_index)
storage.put_bytes(
bucket=storage.derived_bucket,
key=key,
data=f.image_bytes,
content_type=f"image/{f.image_ext}",
)
existing.storage_bucket = storage.derived_bucket
existing.storage_key = key
ensure_artifact(
db,
document_id=document_id,
artifact_type=ArtifactType.FIGURE_CROP,
bucket=storage.derived_bucket,
key=key,
page_number=f.page_number,
)
count += 1
return count