"""Thin wrapper around the MinIO Python SDK with bucket bootstrap and retries.""" from __future__ import annotations import io from functools import lru_cache from pathlib import Path from typing import Any from minio import Minio from minio.error import S3Error from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential from app.config import settings from app.logging_config import get_logger logger = get_logger(__name__) class MinioStorage: def __init__(self, client: Minio | None = None) -> None: self.client = client or Minio( endpoint=settings.minio_endpoint, access_key=settings.minio_access_key, secret_key=settings.minio_secret_key, secure=settings.minio_secure, region=settings.minio_region, ) self.originals_bucket = settings.minio_bucket_originals self.derived_bucket = settings.minio_bucket_derived def ensure_buckets(self) -> None: for bucket in (self.originals_bucket, self.derived_bucket): if not self.client.bucket_exists(bucket): logger.info("minio.create_bucket", bucket=bucket) self.client.make_bucket(bucket) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), retry=retry_if_exception_type(S3Error), reraise=True, ) def put_file( self, bucket: str, key: str, path: Path, content_type: str = "application/octet-stream", metadata: dict[str, str] | None = None, ) -> None: size = path.stat().st_size with path.open("rb") as f: self.client.put_object( bucket_name=bucket, object_name=key, data=f, length=size, content_type=content_type, metadata=metadata or {}, ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), retry=retry_if_exception_type(S3Error), reraise=True, ) def put_bytes( self, bucket: str, key: str, data: bytes, content_type: str = "application/octet-stream", metadata: dict[str, str] | None = None, ) -> None: self.client.put_object( bucket_name=bucket, object_name=key, data=io.BytesIO(data), length=len(data), content_type=content_type, metadata=metadata or {}, ) def get_to_path(self, bucket: str, key: str, dest: Path) -> Path: dest.parent.mkdir(parents=True, exist_ok=True) self.client.fget_object(bucket, key, str(dest)) return dest def exists(self, bucket: str, key: str) -> bool: try: self.client.stat_object(bucket, key) return True except S3Error as exc: if exc.code in {"NoSuchKey", "NoSuchObject"}: return False raise def health(self) -> dict[str, Any]: try: buckets = [b.name for b in self.client.list_buckets()] return {"status": "ok", "buckets": buckets} except Exception as exc: return {"status": "error", "error": str(exc)} @lru_cache(maxsize=1) def get_storage() -> MinioStorage: return MinioStorage()