import io import logging from minio import Minio from minio.error import S3Error from app.core.config import settings logger = logging.getLogger(__name__) _client: Minio | None = None def get_client() -> Minio: global _client if _client is None: _client = Minio( endpoint=settings.MINIO_ENDPOINT, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_USE_SSL, ) return _client def upload_file(bucket: str, object_name: str, data: bytes, content_type: str = "application/octet-stream") -> str: """Upload bytes to MinIO. Returns the object key.""" client = get_client() client.put_object( bucket_name=bucket, object_name=object_name, data=io.BytesIO(data), length=len(data), content_type=content_type, ) logger.info(f"Uploaded {object_name} to bucket {bucket}") return object_name def get_presigned_url(bucket: str, object_name: str, expires_seconds: int = 3600) -> str: """Generate presigned GET URL valid for expires_seconds (default 1h).""" from datetime import timedelta client = get_client() return client.presigned_get_object(bucket, object_name, expires=timedelta(seconds=expires_seconds)) def delete_file(bucket: str, object_name: str) -> None: client = get_client() client.remove_object(bucket, object_name) logger.info(f"Deleted {object_name} from bucket {bucket}")