import hashlib import io import logging import os from datetime import datetime, timezone from typing import Callable, List, Optional import pandas as pd from core.cache import cache_get, cache_set logger = logging.getLogger(__name__) # ── Chiavi Redis e TTL ──────────────────────────────────────────────── # Chiave per il catalogo completo Copernicus _CATALOG_KEY = "marine:catalog:full" # TTL del catalogo: 1 ora (il catalogo Copernicus cambia raramente) _CATALOG_TTL = 3600 # TTL per i risultati di ricerca: 30 minuti _SEARCH_TTL = 1800 def _fmt_description(name: Optional[str]) -> Optional[str]: """Formatta meglio il titolo del dataset""" if not name: return None return name.replace("_", " ").title() def _get_raw_catalog() -> dict: """Interroga le API di Copernicus per ottenere la lista completa dei dataset. Strategia cache Redis: 1. Cerca in Redis (chiave marine:catalog:full) 2. Se non trovato → chiama Copernicus SDK → salva in Redis con TTL 1h 3. Se Redis non disponibile → chiama sempre l'SDK (nessuna cache) Il catalogo in Redis sopravvive al restart del servizio grazie alla persistenza RDB+AOF configurata in redis.conf. """ # Cerca in Redis prima di chiamare l'SDK Copernicus cached = cache_get(_CATALOG_KEY) if cached is not None: logger.debug("[Catalogo] Servito da cache Redis") return cached # Cache miss: interroga Copernicus SDK (operazione lenta, ~10-30s) logger.info("[Catalogo] Cache miss, scaricamento da Copernicus SDK...") import copernicusmarine catalog = copernicusmarine.describe(disable_progress_bar=True) # Serializza la risposta SDK in un dizionario standard if hasattr(catalog, "model_dump"): result = catalog.model_dump() elif hasattr(catalog, "__dict__"): result = catalog.__dict__ else: result = catalog # Salva in Redis per le prossime richieste (TTL 1 ora) cache_set(_CATALOG_KEY, result, _CATALOG_TTL) logger.info("[Catalogo] Salvato in cache Redis") return result def _get_dataset_reqs(ds: dict) -> tuple: """ Ottieni dalla risposta del dataset le variabili disponibili e le coordinate dell'area disponibile. Attualmente è implementato Copernicus SDK v2, le variabili sono in:: dataset -> versions[-1] -> parts[] -> services[] -> variables[] Le coordinate sono disponibili in variable.bbox = [min_lon, min_lat, max_lon, max_lat]. La finestra temporale disponibile è nel servizio "arco-time-series" dove coordinate_id == 'time' (i valori sono in millisecondi, usando Unix epoch). """ variables = [] seen: set = set() bounds = { "min_longitude": None, "max_longitude": None, "min_latitude": None, "max_latitude": None, "start_datetime": None, "end_datetime": None, } versions = ds.get("versions", []) if not versions: return variables, bounds for part in versions[-1].get("parts", []): for service in part.get("services", []): service_name = service.get("service_name", "") for var in service.get("variables", []): short_name = var.get("short_name", "") if not short_name or short_name in seen: continue seen.add(short_name) std = var.get("standard_name") variables.append({ "short_name": short_name, "standard_name": std, "units": var.get("units"), "description": _fmt_description(std), }) # Ottieni la box delle coordinate if bounds["min_longitude"] is None: bbox = var.get("bbox") if bbox and len(bbox) >= 4: # [min_lon, min_lat, max_lon, max_lat] bounds["min_longitude"] = bbox[0] bounds["min_latitude"] = bbox[1] bounds["max_longitude"] = bbox[2] bounds["max_latitude"] = bbox[3] # Ottieni la finestra temporale del dataset dal servizio "arco-time-series" if bounds["start_datetime"] is None and "arco-time" in service_name: for coord in var.get("coordinates", []): if coord.get("coordinate_id") == "time": min_ms = coord.get("minimum_value") max_ms = coord.get("maximum_value") if min_ms is not None: bounds["start_datetime"] = datetime.fromtimestamp( min_ms / 1000, tz=timezone.utc ).strftime("%Y-%m-%d") if max_ms is not None: bounds["end_datetime"] = datetime.fromtimestamp( max_ms / 1000, tz=timezone.utc ).strftime("%Y-%m-%d") break return variables, bounds def get_catalog(search: Optional[str] = None, limit: int = 50, offset: int = 0) -> dict: """Ottieni dataset dal catalogo Copernicus Marine, filtrabili per nome o ID. Cache Redis per le ricerche: - Chiave: marine:catalog:search:{md5(search|limit|offset)} - TTL: 30 minuti - La cache ricerca viene invalidata quando il catalogo scade (1h) """ # Genera chiave cache unica per questa combinazione di parametri cache_key = None if search: query_hash = hashlib.md5(f"{search}|{limit}|{offset}".encode()).hexdigest()[:12] cache_key = f"marine:catalog:search:{query_hash}" # Cerca risultato in cache Redis cached_result = cache_get(cache_key) if cached_result is not None: logger.debug(f"[Catalogo] Ricerca '{search}' servita da cache Redis") return cached_result raw = _get_raw_catalog() # Gestisce formati diversi della risposta SDK (lista o dizionario) if isinstance(raw, list): products = raw else: products = raw.get("products", []) results = [] for product in products: title = product.get("title", "") description = product.get("description", "") for ds in product.get("datasets", []): dataset_id = ds.get("dataset_id", "") if search: needle = search.lower() if needle not in dataset_id.lower() and needle not in title.lower(): continue variables, bounds = _get_dataset_reqs(ds) results.append({ "dataset_id": dataset_id, "title": title, "description": description[:200] if description else "", "variables": variables, **bounds, }) total = len(results) page = results[offset: offset + limit] response = {"total": total, "offset": offset, "limit": limit, "datasets": page} # Salva risultato ricerca in cache Redis (solo se c'è un filtro di ricerca) if cache_key: cache_set(cache_key, response, _SEARCH_TTL) return response def get_dataset_info(dataset_id: str) -> Optional[dict]: """Return detailed info for a single dataset (variables, bounds, time range).""" raw = _get_raw_catalog() if isinstance(raw, list): products = raw else: products = raw.get("products", []) for product in products: for ds in product.get("datasets", []): if ds.get("dataset_id") == dataset_id: variables, bounds = _get_dataset_reqs(ds) return { "dataset_id": dataset_id, "title": product.get("title", ""), "description": product.get("description", ""), "variables": variables, **bounds, } return None def download_dataset( dataset_id: str, variables: List[str], min_longitude: float, max_longitude: float, min_latitude: float, max_latitude: float, start_datetime: str, end_datetime: str, progress_callback: Optional[Callable[[int, str], None]] = None ) -> pd.DataFrame: """ Scarica i dati di un dataset da Copernicus Marine. L'SDK ufficiale di Copernicus, restituisce i dati del download sotto forma di pandas Dataframe. """ import tempfile import copernicusmarine if progress_callback: progress_callback(5, "Avvio dowload...") # l'SDK di copernicus richiede l'autenticazione di un utente if not os.getenv("COPERNICUS_USERNAME") or not os.getenv("COPERNICUS_PASSWORD"): raise ValueError("non sono presenti username e password per copernicus.") with tempfile.TemporaryDirectory() as tmpdir: try: copernicusmarine.subset( dataset_id=dataset_id, variables=variables, minimum_longitude=min_longitude, maximum_longitude=max_longitude, minimum_latitude=min_latitude, maximum_latitude=max_latitude, start_datetime=start_datetime, end_datetime=end_datetime, username=os.getenv("COPERNICUS_USERNAME"), password=os.getenv("COPERNICUS_PASSWORD"), output_directory=tmpdir, output_filename="data.nc", force_download=True, overwrite_output_data=True, disable_progress_bar=True, ) except TypeError: # Fallback for older versions of copernicusmarine copernicusmarine.subset( dataset_id=dataset_id, variables=variables, minimum_longitude=min_longitude, maximum_longitude=max_longitude, minimum_latitude=min_latitude, maximum_latitude=max_latitude, start_datetime=start_datetime, end_datetime=end_datetime, username=os.getenv("COPERNICUS_USERNAME"), password=os.getenv("COPERNICUS_PASSWORD"), output_directory=tmpdir, output_filename="data.nc", overwrite=True, disable_progress_bar=True, ) if progress_callback: progress_callback(50, "Download completato, elaboro i dati...") import xarray as xr ds = xr.open_dataset(os.path.join(tmpdir, "data.nc")) df = ds.to_dataframe().reset_index() ds.close() if df is None or df.empty: raise ValueError("Nessun dato disponibile. errore nel download") if progress_callback: progress_callback(75, "Elaborazione completata, formatto i dati...") return df def dataframe_to_bytes(df: pd.DataFrame, fmt: str, variable_renames: dict = None) -> tuple: """ Converte i dati in memorie sottoforma di DataFrame scaircati da Copernicus in byte per migliorarne l'elaborazione e la formattazione in file CSV o JSON.""" if variable_renames: df = df.rename(columns=variable_renames) if fmt == "csv": buf = io.StringIO() df.to_csv(buf, index=True) return buf.getvalue().encode("utf-8"), "text/csv" else: buf = io.StringIO() df.to_json(buf, orient="records", date_format="iso", indent=2) return buf.getvalue().encode("utf-8"), "application/json"