- Implemented HTML pages for datasets, models, training, testing, and results. - Created API endpoints for managing repositories, results, tests, and training sessions. - Added functionality for streaming training progress via Server-Sent Events (SSE). - Introduced a Dockerfile for the ML runner with necessary dependencies. - Developed an SDK for user code execution within the runner container. - Enhanced CSS styles for improved UI layout and navigation. - Established a layout template for consistent HTML structure across pages. - Added JavaScript for dynamic interactions on the models page. - Implemented WebSocket handling for real-time communication with kiosk devices and controllers. - Implemented model registration and management API at /api/models - Added Gitea proxy API for repository interactions at /api/repos - Created results API for listing and comparing training results at /api/results - Developed training management API for enqueueing and retrieving training jobs at /api/trainings - Introduced SSE endpoint for live training progress updates - Added HTML pages for models, datasets, and training management - Created a Dockerfile for the ML runner with necessary dependencies - Developed SDK for user code execution within the runner container - Enhanced CSS styles for improved UI/UX - Implemented WebSocket communication for real-time device and controller interactions in the kiosk system
293 lines
10 KiB
Python
293 lines
10 KiB
Python
"""
Two-tier cache for the Marine service.

L1 = Redis (RAM): 2-hour expiry, very fast, shared across processes.
L2 = SQLite + disk: persistent (200 GB), used as a fallback when Redis is
     unavailable or when the L1 entry has expired. Configurable expiry
     (default 30 days).

Read flow:
  1. Try L1 (Redis). On a hit → return.
  2. Try L2 (SQLite). On a non-expired hit → return AND repopulate L1 (re-warm).
  3. Total miss → None.

Write flow:
  Writes to both tiers at the same time.

Standard keys:
  - marine:catalog:full           → full list of Copernicus datasets
  - marine:catalog:search:{hash}  → user search results
  - marine:job:{session_id}       → download job state (Redis only, ephemeral)
"""
|
|
|
|
import gzip
|
|
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import redis
|
|
|
|
logger = logging.getLogger(__name__)

# ── Config ───────────────────────────────────────────────────────────────
# Redis endpoint, overridable through the environment.
REDIS_HOST = os.environ.get("REDIS_HOST", "meb-redis")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))

# The persistent volume is mounted by the container; defaults to /app/cache.
CACHE_DIR = Path(os.environ.get("CACHE_DIR", "/app/cache"))
CACHE_DB = CACHE_DIR.joinpath("catalog.sqlite")
BLOB_DIR = CACHE_DIR.joinpath("blobs")

# Default TTLs, in seconds.
DEFAULT_REDIS_TTL = 2 * 60 * 60  # 2 hours (L1)
DEFAULT_DISK_TTL = 30 * 24 * 60 * 60  # 30 days (L2)

# Size above which a value is stored as a file on disk instead of in SQLite.
BLOB_THRESHOLD_BYTES = 64 * 1024  # 64 KB
|
|
|
|
# ── Global state ─────────────────────────────────────────────────────────
# SQLite (L2): lock taken around database access, plus a flag recording that
# the schema has already been created in this process.
_sqlite_lock = threading.Lock()
_sqlite_initialized: bool = False

# Redis (L1): pool and client are created lazily by _get_redis(); the flag is
# set when a connection attempt fails so that Redis is skipped afterwards.
_pool: Optional[redis.ConnectionPool] = None
_client: Optional[redis.Redis] = None
_redis_disabled: bool = False
|
|
|
|
|
|
# ── Redis (L1) ───────────────────────────────────────────────────────────
|
|
def _get_redis() -> Optional[redis.Redis]:
    """Return the shared Redis (L1) client, connecting on first use.

    Returns:
        The cached client, or None when Redis is disabled. Redis becomes
        disabled as soon as one connection attempt (including the initial
        ``ping``) fails; the flag is never cleared by this function, so
        later calls return None without retrying.
    """
    global _pool, _client, _redis_disabled

    if _redis_disabled:
        return None
    if _client is not None:
        # Already connected: reuse the cached client (no new ping is issued).
        return _client

    pool_options = {
        "host": REDIS_HOST,
        "port": REDIS_PORT,
        "decode_responses": False,  # values are binary (gzip) blobs
        "max_connections": 5,
        "socket_connect_timeout": 3,
        "socket_timeout": 3,
        "retry_on_timeout": True,
    }
    try:
        _pool = redis.ConnectionPool(**pool_options)
        _client = redis.Redis(connection_pool=_pool)
        _client.ping()  # fail fast if the server is unreachable
        logger.info("[Cache] Redis L1 connesso")
        return _client
    except Exception as e:
        logger.warning(f"[Cache] Redis non disponibile, uso solo disco: {e}")
        _redis_disabled = True
        _client = None
        return None
|
|
|
|
|
|
# ── SQLite (L2) ──────────────────────────────────────────────────────────
|
|
def _ensure_sqlite() -> sqlite3.Connection:
    """Open the on-disk SQLite database, creating it if necessary.

    Also creates the cache and blob directories. The schema (table and
    index) is created only the first time this runs in the process, tracked
    by the module-level ``_sqlite_initialized`` flag.

    Returns:
        A new connection in autocommit mode (``isolation_level=None``) with
        WAL journaling. A fresh connection is opened on every call.
    """
    global _sqlite_initialized

    for directory in (CACHE_DIR, BLOB_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(CACHE_DB), timeout=5.0, isolation_level=None)
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        conn.execute(pragma)

    if _sqlite_initialized:
        return conn

    conn.execute("""
        CREATE TABLE IF NOT EXISTS cache (
            key TEXT PRIMARY KEY,
            expires_at INTEGER NOT NULL,
            is_blob INTEGER NOT NULL DEFAULT 0,
            value BLOB,
            blob_path TEXT,
            size_bytes INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_at)")
    _sqlite_initialized = True
    return conn
|
|
|
|
|
|
def _blob_path(key: str) -> Path:
    """Return the blob file path used to store the value of *key*.

    The file name is a readable, filesystem-safe rendering of the key
    followed by a short digest of the exact key. The digest is what makes the
    name unique: sanitising alone maps distinct keys such as ``a:b`` and
    ``a_b`` to the same name, which would let one entry overwrite another's
    data.

    Args:
        key: Cache key.

    Returns:
        A path inside ``BLOB_DIR`` ending in ``.json.gz``.
    """
    import hashlib  # local import: this is the only place it is needed

    # Keep ASCII letters/digits, "-" and "_"; everything else becomes "_".
    # The readable part is truncated so the final name stays well below
    # common 255-byte file-name limits.
    readable = "".join(
        c if (c.isascii() and c.isalnum()) or c in ("-", "_") else "_"
        for c in key
    )[:128]
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    return BLOB_DIR / f"{readable}-{digest}.json.gz"
|
|
|
|
|
|
def _disk_get(key: str) -> Optional[Any]:
    """Read *key* from the disk tier (L2).

    Args:
        key: Cache key.

    Returns:
        The decoded JSON value, or None when the key is missing, expired, or
        cannot be read (any error is logged and reported as a miss).
    """
    try:
        with _sqlite_lock:
            conn = _ensure_sqlite()
            try:
                row = conn.execute(
                    "SELECT expires_at, is_blob, value, blob_path FROM cache WHERE key = ?",
                    (key,),
                ).fetchone()
            finally:
                # _ensure_sqlite opens a new connection per call; close it
                # explicitly instead of relying on garbage collection.
                conn.close()

        if row is None:
            return None

        expires_at, is_blob, value, blob_path = row
        if expires_at < int(time.time()):
            # Expired: delete lazily. This must happen outside the `with`
            # block above because _disk_delete takes the same lock, which is
            # a non-reentrant threading.Lock.
            _disk_delete(key)
            return None

        # Large values live in a separate file; small ones inline in the row.
        data = Path(blob_path).read_bytes() if is_blob else value
        return json.loads(gzip.decompress(data).decode("utf-8"))
    except Exception as e:
        logger.warning(f"[Cache] Errore lettura disco '{key}': {e}")
        return None
|
|
|
|
|
|
def _write_blob_atomic(path: Path, data: bytes) -> None:
    """Write *data* to *path* via a temporary file and an atomic rename."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Per-process/per-thread temp name so concurrent writers never share it.
    tmp_path = path.with_name(
        f"{path.name}.{os.getpid()}.{threading.get_ident()}.tmp"
    )
    try:
        tmp_path.write_bytes(data)
        os.replace(tmp_path, path)
    finally:
        # No-op after a successful replace; removes the leftover on failure.
        tmp_path.unlink(missing_ok=True)


def _disk_set(key: str, raw_gz: bytes, ttl: int) -> None:
    """Store an already gzip-compressed payload in the disk tier (L2).

    Payloads larger than ``BLOB_THRESHOLD_BYTES`` are written to a file under
    the blob directory and the row only records its path; smaller payloads
    are stored inline in the SQLite row.

    Args:
        key: Cache key.
        raw_gz: Gzip-compressed payload.
        ttl: Lifetime in seconds, counted from now.

    Errors are logged and swallowed; nothing is returned.
    """
    try:
        now = int(time.time())
        size = len(raw_gz)
        is_blob = size > BLOB_THRESHOLD_BYTES

        new_blob_path: Optional[str] = None
        if is_blob:
            path = _blob_path(key)
            # Atomic write: a reader or a crash never sees a truncated blob.
            _write_blob_atomic(path, raw_gz)
            new_blob_path = str(path)

        with _sqlite_lock:
            conn = _ensure_sqlite()
            try:
                previous = conn.execute(
                    "SELECT blob_path FROM cache WHERE key = ?", (key,)
                ).fetchone()
                conn.execute(
                    "INSERT OR REPLACE INTO cache(key, expires_at, is_blob, value, blob_path, size_bytes, updated_at) "
                    "VALUES(?,?,?,?,?,?,?)",
                    (
                        key,
                        now + ttl,
                        1 if is_blob else 0,
                        None if is_blob else raw_gz,
                        new_blob_path,
                        size,
                        now,
                    ),
                )
            finally:
                # _ensure_sqlite opens a new connection per call.
                conn.close()

        # If the replaced row pointed at a different blob file (the entry is
        # now stored inline, or the path changed), remove that file so it is
        # not left orphaned on disk.
        old_blob_path = previous[0] if previous else None
        if old_blob_path and old_blob_path != new_blob_path:
            Path(old_blob_path).unlink(missing_ok=True)
    except Exception as e:
        logger.warning(f"[Cache] Errore scrittura disco '{key}': {e}")
|
|
|
|
|
|
def _disk_delete(key: str) -> None:
    """Remove *key* from the disk tier (L2), including its blob file if any.

    Takes ``_sqlite_lock`` itself, so it must not be called while that lock
    is already held by the same thread.

    Args:
        key: Cache key.

    Errors (including a failure to remove the blob file) are logged and
    swallowed; nothing is returned.
    """
    try:
        with _sqlite_lock:
            conn = _ensure_sqlite()
            try:
                row = conn.execute(
                    "SELECT blob_path FROM cache WHERE key = ?", (key,)
                ).fetchone()
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
            finally:
                # _ensure_sqlite opens a new connection per call.
                conn.close()

        blob_path = row[0] if row else None
        if blob_path:
            # A missing file is fine; any other failure is reported by the
            # handler below instead of being silently ignored.
            Path(blob_path).unlink(missing_ok=True)
    except Exception as e:
        logger.warning(f"[Cache] Errore delete disco '{key}': {e}")
|
|
|
|
|
|
# ── Public API ───────────────────────────────────────────────────────────
|
|
def cache_get(key: str) -> Optional[Any]:
    """Look up *key* in L1 (Redis), then in L2 (disk).

    On an L2 hit the value is written back to L1 with ``DEFAULT_REDIS_TTL``
    (re-warm); a failure of that write-back is ignored.

    Args:
        key: Cache key.

    Returns:
        The cached value, or None on a miss in both tiers.
    """
    redis_client = _get_redis()

    # L1
    if redis_client is not None:
        try:
            cached = redis_client.get(key)
            if cached is not None:
                return json.loads(gzip.decompress(cached).decode("utf-8"))
        except Exception as e:
            # A Redis error is not fatal: fall through to the disk tier.
            logger.warning(f"[Cache] Errore Redis '{key}': {e}")

    # L2
    value = _disk_get(key)
    if value is None or redis_client is None:
        return value

    # Re-warm L1 with the standard TTL.
    try:
        payload = gzip.compress(json.dumps(value).encode("utf-8"))
        redis_client.setex(key, DEFAULT_REDIS_TTL, payload)
    except Exception:
        pass
    return value
|
|
|
|
|
|
def cache_set(key: str, value: Any, ttl: int = DEFAULT_REDIS_TTL, disk_ttl: Optional[int] = None) -> bool:
    """Write *value* to L1 (Redis) and L2 (disk).

    Args:
        key: Cache key.
        value: JSON-serialisable value.
        ttl: L1 lifetime in seconds.
        disk_ttl: L2 lifetime in seconds; None means ``DEFAULT_DISK_TTL``
            (30 days). Pass ``disk_ttl=0`` for ephemeral keys (e.g. job
            state) to skip the disk tier.

    Returns:
        False if the value cannot be serialised. Otherwise True when the
        Redis write succeeded or a disk write was requested. Note that
        ``_disk_set`` reports no status, so a requested disk write counts as
        success even if it failed internally.
    """
    effective_disk_ttl = DEFAULT_DISK_TTL if disk_ttl is None else disk_ttl

    try:
        raw_gz = gzip.compress(json.dumps(value).encode("utf-8"))
    except Exception as e:
        logger.warning(f"[Cache] Errore serializzazione '{key}': {e}")
        return False

    # L1
    wrote_l1 = False
    client = _get_redis()
    if client is not None:
        try:
            client.setex(key, ttl, raw_gz)
        except Exception as e:
            logger.warning(f"[Cache] Errore scrittura Redis '{key}': {e}")
        else:
            wrote_l1 = True

    # L2
    if effective_disk_ttl > 0:
        _disk_set(key, raw_gz, effective_disk_ttl)
        return True

    return wrote_l1
|
|
|
|
|
|
def cache_delete(key: str) -> bool:
    """Delete *key* from both tiers.

    A Redis error is ignored and the disk tier is still cleaned up.

    Args:
        key: Cache key.

    Returns:
        Always True.
    """
    redis_client = _get_redis()
    try:
        if redis_client is not None:
            redis_client.delete(key)
    except Exception:
        pass

    _disk_delete(key)
    return True
|
|
|
|
|
|
def cache_stats() -> dict:
    """Return cache statistics; useful for /health and debugging.

    Returns:
        ``{"redis": bool, "disk": {"entries": int, "bytes": int, "blobs": int}}``.
        ``redis`` is True when ``_get_redis()`` returns a client; a client
        that is already cached is not re-pinged here. The disk counters stay
        at zero if the database cannot be read (the error is logged).
    """
    stats = {"redis": False, "disk": {"entries": 0, "bytes": 0, "blobs": 0}}
    if _get_redis() is not None:
        stats["redis"] = True

    try:
        with _sqlite_lock:
            conn = _ensure_sqlite()
            try:
                row = conn.execute(
                    "SELECT COUNT(*), COALESCE(SUM(size_bytes),0), COALESCE(SUM(is_blob),0) FROM cache"
                ).fetchone()
            finally:
                # _ensure_sqlite opens a new connection per call.
                conn.close()
        entries, total_bytes, blobs = row
        stats["disk"]["entries"] = entries
        stats["disk"]["bytes"] = total_bytes
        stats["disk"]["blobs"] = blobs
    except Exception as e:
        # Report the problem instead of silently returning zeroed counters.
        logger.warning(f"[Cache] Errore stats disco: {e}")
    return stats
|
|
|
|
|
|
def cache_sweep() -> int:
    """Remove expired entries from the disk tier; meant to be called periodically.

    Expired rows are deleted from SQLite and their blob files, if any, are
    unlinked. A blob file that cannot be removed is logged and skipped so the
    remaining files are still processed.

    Returns:
        The number of rows removed, or 0 if the sweep failed.
    """
    try:
        now = int(time.time())
        with _sqlite_lock:
            conn = _ensure_sqlite()
            try:
                rows = conn.execute(
                    "SELECT key, blob_path FROM cache WHERE expires_at < ?", (now,)
                ).fetchall()
                conn.execute("DELETE FROM cache WHERE expires_at < ?", (now,))
            finally:
                # _ensure_sqlite opens a new connection per call.
                conn.close()

        for _, blob_path in rows:
            if not blob_path:
                continue
            try:
                Path(blob_path).unlink(missing_ok=True)
            except Exception as e:
                logger.warning(f"[Cache] Errore sweep: {e}")
        return len(rows)
    except Exception as e:
        logger.warning(f"[Cache] Errore sweep: {e}")
        return 0
|