Files
OLD-server-architecture/copernicus/core/copernicus.py

311 lines
11 KiB
Python

import hashlib
import io
import logging
import os
from datetime import datetime, timezone
from typing import Callable, List, Optional
import pandas as pd
from core.cache import cache_get, cache_set
logger = logging.getLogger(__name__)
# ── Chiavi Redis e TTL ────────────────────────────────────────────────
# Chiave per il catalogo completo Copernicus
_CATALOG_KEY = "marine:catalog:full"
# TTL del catalogo: 1 ora (il catalogo Copernicus cambia raramente)
_CATALOG_TTL = 3600
# TTL per i risultati di ricerca: 30 minuti
_SEARCH_TTL = 1800
def _fmt_description(name: Optional[str]) -> Optional[str]:
"""Formatta meglio il titolo del dataset"""
if not name:
return None
return name.replace("_", " ").title()
def _get_raw_catalog() -> dict:
"""Interroga le API di Copernicus per ottenere la lista completa dei dataset.
Strategia cache Redis:
1. Cerca in Redis (chiave marine:catalog:full)
2. Se non trovato → chiama Copernicus SDK → salva in Redis con TTL 1h
3. Se Redis non disponibile → chiama sempre l'SDK (nessuna cache)
Il catalogo in Redis sopravvive al restart del servizio grazie
alla persistenza RDB+AOF configurata in redis.conf.
"""
# Cerca in Redis prima di chiamare l'SDK Copernicus
cached = cache_get(_CATALOG_KEY)
if cached is not None:
logger.debug("[Catalogo] Servito da cache Redis")
return cached
# Cache miss: interroga Copernicus SDK (operazione lenta, ~10-30s)
logger.info("[Catalogo] Cache miss, scaricamento da Copernicus SDK...")
import copernicusmarine
catalog = copernicusmarine.describe(disable_progress_bar=True)
# Serializza la risposta SDK in un dizionario standard
if hasattr(catalog, "model_dump"):
result = catalog.model_dump()
elif hasattr(catalog, "__dict__"):
result = catalog.__dict__
else:
result = catalog
# Salva in Redis per le prossime richieste (TTL 1 ora)
cache_set(_CATALOG_KEY, result, _CATALOG_TTL)
logger.info("[Catalogo] Salvato in cache Redis")
return result
def _get_dataset_reqs(ds: dict) -> tuple:
"""
Ottieni dalla risposta del dataset le variabili disponibili e le coordinate dell'area disponibile.
Attualmente è implementato Copernicus SDK v2, le variabili sono in::
dataset -> versions[-1] -> parts[] -> services[] -> variables[]
Le coordinate sono disponibili in variable.bbox = [min_lon, min_lat, max_lon, max_lat].
La finestra temporale disponibile è nel servizio "arco-time-series"
dove coordinate_id == 'time' (i valori sono in millisecondi, usando Unix epoch).
"""
variables = []
seen: set = set()
bounds = {
"min_longitude": None, "max_longitude": None,
"min_latitude": None, "max_latitude": None,
"start_datetime": None, "end_datetime": None,
}
versions = ds.get("versions", [])
if not versions:
return variables, bounds
for part in versions[-1].get("parts", []):
for service in part.get("services", []):
service_name = service.get("service_name", "")
for var in service.get("variables", []):
short_name = var.get("short_name", "")
if not short_name or short_name in seen:
continue
seen.add(short_name)
std = var.get("standard_name")
variables.append({
"short_name": short_name,
"standard_name": std,
"units": var.get("units"),
"description": _fmt_description(std),
})
# Ottieni la box delle coordinate
if bounds["min_longitude"] is None:
bbox = var.get("bbox")
if bbox and len(bbox) >= 4:
# [min_lon, min_lat, max_lon, max_lat]
bounds["min_longitude"] = bbox[0]
bounds["min_latitude"] = bbox[1]
bounds["max_longitude"] = bbox[2]
bounds["max_latitude"] = bbox[3]
# Ottieni la finestra temporale del dataset dal servizio "arco-time-series"
if bounds["start_datetime"] is None and "arco-time" in service_name:
for coord in var.get("coordinates", []):
if coord.get("coordinate_id") == "time":
min_ms = coord.get("minimum_value")
max_ms = coord.get("maximum_value")
if min_ms is not None:
bounds["start_datetime"] = datetime.fromtimestamp(
min_ms / 1000, tz=timezone.utc
).strftime("%Y-%m-%d")
if max_ms is not None:
bounds["end_datetime"] = datetime.fromtimestamp(
max_ms / 1000, tz=timezone.utc
).strftime("%Y-%m-%d")
break
return variables, bounds
def get_catalog(search: Optional[str] = None, limit: int = 50, offset: int = 0) -> dict:
"""Ottieni dataset dal catalogo Copernicus Marine, filtrabili per nome o ID.
Cache Redis per le ricerche:
- Chiave: marine:catalog:search:{md5(search|limit|offset)}
- TTL: 30 minuti
- La cache ricerca viene invalidata quando il catalogo scade (1h)
"""
# Genera chiave cache unica per questa combinazione di parametri
cache_key = None
if search:
query_hash = hashlib.md5(f"{search}|{limit}|{offset}".encode()).hexdigest()[:12]
cache_key = f"marine:catalog:search:{query_hash}"
# Cerca risultato in cache Redis
cached_result = cache_get(cache_key)
if cached_result is not None:
logger.debug(f"[Catalogo] Ricerca '{search}' servita da cache Redis")
return cached_result
raw = _get_raw_catalog()
# Gestisce formati diversi della risposta SDK (lista o dizionario)
if isinstance(raw, list):
products = raw
else:
products = raw.get("products", [])
results = []
for product in products:
title = product.get("title", "")
description = product.get("description", "")
for ds in product.get("datasets", []):
dataset_id = ds.get("dataset_id", "")
if search:
needle = search.lower()
if needle not in dataset_id.lower() and needle not in title.lower():
continue
variables, bounds = _get_dataset_reqs(ds)
results.append({
"dataset_id": dataset_id,
"title": title,
"description": description[:200] if description else "",
"variables": variables,
**bounds,
})
total = len(results)
page = results[offset: offset + limit]
response = {"total": total, "offset": offset, "limit": limit, "datasets": page}
# Salva risultato ricerca in cache Redis (solo se c'è un filtro di ricerca)
if cache_key:
cache_set(cache_key, response, _SEARCH_TTL)
return response
def get_dataset_info(dataset_id: str) -> Optional[dict]:
"""Return detailed info for a single dataset (variables, bounds, time range)."""
raw = _get_raw_catalog()
if isinstance(raw, list):
products = raw
else:
products = raw.get("products", [])
for product in products:
for ds in product.get("datasets", []):
if ds.get("dataset_id") == dataset_id:
variables, bounds = _get_dataset_reqs(ds)
return {
"dataset_id": dataset_id,
"title": product.get("title", ""),
"description": product.get("description", ""),
"variables": variables,
**bounds,
}
return None
def download_dataset(
dataset_id: str,
variables: List[str],
min_longitude: float,
max_longitude: float,
min_latitude: float,
max_latitude: float,
start_datetime: str,
end_datetime: str,
progress_callback: Optional[Callable[[int, str], None]] = None
) -> pd.DataFrame:
"""
Scarica i dati di un dataset da Copernicus Marine. L'SDK ufficiale di Copernicus,
restituisce i dati del download sotto forma di pandas Dataframe.
"""
import tempfile
import copernicusmarine
if progress_callback:
progress_callback(5, "Avvio dowload...")
# l'SDK di copernicus richiede l'autenticazione di un utente
if not os.getenv("COPERNICUS_USERNAME") or not os.getenv("COPERNICUS_PASSWORD"):
raise ValueError("non sono presenti username e password per copernicus.")
with tempfile.TemporaryDirectory() as tmpdir:
try:
copernicusmarine.subset(
dataset_id=dataset_id,
variables=variables,
minimum_longitude=min_longitude,
maximum_longitude=max_longitude,
minimum_latitude=min_latitude,
maximum_latitude=max_latitude,
start_datetime=start_datetime,
end_datetime=end_datetime,
username=os.getenv("COPERNICUS_USERNAME"),
password=os.getenv("COPERNICUS_PASSWORD"),
output_directory=tmpdir,
output_filename="data.nc",
force_download=True,
overwrite_output_data=True,
disable_progress_bar=True,
)
except TypeError:
# Fallback for older versions of copernicusmarine
copernicusmarine.subset(
dataset_id=dataset_id,
variables=variables,
minimum_longitude=min_longitude,
maximum_longitude=max_longitude,
minimum_latitude=min_latitude,
maximum_latitude=max_latitude,
start_datetime=start_datetime,
end_datetime=end_datetime,
username=os.getenv("COPERNICUS_USERNAME"),
password=os.getenv("COPERNICUS_PASSWORD"),
output_directory=tmpdir,
output_filename="data.nc",
overwrite=True,
disable_progress_bar=True,
)
if progress_callback:
progress_callback(50, "Download completato, elaboro i dati...")
import xarray as xr
ds = xr.open_dataset(os.path.join(tmpdir, "data.nc"))
df = ds.to_dataframe().reset_index()
ds.close()
if df is None or df.empty:
raise ValueError("Nessun dato disponibile. errore nel download")
if progress_callback:
progress_callback(75, "Elaborazione completata, formatto i dati...")
return df
def dataframe_to_bytes(df: pd.DataFrame, fmt: str, variable_renames: dict = None) -> tuple:
"""
Converte i dati in memorie sottoforma di DataFrame scaircati da Copernicus in byte per migliorarne l'elaborazione e la formattazione in file CSV o JSON."""
if variable_renames:
df = df.rename(columns=variable_renames)
if fmt == "csv":
buf = io.StringIO()
df.to_csv(buf, index=True)
return buf.getvalue().encode("utf-8"), "text/csv"
else:
buf = io.StringIO()
df.to_json(buf, orient="records", date_format="iso", indent=2)
return buf.getvalue().encode("utf-8"), "application/json"