import io import json import os from typing import Any, Optional from minio.error import S3Error from minio import Minio _minio_host = os.getenv("MINIO_ENDPOINT", "minio") _minio_port = os.getenv("MINIO_PORT", "9000") MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "meb-admin") MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "meb-cloud") MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true" DATASETS_BUCKET = "datasets" METADATA_FILE = "metadata.json" _client: Optional[Minio] = None def get_client() -> Minio: global _client if _client is None: _client = Minio( f"{_minio_host}:{_minio_port}", access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=MINIO_SECURE ) return _client def bucket_exists(bucket: str = DATASETS_BUCKET) -> bool: try: client = get_client() if not client.bucket_exists(bucket): client.make_bucket(bucket) return True except Exception as e: print(f"[Storage] Error in '{bucket}': {e}") return False def fetch_metadata() -> dict: """Il bucket datasets contiene un file JSON di metadata valido per tutti i file dataset salvati, che questi siano JSON, csv o un altro formato. I metadata per ogni file sono salvati come oggetti nel file metadata.json. """ try: client = get_client() response = client.get_object(DATASETS_BUCKET, METADATA_FILE) data = json.loads(response.read().decode("utf-8")) response.close() return data except S3Error as e: if e.code == "NoSuchKey": return {"datasets": []} raise except Exception: return {"datasets": []} def write_metadata(data: dict) -> None: """Aggiunge al file metadata.json un nuovo oggetto con l'id del nuovo file caricato dall'utente""" client = get_client() raw = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8") client.put_object( DATASETS_BUCKET, METADATA_FILE, io.BytesIO(raw), length=len(raw), content_type="application/json" ) def upload_file(data: bytes, filename: str, content_type: str) -> None: """Carica un nuovo file di qualsiasi formato nel bucket dataset.""" client = get_client() client.put_object( DATASETS_BUCKET, filename, io.BytesIO(data), length=len(data), content_type=content_type ) def delete_file(filename: str) -> None: """Elimina un file dal bucket dataset.""" client = get_client() client.remove_object(DATASETS_BUCKET, filename) def get_presigned_url(filename: str, expires_hours: int = 1) -> str: """Genera un URL temporaneo per scaricare un file dal bucket dataset""" from datetime import timedelta client = get_client() return client.presigned_get_object( DATASETS_BUCKET, filename, expires=timedelta(hours=expires_hours) ) def file_exists(filename: str) -> bool: """Verifica se un file esiste.""" try: client = get_client() client.stat_object(DATASETS_BUCKET, filename) return True except S3Error: return False