- Implemented HTML pages for datasets, models, training, testing, and results. - Created API endpoints for managing repositories, results, tests, and training sessions. - Added functionality for streaming training progress via Server-Sent Events (SSE). - Introduced a Dockerfile for the ML runner with necessary dependencies. - Developed an SDK for user code execution within the runner container. - Enhanced CSS styles for improved UI layout and navigation. - Established a layout template for consistent HTML structure across pages. - Added JavaScript for dynamic interactions on the models page. - Implemented WebSocket handling for real-time communication with kiosk devices and controllers. - Implemented model registration and management API at /api/models - Added Gitea proxy API for repository interactions at /api/repos - Created results API for listing and comparing training results at /api/results - Developed training management API for enqueueing and retrieving training jobs at /api/trainings - Introduced SSE endpoint for live training progress updates - Added HTML pages for models, datasets, and training management - Created a Dockerfile for the ML runner with necessary dependencies - Developed SDK for user code execution within the runner container - Enhanced CSS styles for improved UI/UX - Implemented WebSocket communication for real-time device and controller interactions in the kiosk system
160 lines
5.9 KiB
Python
160 lines
5.9 KiB
Python
"""
|
|
Flusso:
|
|
1. POST /jobs → crea job in Redis con stato "pending"
|
|
2. Background task: scarica dati → aggiorna stato in Redis
|
|
3. GET /jobs/{id} → legge stato da Redis
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import threading
|
|
import uuid
|
|
from typing import Any, Dict
|
|
|
|
import httpx
|
|
from core import copernicus
|
|
from core.cache import cache_get, cache_set, cache_delete
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
from middleware.auth import require_auth
|
|
from schemas import DownloadJobRequest, JobStatus
|
|
|
|
router = APIRouter(prefix="/jobs", tags=["sessions"])

# Base URL of the API service; overridable through API_SERVICE_URL.
API_URL = os.getenv("API_SERVICE_URL", "http://api:3003")

# TTL for the job state: 48 hours (completed jobs are cleaned up automatically).
_JOB_TTL = 48 * 3600

# Limit on concurrent Copernicus downloads. The SDK's subset() calls are
# CPU and memory intensive (xarray + netCDF + pandas conversion) and the
# server has limited resources. Without a semaphore, N users clicking at the
# same time saturate the RAM and get the process OOM-killed.
#
# The value is clamped to at least 1: a semaphore created with 0 permits
# would make every job block forever while waiting for a slot, and a negative
# value makes BoundedSemaphore raise ValueError at import time.
_DOWNLOAD_CONCURRENCY = max(1, int(os.getenv("MARINE_DOWNLOAD_CONCURRENCY", "2")))
_download_semaphore = threading.BoundedSemaphore(_DOWNLOAD_CONCURRENCY)
|
|
|
|
|
|
def _job_key(session_id: str) -> str:
|
|
"""Genera la chiave Redis per un job."""
|
|
return f"marine:job:{session_id}"
|
|
|
|
|
|
def _get_job(session_id: str) -> Dict[str, Any] | None:
    """Fetch the stored state of a job.

    Returns whatever ``cache_get`` yields for the job's key; callers in this
    module treat ``None`` as "job not found".
    """
    key = _job_key(session_id)
    state = cache_get(key)
    return state
|
|
|
|
|
|
def _set_job(session_id: str, **kwargs):
    """Merge ``kwargs`` into the stored state of a job.

    Read-modify-write: the current state is loaded, updated in place with the
    given fields and written back with the job TTL. If no state exists for
    the job, nothing is written.
    """
    key = _job_key(session_id)
    state = cache_get(key)
    if state is not None:
        state.update(kwargs)
        cache_set(key, state, _JOB_TTL, disk_ttl=0)
|
|
|
|
|
|
def _dataset_metadata(req: DownloadJobRequest, username: str) -> Dict[str, Any]:
    """Build the dataset metadata sent to the API service alongside the file."""
    return {
        "nome": req.nome,
        "tags": req.tags,
        "created_by": username,
        "type": req.format,
        "notes": req.notes,
        "copernicus_id": req.dataset_id,
        "variables": req.variables,
        "variable_renames": req.variable_renames,
        "bbox": [req.min_longitude, req.min_latitude, req.max_longitude, req.max_latitude],
        "start_date": req.start_date,
        "end_date": req.end_date,
    }


def _run_download(session_id: str, req: DownloadJobRequest, username: str, user_token: str):
    """Background download: Copernicus → conversion → upload through the API service.

    On every phase change the job state is updated via ``_set_job`` so the
    frontend can poll GET /jobs/{id}. The status sequence is
    "queued" → "downloading" → "converting" → "saving" → "done", or "error"
    if any step raises.

    Args:
        session_id: Identifier of the job whose state is updated.
        req: Download request (dataset id, variables, bounding box, dates,
            output format and dataset metadata).
        username: Stored as ``created_by`` in the dataset metadata.
        user_token: Bearer token forwarded to the API service on upload.

    Returns:
        None. The outcome is reported only through the job state; exceptions
        are logged and recorded there instead of being propagated.
    """
    # Local import so this block does not depend on a module-level import.
    import logging

    def progress(pct: int, msg: str):
        _set_job(session_id, progress=pct, message=msg)

    try:
        _set_job(session_id, status="queued", progress=2, message="In coda (max concorrenti raggiunto)...")

        # Acquire a download slot (blocks while the limit is reached). This
        # keeps the number of simultaneous Copernicus calls at or below
        # MARINE_DOWNLOAD_CONCURRENCY, protecting the server's CPU/RAM.
        with _download_semaphore:
            _set_job(session_id, status="downloading", progress=5, message="Scarico da Copernicus Marine...")

            # Download the data from the Copernicus catalogue.
            df = copernicus.download_dataset(
                dataset_id=req.dataset_id,
                variables=req.variables,
                min_longitude=req.min_longitude,
                max_longitude=req.max_longitude,
                min_latitude=req.min_latitude,
                max_latitude=req.max_latitude,
                start_datetime=req.start_date,
                end_datetime=req.end_date,
                progress_callback=progress,
            )

        _set_job(session_id, status="converting", progress=80, message="Creo il file...")

        # Serialize the downloaded data in the requested format.
        data_bytes, content_type = copernicus.dataframe_to_bytes(df, req.format, req.variable_renames)
        # Drop the local reference to the DataFrame so it is not kept alive
        # next to its serialized copy for the whole duration of the upload.
        del df
        filename = f"upload.{req.format}"

        _set_job(session_id, status="saving", progress=90, message="Carico su storage...")

        metadata = _dataset_metadata(req, username)

        # Upload to the API service.
        # NOTE(review): timeout=None disables all httpx timeouts, so a stalled
        # upload leaves the job in "saving" until its TTL expires — confirm
        # this is intended for large files.
        with httpx.Client(timeout=None) as client:
            r = client.post(
                f"{API_URL}/marine/datasets/upload",
                headers={"Authorization": f"Bearer {user_token}"},
                files={"file": (filename, data_bytes, content_type)},
                data={"metadata": json.dumps(metadata)},
            )

        if not r.is_success:
            raise RuntimeError(f"API upload failed ({r.status_code}): {r.text}")

        entry = r.json()
        _set_job(session_id, status="done", progress=100, message="Dataset salvato.", dataset_id=entry["id"])

    except Exception as e:
        # Top-level boundary of the background task: keep the traceback in the
        # server log (it was previously lost) and surface the failure to the
        # polling client through the job state.
        logging.getLogger(__name__).exception("Download job %s failed", session_id)
        # Some exceptions stringify to "", so fall back to the class name.
        _set_job(session_id, status="error", progress=0, message=str(e) or type(e).__name__)
|
|
|
|
|
|
@router.post("", response_model=JobStatus, status_code=202)
async def new_download_session(
    req: DownloadJobRequest,
    background_tasks: BackgroundTasks,
    user=Depends(require_auth)
):
    """Register a new download job and schedule it as a background task.

    A fresh UUID identifies the job. Its initial "pending" state is stored
    with the job TTL before the download is scheduled, and the same state is
    returned to the caller.
    """
    job_id = str(uuid.uuid4())

    # Initial job state, stored before the background task starts.
    state = dict(
        job_id=job_id,
        status="pending",
        progress=0,
        message="In coda",
        dataset_id=None,
    )
    cache_set(_job_key(job_id), state, _JOB_TTL, disk_ttl=0)

    # Hand the download over to the background task runner.
    username, token = user["username"], user["token"]
    background_tasks.add_task(_run_download, job_id, req, username, token)
    return state
|
|
|
|
|
|
@router.get("/{session_id}", response_model=JobStatus)
async def get_download_session(session_id: str, user=Depends(require_auth)):
    """Return the stored state of a download job.

    Raises:
        HTTPException: 404 when no state is stored for ``session_id``.
    """
    if (job_state := _get_job(session_id)) is not None:
        return job_state
    raise HTTPException(status_code=404, detail="Job not found")
|