"""Download jobs whose state is kept in Redis.

Flow:
1. POST /jobs → creates the job in Redis with status "pending"
2. Background task: downloads the data → updates the status in Redis
3. GET /jobs/{id} → reads the status from Redis
"""

import json
import logging
import os
import uuid
from typing import Any, Dict

import httpx
from core import copernicus
from core.cache import cache_get, cache_set, cache_delete
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from middleware.auth import require_auth
from schemas import DownloadJobRequest, JobStatus

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/jobs", tags=["sessions"])

API_URL = os.getenv("API_SERVICE_URL", "http://api:3003")

# TTL for job state: 48 hours (completed jobs are cleaned up automatically)
_JOB_TTL = 48 * 3600


def _job_key(session_id: str) -> str:
    """Build the Redis key under which the state of job *session_id* is stored."""
    return f"marine:job:{session_id}"


def _get_job(session_id: str) -> Dict[str, Any] | None:
    """Read the state of a job from the cache.

    Returns whatever ``cache_get`` yields for the job key; callers in this
    module treat ``None`` as "job not found".
    """
    return cache_get(_job_key(session_id))


def _set_job(session_id: str, **kwargs) -> None:
    """Merge *kwargs* into the stored state of a job.

    Reads the current state, applies the changes and writes it back with
    ``_JOB_TTL``. If the job is not in the cache the update is dropped
    silently, so a missing job is never re-created by a late progress update.

    NOTE(review): this is a plain read-modify-write; concurrent updates to
    the same job could overwrite each other.
    """
    key = _job_key(session_id)
    job = cache_get(key)
    if job is None:
        return
    job.update(kwargs)
    cache_set(key, job, _JOB_TTL)


def _run_download(session_id: str, req: DownloadJobRequest, username: str, user_token: str):
    """Background download: Copernicus → conversion → upload via the API service.

    At every phase change the job state is updated in Redis so the frontend
    can poll GET /jobs/{id}.

    Args:
        session_id: Identifier of the job whose state is updated.
        req: Download request (dataset, variables, bounding box, dates, format
            and dataset metadata).
        username: Stored as ``created_by`` in the uploaded dataset metadata.
        user_token: Bearer token forwarded to the API service on upload.

    This function never raises for download/convert/upload failures: any
    exception is logged with its traceback and recorded on the job as
    ``status="error"``.
    """

    def progress(pct: int, msg: str):
        # Passed to the Copernicus download as its progress callback.
        _set_job(session_id, progress=pct, message=msg)

    try:
        _set_job(
            session_id,
            status="downloading",
            progress=5,
            message="Scarico da Copernicus Marine...",
        )

        # Download the data from the Copernicus catalogue
        df = copernicus.download_dataset(
            dataset_id=req.dataset_id,
            variables=req.variables,
            min_longitude=req.min_longitude,
            max_longitude=req.max_longitude,
            min_latitude=req.min_latitude,
            max_latitude=req.max_latitude,
            start_datetime=req.start_date,
            end_datetime=req.end_date,
            progress_callback=progress,
        )

        _set_job(session_id, status="converting", progress=80, message="Creo il file...")

        # Convert the DataFrame to bytes (CSV or JSON)
        data_bytes, content_type = copernicus.dataframe_to_bytes(
            df, req.format, req.variable_renames
        )
        filename = f"upload.{req.format}"

        _set_job(session_id, status="saving", progress=90, message="Carico su storage...")

        # Dataset metadata for the API service
        metadata = {
            "nome": req.nome,
            "tags": req.tags,
            "created_by": username,
            "type": req.format,
            "notes": req.notes,
            "copernicus_dataset_id": req.dataset_id,
            "variables": req.variables,
            "variable_renames": req.variable_renames,
            "bbox": [
                req.min_longitude,
                req.min_latitude,
                req.max_longitude,
                req.max_latitude,
            ],
            "start_date": req.start_date,
            "end_date": req.end_date,
        }

        # Upload to the API service, which manages MinIO.
        # NOTE(review): timeout=None disables every httpx timeout, presumably
        # because uploads can be large; an unreachable API would block this
        # task until the OS gives up. Consider bounding the connect timeout.
        with httpx.Client(timeout=None) as client:
            r = client.post(
                f"{API_URL}/marine/datasets/upload",
                headers={"Authorization": f"Bearer {user_token}"},
                files={"file": (filename, data_bytes, content_type)},
                data={"metadata": json.dumps(metadata)},
            )

        if not r.is_success:
            raise RuntimeError(f"API upload failed ({r.status_code}): {r.text}")

        entry = r.json()
        _set_job(
            session_id,
            status="done",
            progress=100,
            message="Dataset salvato.",
            dataset_id=entry["id"],
        )

    except Exception as e:
        # Top-level boundary of the background task: keep the traceback in the
        # logs (the job state only carries a short message), then surface the
        # failure to the polling client.
        logger.exception("Download job %s failed", session_id)
        # Some exceptions stringify to "", which would leave the client with
        # an empty error message; fall back to the exception class name.
        _set_job(
            session_id,
            status="error",
            progress=0,
            message=str(e) or type(e).__name__,
        )


@router.post("", response_model=JobStatus, status_code=202)
async def new_download_session(
    req: DownloadJobRequest,
    background_tasks: BackgroundTasks,
    user=Depends(require_auth),
):
    """Create a new download job and start it in the background.

    Returns the initial job state (status "pending") with HTTP 202; the
    client polls GET /jobs/{id} for progress.
    """
    session_id = str(uuid.uuid4())

    # Initial job state saved in Redis
    initial_state = {
        "job_id": session_id,
        "status": "pending",
        "progress": 0,
        "message": "In coda",
        "dataset_id": None,
    }
    cache_set(_job_key(session_id), initial_state, _JOB_TTL)

    # Start the download in the background
    background_tasks.add_task(
        _run_download, session_id, req, user["username"], user["token"]
    )

    return initial_state


@router.get("/{session_id}", response_model=JobStatus)
async def get_download_session(session_id: str, user=Depends(require_auth)):
    """Read the state of a download job from Redis.

    Raises:
        HTTPException: 404 when no state is stored for *session_id*.

    NOTE(review): the stored state has no owner field, so any authenticated
    user who knows the job id can read it — confirm this is intended.
    """
    session = _get_job(session_id)
    if session is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return session