""" Cache two-tier per il servizio Marine. L1 = Redis (RAM): scadenza 2 ore, velocissima, condivisa tra processi. L2 = SQLite+disco: persistente (200GB), fallback quando Redis non c'è o quando L1 è scaduta. Scadenza configurabile (default 30 giorni). Flusso lettura: 1. Prova L1 (Redis). Se hit → ritorna. 2. Prova L2 (SQLite). Se hit non scaduta → ritorna E ripopola L1 (re-warm). 3. Miss totale → None. Flusso scrittura: Scrive in entrambi i tier contemporaneamente. Chiavi standard: - marine:catalog:full → lista completa dataset Copernicus - marine:catalog:search:{hash} → risultati ricerca utente - marine:job:{session_id} → stato job download (solo Redis, ephemeri) """ import gzip import json import logging import os import sqlite3 import threading import time from pathlib import Path from typing import Any, Optional import redis logger = logging.getLogger(__name__) # ── Config ─────────────────────────────────────────────────────────────── REDIS_HOST = os.getenv("REDIS_HOST", "meb-redis") REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) # Il volume persistente è montato dal container, default /app/cache CACHE_DIR = Path(os.getenv("CACHE_DIR", "/app/cache")) CACHE_DB = CACHE_DIR / "catalog.sqlite" BLOB_DIR = CACHE_DIR / "blobs" # TTL default DEFAULT_REDIS_TTL = 2 * 3600 # 2 ore (L1) DEFAULT_DISK_TTL = 30 * 24 * 3600 # 30 giorni (L2) # Soglia sopra la quale il valore va in un file su disco invece che in sqlite BLOB_THRESHOLD_BYTES = 64 * 1024 # 64 KB # ── Stato globale ──────────────────────────────────────────────────────── _pool: Optional[redis.ConnectionPool] = None _client: Optional[redis.Redis] = None _redis_disabled = False _sqlite_lock = threading.Lock() _sqlite_initialized = False # ── Redis (L1) ─────────────────────────────────────────────────────────── def _get_redis() -> Optional[redis.Redis]: global _pool, _client, _redis_disabled if _redis_disabled: return None if _client is not None: return _client try: _pool = redis.ConnectionPool( host=REDIS_HOST, port=REDIS_PORT, decode_responses=False, # tratto blob binari (gzip) max_connections=5, socket_connect_timeout=3, socket_timeout=3, retry_on_timeout=True, ) _client = redis.Redis(connection_pool=_pool) _client.ping() logger.info("[Cache] Redis L1 connesso") return _client except Exception as e: logger.warning(f"[Cache] Redis non disponibile, uso solo disco: {e}") _redis_disabled = True _client = None return None # ── SQLite (L2) ────────────────────────────────────────────────────────── def _ensure_sqlite() -> sqlite3.Connection: """Apre/crea il db SQLite su disco. Crea anche la dir blob.""" global _sqlite_initialized CACHE_DIR.mkdir(parents=True, exist_ok=True) BLOB_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(CACHE_DB), timeout=5.0, isolation_level=None) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") if not _sqlite_initialized: conn.execute(""" CREATE TABLE IF NOT EXISTS cache ( key TEXT PRIMARY KEY, expires_at INTEGER NOT NULL, is_blob INTEGER NOT NULL DEFAULT 0, value BLOB, blob_path TEXT, size_bytes INTEGER NOT NULL, updated_at INTEGER NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_at)") _sqlite_initialized = True return conn def _blob_path(key: str) -> Path: # Nome file safe: solo caratteri alfanumerici + hash per unicità safe = "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in key) return BLOB_DIR / f"{safe}.json.gz" def _disk_get(key: str) -> Optional[Any]: try: with _sqlite_lock: conn = _ensure_sqlite() row = conn.execute( "SELECT expires_at, is_blob, value, blob_path FROM cache WHERE key = ?", (key,) ).fetchone() if row is None: return None expires_at, is_blob, value, blob_path = row if expires_at < int(time.time()): # Scaduta: la elimino in lazy _disk_delete(key) return None if is_blob: data = Path(blob_path).read_bytes() else: data = value return json.loads(gzip.decompress(data).decode("utf-8")) except Exception as e: logger.warning(f"[Cache] Errore lettura disco '{key}': {e}") return None def _disk_set(key: str, raw_gz: bytes, ttl: int) -> None: try: expires_at = int(time.time()) + ttl updated_at = int(time.time()) size = len(raw_gz) if size > BLOB_THRESHOLD_BYTES: path = _blob_path(key) path.write_bytes(raw_gz) with _sqlite_lock: conn = _ensure_sqlite() conn.execute( "INSERT OR REPLACE INTO cache(key, expires_at, is_blob, value, blob_path, size_bytes, updated_at) " "VALUES(?,?,?,?,?,?,?)", (key, expires_at, 1, None, str(path), size, updated_at) ) else: with _sqlite_lock: conn = _ensure_sqlite() conn.execute( "INSERT OR REPLACE INTO cache(key, expires_at, is_blob, value, blob_path, size_bytes, updated_at) " "VALUES(?,?,?,?,?,?,?)", (key, expires_at, 0, raw_gz, None, size, updated_at) ) except Exception as e: logger.warning(f"[Cache] Errore scrittura disco '{key}': {e}") def _disk_delete(key: str) -> None: try: with _sqlite_lock: conn = _ensure_sqlite() row = conn.execute("SELECT blob_path FROM cache WHERE key = ?", (key,)).fetchone() conn.execute("DELETE FROM cache WHERE key = ?", (key,)) if row and row[0]: try: Path(row[0]).unlink(missing_ok=True) except Exception: pass except Exception as e: logger.warning(f"[Cache] Errore delete disco '{key}': {e}") # ── API pubblica ───────────────────────────────────────────────────────── def cache_get(key: str) -> Optional[Any]: """Legge L1 → L2. Se L2 hit, ripopola L1 (re-warm).""" # L1 client = _get_redis() if client is not None: try: raw = client.get(key) if raw is not None: return json.loads(gzip.decompress(raw).decode("utf-8")) except Exception as e: logger.warning(f"[Cache] Errore Redis '{key}': {e}") # L2 value = _disk_get(key) if value is not None and client is not None: # Re-warm L1 con TTL standard try: raw_gz = gzip.compress(json.dumps(value).encode("utf-8")) client.setex(key, DEFAULT_REDIS_TTL, raw_gz) except Exception: pass return value def cache_set(key: str, value: Any, ttl: int = DEFAULT_REDIS_TTL, disk_ttl: Optional[int] = None) -> bool: """Scrive in L1 (ttl) e L2 (disk_ttl, default 30 giorni). Per chiavi ephemere (es. job state) passa disk_ttl=0 per saltare il disco.""" if disk_ttl is None: disk_ttl = DEFAULT_DISK_TTL try: serialized = json.dumps(value).encode("utf-8") raw_gz = gzip.compress(serialized) except Exception as e: logger.warning(f"[Cache] Errore serializzazione '{key}': {e}") return False ok = False # L1 client = _get_redis() if client is not None: try: client.setex(key, ttl, raw_gz) ok = True except Exception as e: logger.warning(f"[Cache] Errore scrittura Redis '{key}': {e}") # L2 if disk_ttl > 0: _disk_set(key, raw_gz, disk_ttl) ok = True return ok def cache_delete(key: str) -> bool: client = _get_redis() if client is not None: try: client.delete(key) except Exception: pass _disk_delete(key) return True def cache_stats() -> dict: """Ritorna statistiche della cache: utile per /health e debug.""" stats = {"redis": False, "disk": {"entries": 0, "bytes": 0, "blobs": 0}} if _get_redis() is not None: stats["redis"] = True try: with _sqlite_lock: conn = _ensure_sqlite() row = conn.execute( "SELECT COUNT(*), COALESCE(SUM(size_bytes),0), COALESCE(SUM(is_blob),0) FROM cache" ).fetchone() stats["disk"]["entries"] = row[0] stats["disk"]["bytes"] = row[1] stats["disk"]["blobs"] = row[2] except Exception: pass return stats def cache_sweep() -> int: """Rimuove voci scadute su disco (da chiamare periodicamente). Ritorna numero eliminate.""" try: now = int(time.time()) with _sqlite_lock: conn = _ensure_sqlite() rows = conn.execute( "SELECT key, blob_path FROM cache WHERE expires_at < ?", (now,) ).fetchall() conn.execute("DELETE FROM cache WHERE expires_at < ?", (now,)) for _, path in rows: if path: try: Path(path).unlink(missing_ok=True) except Exception: pass return len(rows) except Exception as e: logger.warning(f"[Cache] Errore sweep: {e}") return 0