"""Wrapper MinIO: bucket unico (settings.minio_bucket) con prefissi logici. Prefissi usati: datasets/. models//spec.yml models////... (artefatti training) trainings//logs.jsonl """ from __future__ import annotations import io from datetime import timedelta from typing import Iterable, Optional from minio import Minio from minio.error import S3Error from core.config import settings _client: Optional[Minio] = None def client() -> Minio: global _client if _client is None: _client = Minio( f"{settings.minio_endpoint}:{settings.minio_port}", access_key=settings.minio_access_key, secret_key=settings.minio_secret_key, secure=settings.minio_use_ssl, ) return _client def _bucket(b: Optional[str] = None) -> str: return b or settings.minio_bucket def ensure_bucket(bucket: Optional[str] = None) -> None: name = _bucket(bucket) c = client() if not c.bucket_exists(name): c.make_bucket(name) def put_bytes(key: str, data: bytes, content_type: str = "application/octet-stream", bucket: Optional[str] = None) -> None: ensure_bucket(bucket) client().put_object( _bucket(bucket), key, io.BytesIO(data), length=len(data), content_type=content_type, ) def put_stream(key: str, stream, length: int, content_type: str = "application/octet-stream", bucket: Optional[str] = None) -> None: ensure_bucket(bucket) client().put_object( _bucket(bucket), key, stream, length=length, content_type=content_type ) def get_bytes(key: str, bucket: Optional[str] = None) -> bytes: r = client().get_object(_bucket(bucket), key) try: return r.read() finally: r.close() r.release_conn() def remove(key: str, bucket: Optional[str] = None) -> None: try: client().remove_object(_bucket(bucket), key) except S3Error: pass def remove_prefix(prefix: str, bucket: Optional[str] = None) -> int: name = _bucket(bucket) n = 0 for obj in client().list_objects(name, prefix=prefix, recursive=True): try: client().remove_object(name, obj.object_name) n += 1 except S3Error: pass return n def presigned_get(key: str, expires_seconds: int = 3600, bucket: Optional[str] = None) -> str: return client().presigned_get_object( _bucket(bucket), key, expires=timedelta(seconds=expires_seconds) ) def list_prefix(prefix: str, bucket: Optional[str] = None) -> list[dict]: out = [] for obj in client().list_objects(_bucket(bucket), prefix=prefix, recursive=True): out.append({ "name": obj.object_name, "size": obj.size, "last_modified": obj.last_modified.isoformat() if obj.last_modified else None, "etag": obj.etag, }) return out def check() -> bool: try: client().list_buckets() return True except Exception: return False