feat: Add new API endpoints and HTML pages for ML model management

- Implemented HTML pages for datasets, models, training, testing, and results.
- Created API endpoints for managing repositories, results, tests, and training sessions.
- Added functionality for streaming training progress via Server-Sent Events (SSE).
- Introduced a Dockerfile for the ML runner with necessary dependencies.
- Developed an SDK for user code execution within the runner container.
- Enhanced CSS styles for improved UI layout and navigation.
- Established a layout template for consistent HTML structure across pages.
- Added JavaScript for dynamic interactions on the models page.
- Implemented WebSocket handling for real-time communication with kiosk devices and controllers.
- Implemented model registration and management API at /api/models
- Added Gitea proxy API for repository interactions at /api/repos
- Created results API for listing and comparing training results at /api/results
- Developed training management API for enqueueing and retrieving training jobs at /api/trainings
- Introduced SSE endpoint for live training progress updates
- Added HTML pages for models, datasets, and training management
- Created a Dockerfile for the ML runner with necessary dependencies
- Developed SDK for user code execution within the runner container
- Enhanced CSS styles for improved UI/UX
- Implemented WebSocket communication for real-time device and controller interactions in the kiosk system
This commit is contained in:
Giuseppe Raffa
2026-04-28 09:24:38 +02:00
parent ee478e52ef
commit 0ce879aa44
81 changed files with 7491 additions and 746 deletions

72
ml/core/api_client.py Normal file
View File

@@ -0,0 +1,72 @@
"""Client HTTP verso l'api-service (service-to-service via x-api-key).
Espone accesso a:
/jobs ciclo di vita job
/queue stato coda
/pageconnections registro sessioni di pagina (enforcement /test max 2)
"""
from __future__ import annotations
from typing import Any, Optional
import httpx
from core.config import settings
def _headers() -> dict:
return {"x-api-key": settings.internal_api_key, "Content-Type": "application/json"}
async def _req(method: str, path: str, json: Optional[dict] = None, params: Optional[dict] = None) -> Any:
url = f"{settings.api_url}{path}"
async with httpx.AsyncClient(timeout=10.0) as c:
r = await c.request(method, url, json=json, params=params, headers=_headers())
r.raise_for_status()
if r.status_code == 204 or not r.content:
return None
return r.json()
# ── jobs ────────────────────────────────────────────────────────────────────
async def create_job(type_: str, created_by: str, payload: dict) -> dict:
return await _req("POST", "/jobs", json={"type": type_, "created_by": created_by, "payload": payload})
async def update_job(job_id: str, **fields) -> dict:
return await _req("PATCH", f"/jobs/{job_id}", json=fields)
async def get_job(job_id: str) -> dict:
return await _req("GET", f"/jobs/{job_id}")
async def list_jobs(type_: Optional[str] = None, status: Optional[str] = None, limit: int = 50) -> list:
params = {"limit": str(limit)}
if type_:
params["type"] = type_
if status:
params["status"] = status
return await _req("GET", "/jobs", params=params) or []
# ── queue ───────────────────────────────────────────────────────────────────
async def queue_status(type_: str = "train") -> dict:
return await _req("GET", "/queue", params={"type": type_})
# ── page connections ───────────────────────────────────────────────────────
async def page_connect(page: str, user_id: str, session_id: str) -> dict:
return await _req("POST", "/pageconnections", json={"page": page, "user_id": user_id, "session_id": session_id})
async def page_ping(session_id: str) -> dict:
return await _req("POST", f"/pageconnections/{session_id}/ping")
async def page_disconnect(session_id: str) -> None:
await _req("DELETE", f"/pageconnections/{session_id}")
async def page_count(page: str) -> dict:
return await _req("GET", f"/pageconnections/{page}")

64
ml/core/config.py Normal file
View File

@@ -0,0 +1,64 @@
"""Configurazione centralizzata del servizio ML, letta da env."""
from __future__ import annotations
import os
from dataclasses import dataclass
def _b(name: str, default: bool = False) -> bool:
return os.environ.get(name, str(default)).lower() in ("1", "true", "yes", "on")
@dataclass(frozen=True)
class Settings:
# Postgres (db "ml")
pg_host: str = os.environ.get("PG_HOST", "meb-postgres")
pg_port: int = int(os.environ.get("PG_PORT", "5432"))
pg_user: str = os.environ.get("DB_USER", "meb")
pg_password: str = os.environ.get("DB_PASSWORD", "meb")
pg_db: str = os.environ.get("ML_DB", "ml")
# Redis
redis_host: str = os.environ.get("REDIS_HOST", "meb-redis")
redis_port: int = int(os.environ.get("REDIS_PORT", "6379"))
# MinIO (bucket unico)
minio_endpoint: str = os.environ.get("MINIO_ENDPOINT", "minio")
minio_port: int = int(os.environ.get("MINIO_PORT", "9000"))
minio_use_ssl: bool = _b("MINIO_USE_SSL", False)
minio_access_key: str = os.environ.get("MINIO_ACCESS_KEY", "")
minio_secret_key: str = os.environ.get("MINIO_SECRET_KEY", "")
minio_bucket: str = os.environ.get("MINIO_BUCKET", "ml")
# InfluxDB — accetta sia INFLUX_* che INFLX_* per allinearsi alle var già
# usate dagli altri servizi (realtime, api) senza dover duplicare la config.
influx_url: str = os.environ.get("INFLUX_URL") or os.environ.get("INFLX_URL", "http://meb-influx:8086")
influx_token: str = os.environ.get("INFLUX_TOKEN") or os.environ.get("INFLX_TOKEN", "")
influx_org: str = os.environ.get("INFLUX_ORG") or os.environ.get("INFLX_ORG", "meb")
# Bucket dedicato alle metriche di training/test ML, separato dai logs e
# dai dati meteo. Sovrascrivibile via INFLUX_BUCKET o ML_INFLUX_BUCKET.
influx_bucket: str = os.environ.get("ML_INFLUX_BUCKET") or os.environ.get("INFLUX_BUCKET", "ml_metrics")
# Gitea (installato esternamente)
gitea_url: str = os.environ.get("GITEA_URL", "")
gitea_token: str = os.environ.get("GITEA_TOKEN", "")
# API service (per jobs/queue/pageconnections)
api_url: str = os.environ.get("API_URL", "http://api:3003")
internal_api_key: str = os.environ.get("INTERNAL_API_KEY", "")
# Auth (condiviso)
jwt_secret: str = os.environ.get("JWT_SECRET", "")
auth_login_url: str = os.environ.get("AUTH_LOGIN_URL", "https://auth.mebboat.it/login")
# Esecuzione training
train_concurrency: int = int(os.environ.get("ML_TRAIN_CONCURRENCY", "1"))
runner_image: str = os.environ.get("ML_RUNNER_IMAGE", "meb-ml-runner:latest")
runner_tmp_dir: str = os.environ.get("ML_RUNNER_TMP", "/var/ml/tmp")
gitcache_dir: str = os.environ.get("ML_GITCACHE_DIR", "/var/ml/gitcache")
# Limiti runtime
max_upload_mb: int = int(os.environ.get("ML_MAX_UPLOAD_MB", "500"))
settings = Settings()

53
ml/core/db.py Normal file
View File

@@ -0,0 +1,53 @@
"""Connessione asyncpg al database ml. Pool singleton."""
from __future__ import annotations
import asyncpg
from typing import Optional
from core.config import settings
_pool: Optional[asyncpg.Pool] = None
async def init_pool() -> asyncpg.Pool:
global _pool
if _pool is None:
_pool = await asyncpg.create_pool(
host=settings.pg_host,
port=settings.pg_port,
user=settings.pg_user,
password=settings.pg_password,
database=settings.pg_db,
min_size=1,
max_size=10,
command_timeout=30,
)
return _pool
async def close_pool() -> None:
global _pool
if _pool is not None:
await _pool.close()
_pool = None
def pool() -> asyncpg.Pool:
if _pool is None:
raise RuntimeError("DB pool not initialized — call init_pool() at startup")
return _pool
async def fetch(sql: str, *args):
async with pool().acquire() as c:
return await c.fetch(sql, *args)
async def fetchrow(sql: str, *args):
async with pool().acquire() as c:
return await c.fetchrow(sql, *args)
async def execute(sql: str, *args):
async with pool().acquire() as c:
return await c.execute(sql, *args)

439
ml/core/docker_runner.py Normal file
View File

@@ -0,0 +1,439 @@
"""Runner Docker per train e test.
train:
- clone repo Gitea @ sha
- prepara workdir /var/ml/tmp/{training_id}
- scarica dataset da MinIO in workdir/data.<ext>
- docker run meb-ml-runner con mount tmp, env, limits da model.yml
- legge stdout JSON → Redis stream + Influx; docker stats ogni 5s
- a fine: collect outputs, upload su MinIO prefix artifacts_prefix
- UPDATE trainings
test:
- analogo ma sincrono, stdin JSON → stdout JSON
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Optional
import docker
from influxdb_client import Point
from core import db, gitea, influx_client, minio_client, redis_client
from core.config import settings
from core.model_spec import fetch_and_parse_spec
log = logging.getLogger(__name__)
_docker = None
def _docker_client():
global _docker
if _docker is None:
_docker = docker.from_env()
return _docker
async def _emit(stream_key: str, payload: dict) -> None:
try:
await redis_client.client().xadd(stream_key, {"payload": json.dumps(payload)}, maxlen=10_000)
except Exception as e:
log.warning("xadd failed: %s", e)
async def _clone_repo(owner_repo: str, sha: str, dest: Path) -> None:
dest.mkdir(parents=True, exist_ok=True)
url = gitea.clone_url(owner_repo)
# clone shallow del branch/sha specifico
# per evitare leak del token nei log, logghiamo solo host
proc = await asyncio.create_subprocess_exec(
"git", "clone", "--depth", "50", url, str(dest),
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
_, err = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"git clone failed: {err.decode(errors='replace')[:400]}")
# checkout sha
proc = await asyncio.create_subprocess_exec(
"git", "-C", str(dest), "checkout", sha,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
_, err = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"git checkout failed: {err.decode(errors='replace')[:400]}")
async def _download_dataset(dataset_id: str, dest: Path) -> str:
row = await db.fetchrow(
"SELECT file_key, format FROM datasets WHERE id = $1", uuid.UUID(dataset_id)
)
if not row:
raise RuntimeError("dataset not found")
data = minio_client.get_bytes(row["file_key"], bucket="ml.datasets")
ext = {"csv": "csv", "json": "json", "netcdf": "nc"}.get(row["format"], "bin")
out = dest / f"data.{ext}"
out.write_bytes(data)
return str(out)
def _stats_loop_sync(container, training_id: str, model_id: str, samples: list, stop_evt: asyncio.Event, loop: asyncio.AbstractEventLoop):
"""Sincrono, eseguito in thread. Ogni 5s legge docker stats → Influx + samples."""
while not stop_evt.is_set():
try:
stats = container.stats(stream=False)
# CPU%
cpu_delta = stats["cpu_stats"]["cpu_usage"]["total_usage"] - stats["precpu_stats"]["cpu_usage"]["total_usage"]
sys_delta = stats["cpu_stats"].get("system_cpu_usage", 0) - stats["precpu_stats"].get("system_cpu_usage", 0)
online = stats["cpu_stats"].get("online_cpus") or len(stats["cpu_stats"]["cpu_usage"].get("percpu_usage") or [1])
cpu_pct = (cpu_delta / sys_delta) * online * 100.0 if sys_delta > 0 else 0.0
mem_mb = (stats["memory_stats"].get("usage") or 0) / (1024 * 1024)
samples.append((cpu_pct, mem_mb))
point = (
Point("ml_training")
.tag("training_id", training_id)
.tag("model_id", model_id)
.field("cpu_pct", float(cpu_pct))
.field("mem_mb", float(mem_mb))
)
asyncio.run_coroutine_threadsafe(influx_client.write_points([point]), loop)
except Exception as e:
log.warning("stats loop error: %s", e)
time.sleep(5)
async def _stream_container_logs(container, training_id: str, model_id: str, stream_key: str):
"""Legge stdout del container, pubblica righe JSON su Redis stream e Influx."""
def _iter():
return container.logs(stream=True, follow=True, stdout=True, stderr=True)
loop = asyncio.get_event_loop()
it = await loop.run_in_executor(None, _iter)
while True:
line = await loop.run_in_executor(None, next, it, None)
if line is None:
break
try:
text = line.decode("utf-8", errors="replace").rstrip("\n")
except Exception:
continue
if not text:
continue
# righe non-JSON → log
payload: dict
if text.startswith("{") and text.endswith("}"):
try:
payload = json.loads(text)
except json.JSONDecodeError:
payload = {"type": "log", "level": "info", "message": text}
else:
payload = {"type": "log", "level": "info", "message": text}
await _emit(stream_key, payload)
if payload.get("type") == "metric":
p = Point("ml_training").tag("training_id", training_id).tag("model_id", model_id)
for k, v in payload.items():
if k == "type":
continue
if isinstance(v, (int, float)):
p = p.field(k, float(v))
try:
await influx_client.write_points([p])
except Exception as e:
log.warning("influx write metric failed: %s", e)
async def run_training_job(training_id: str) -> None:
"""Esegue un job di training end-to-end. Aggiorna Postgres e Redis state."""
r = redis_client.client()
state_key = f"ml:train:{training_id}"
stream_key = f"ml:train:{training_id}:events"
tr = await db.fetchrow("SELECT * FROM trainings WHERE id = $1", uuid.UUID(training_id))
if not tr:
log.error("training %s not found", training_id)
return
model = await db.fetchrow("SELECT * FROM models WHERE id = $1", tr["model_id"])
if not model:
await db.execute(
"UPDATE trainings SET status='failed', error=$2 WHERE id=$1",
uuid.UUID(training_id), "model not found",
)
return
await db.execute(
"UPDATE trainings SET status='running', started_at=NOW() WHERE id=$1",
uuid.UUID(training_id),
)
await r.hset(state_key, mapping={"status": "running", "progress": "0", "message": "starting"})
workdir = Path(settings.runner_tmp_dir) / training_id
artifacts_prefix = f"models/{tr['model_id']}/{tr['version']}/{tr['patch']}"
error: Optional[str] = None
samples: list[tuple[float, float]] = []
try:
workdir.mkdir(parents=True, exist_ok=True)
await _emit(stream_key, {"type": "log", "level": "info", "message": "cloning repo"})
await _clone_repo(model["gitea_repo"], tr["patch"], workdir / "repo")
await _emit(stream_key, {"type": "log", "level": "info", "message": "parsing model.yml"})
spec = await fetch_and_parse_spec(model["gitea_repo"], tr["patch"]) or {}
train_spec = spec.get("train", {})
entrypoint = train_spec.get("entrypoint") or "python -m src.train"
resources = spec.get("resources", {}) or {}
await _emit(stream_key, {"type": "log", "level": "info", "message": "downloading dataset"})
dataset_path = await _download_dataset(str(tr["dataset_id"]), workdir)
out_dir = workdir / "out"
out_dir.mkdir(exist_ok=True)
# docker run
dc = _docker_client()
await _emit(stream_key, {"type": "log", "level": "info", "message": "starting container"})
container = dc.containers.run(
settings.runner_image,
command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt 2>&1 || true && {entrypoint}"],
detach=True,
working_dir="/workdir/repo",
environment={
"MEB_DATASET_PATH": f"/workdir/{Path(dataset_path).name}",
"MEB_ARTIFACTS_DIR": "/workdir/out",
"MEB_TRAINING_ID": training_id,
},
volumes={str(workdir): {"bind": "/workdir", "mode": "rw"}},
network_mode="none",
mem_limit=f"{int(resources.get('mem_mb', 2048))}m",
nano_cpus=int(float(resources.get("cpu", 1)) * 1e9),
read_only=False,
tty=False,
detach_mode=None,
)
loop = asyncio.get_event_loop()
stop_evt = asyncio.Event()
stats_task = loop.run_in_executor(
None, _stats_loop_sync, container, training_id, str(tr["model_id"]), samples, stop_evt, loop
)
log_task = asyncio.create_task(
_stream_container_logs(container, training_id, str(tr["model_id"]), stream_key)
)
# attendi exit
exit_code = await loop.run_in_executor(None, lambda: container.wait()["StatusCode"])
stop_evt.set()
await log_task
try:
stats_task.cancel()
except Exception:
pass
if exit_code != 0:
error = f"container exited with code {exit_code}"
# raccogli outputs
results: dict = {}
final_metrics_path = out_dir / "metrics.json"
if final_metrics_path.exists():
try:
results = json.loads(final_metrics_path.read_text())
except Exception:
results = {"raw": final_metrics_path.read_text()[:10000]}
# upload artefatti (tutta la cartella out/)
for p in out_dir.rglob("*"):
if p.is_file():
rel = p.relative_to(out_dir).as_posix()
key = f"{artifacts_prefix}/{rel}"
minio_client.put_bytes(key, p.read_bytes())
# upload logs jsonl dallo stream redis (copia su minio per persistenza)
try:
entries = await r.xrange(stream_key, min="-", max="+")
lines = "\n".join(json.dumps({"id": i, **({"payload": json.loads(f.get("payload", "{}"))} if "payload" in f else f)}) for i, f in entries)
minio_client.put_bytes(f"trainings/{training_id}/logs.jsonl", lines.encode("utf-8"), "application/x-ndjson")
except Exception as e:
log.warning("log archive failed: %s", e)
cpu_avg = sum(s[0] for s in samples) / len(samples) if samples else 0.0
cpu_peak = max((s[0] for s in samples), default=0.0)
mem_avg = sum(s[1] for s in samples) / len(samples) if samples else 0.0
mem_peak = max((s[1] for s in samples), default=0.0)
resource_summary = {
"cpu_avg": round(cpu_avg, 2),
"cpu_peak": round(cpu_peak, 2),
"mem_avg_mb": round(mem_avg, 2),
"mem_peak_mb": round(mem_peak, 2),
"samples": len(samples),
}
status = "failed" if error else "succeeded"
await db.execute(
"""
UPDATE trainings SET
status=$2,
finished_at=NOW(),
duration_ms=EXTRACT(EPOCH FROM (NOW() - started_at))*1000,
artifacts_prefix=$3,
results=$4::jsonb,
resource_summary=$5::jsonb,
error=$6
WHERE id=$1
""",
uuid.UUID(training_id),
status,
artifacts_prefix,
json.dumps(results),
json.dumps(resource_summary),
error,
)
await r.hset(state_key, mapping={"status": status, "progress": "100", "message": error or "done"})
await _emit(stream_key, {"type": "end", "status": status, "error": error})
# Flush dei punti Influx accumulati durante il training (batched).
await influx_client.flush()
try:
container.remove(force=True)
except Exception:
pass
except Exception as e:
log.exception("training %s failed: %s", training_id, e)
await db.execute(
"UPDATE trainings SET status='failed', finished_at=NOW(), error=$2 WHERE id=$1",
uuid.UUID(training_id), str(e)[:1000],
)
await r.hset(state_key, mapping={"status": "failed", "message": str(e)[:200]})
await _emit(stream_key, {"type": "end", "status": "failed", "error": str(e)[:400]})
finally:
# cleanup workdir
try:
shutil.rmtree(workdir, ignore_errors=True)
except Exception:
pass
async def run_test_once(training_id: str, inputs: dict) -> dict:
"""Esegue una singola predizione via container spawn."""
tr = await db.fetchrow(
"SELECT t.*, m.gitea_repo FROM trainings t JOIN models m ON t.model_id = m.id WHERE t.id=$1",
uuid.UUID(training_id),
)
if not tr:
raise RuntimeError("training not found")
spec = await fetch_and_parse_spec(tr["gitea_repo"], tr["patch"]) or {}
test_spec = spec.get("test") or {}
entrypoint = test_spec.get("entrypoint") or "python -m src.predict"
workdir = Path(settings.runner_tmp_dir) / f"test-{uuid.uuid4()}"
workdir.mkdir(parents=True, exist_ok=True)
try:
await _clone_repo(tr["gitea_repo"], tr["patch"], workdir / "repo")
# scarica artefatti
if tr["artifacts_prefix"]:
art_dir = workdir / "artifacts"
art_dir.mkdir(exist_ok=True)
for obj in minio_client.list_prefix(tr["artifacts_prefix"] + "/"):
rel = obj["name"][len(tr["artifacts_prefix"]) + 1:]
out_path = art_dir / rel
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(minio_client.get_bytes(obj["name"]))
dc = _docker_client()
payload = json.dumps({"inputs": inputs}).encode()
container = dc.containers.run(
settings.runner_image,
command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt 2>&1 >/dev/null || true && {entrypoint}"],
detach=True,
working_dir="/workdir/repo",
environment={
"MEB_ARTIFACTS_DIR": "/workdir/artifacts",
"MEB_TRAINING_ID": training_id,
},
volumes={str(workdir): {"bind": "/workdir", "mode": "ro"}},
network_mode="none",
mem_limit="2048m",
nano_cpus=int(1e9),
stdin_open=True,
tty=False,
)
# scrivi input su stdin via attach socket
sock = container.attach_socket(params={"stdin": 1, "stream": 1})
try:
sock._sock.sendall(payload + b"\n")
except Exception:
pass
try:
sock.close()
except Exception:
pass
loop = asyncio.get_event_loop()
# stats peak
peak_cpu = 0.0
peak_mem = 0.0
stop = False
def _stats():
nonlocal peak_cpu, peak_mem, stop
for st in container.stats(stream=True, decode=True):
if stop:
return
try:
cpu_delta = st["cpu_stats"]["cpu_usage"]["total_usage"] - st["precpu_stats"]["cpu_usage"]["total_usage"]
sys_delta = st["cpu_stats"].get("system_cpu_usage", 0) - st["precpu_stats"].get("system_cpu_usage", 0)
online = st["cpu_stats"].get("online_cpus") or 1
cpu_pct = (cpu_delta / sys_delta) * online * 100 if sys_delta > 0 else 0
mem_mb = (st["memory_stats"].get("usage") or 0) / (1024 * 1024)
peak_cpu = max(peak_cpu, cpu_pct)
peak_mem = max(peak_mem, mem_mb)
except Exception:
pass
stats_fut = loop.run_in_executor(None, _stats)
exit_info = await loop.run_in_executor(None, container.wait)
stop = True
logs = container.logs(stdout=True, stderr=False).decode("utf-8", errors="replace")
try:
container.remove(force=True)
except Exception:
pass
outputs: dict = {}
for line in logs.strip().splitlines():
line = line.strip()
if line.startswith("{") and line.endswith("}"):
try:
obj = json.loads(line)
if "outputs" in obj:
outputs = obj["outputs"]
break
except json.JSONDecodeError:
continue
return {
"outputs": outputs,
"exit_code": exit_info.get("StatusCode"),
"cpu_peak": round(peak_cpu, 2),
"mem_peak_mb": round(peak_mem, 2),
"raw_log": logs[-2000:],
}
finally:
shutil.rmtree(workdir, ignore_errors=True)

57
ml/core/gitea.py Normal file
View File

@@ -0,0 +1,57 @@
"""Client Gitea: browse repo, branches, commits, file raw, clone URL autenticato."""
from __future__ import annotations
from typing import Optional
import httpx
from core.config import settings
def _headers() -> dict:
h = {"Accept": "application/json"}
if settings.gitea_token:
h["Authorization"] = f"token {settings.gitea_token}"
return h
def clone_url(owner_repo: str) -> str:
"""URL https://oauth2:TOKEN@<host>/owner/repo.git — usato SOLO lato server."""
if not settings.gitea_url:
raise RuntimeError("GITEA_URL not configured")
base = settings.gitea_url.rstrip("/")
if settings.gitea_token:
base = base.replace("https://", f"https://oauth2:{settings.gitea_token}@").replace(
"http://", f"http://oauth2:{settings.gitea_token}@"
)
return f"{base}/{owner_repo}.git"
async def _get(path: str, params: Optional[dict] = None) -> list | dict:
url = f"{settings.gitea_url.rstrip('/')}/api/v1{path}"
async with httpx.AsyncClient(timeout=15.0) as c:
r = await c.get(url, params=params, headers=_headers())
r.raise_for_status()
return r.json()
async def list_repos(limit: int = 50) -> list[dict]:
data = await _get("/repos/search", params={"limit": str(limit)})
return data.get("data", []) if isinstance(data, dict) else []
async def list_branches(owner_repo: str) -> list[dict]:
return await _get(f"/repos/{owner_repo}/branches")
async def list_commits(owner_repo: str, branch: str = "main", limit: int = 50) -> list[dict]:
return await _get(f"/repos/{owner_repo}/commits", params={"sha": branch, "limit": str(limit)})
async def get_file_raw(owner_repo: str, ref: str, path: str) -> bytes:
"""Scarica il file raw alla revisione indicata."""
url = f"{settings.gitea_url.rstrip('/')}/api/v1/repos/{owner_repo}/raw/{path}"
async with httpx.AsyncClient(timeout=15.0) as c:
r = await c.get(url, params={"ref": ref}, headers=_headers())
r.raise_for_status()
return r.content

75
ml/core/influx_client.py Normal file
View File

@@ -0,0 +1,75 @@
"""Client InfluxDB (influxdb-client sync wrapper in thread-pool per async).
Le scritture usano il batching async dell'SDK invece di SYNCHRONOUS.
Le metriche di training arrivano in burst (logs container, stats loop ogni 5s):
con SYNCHRONOUS ogni write era una HTTP request bloccante. Con WriteOptions
batched, l'SDK accumula i Point e fa flush periodico in background, senza
perdere durabilità (flush forzato a fine training).
"""
from __future__ import annotations
import asyncio
from typing import Iterable, Optional
from influxdb_client import InfluxDBClient, Point, WriteOptions
from core.config import settings
_client: Optional[InfluxDBClient] = None
_write_api = None
def client() -> InfluxDBClient:
global _client, _write_api
if _client is None:
_client = InfluxDBClient(
url=settings.influx_url, token=settings.influx_token, org=settings.influx_org
)
_write_api = _client.write_api(write_options=WriteOptions(
batch_size=200,
flush_interval=2_000,
jitter_interval=200,
retry_interval=2_000,
max_retries=3,
))
return _client
def _wa():
client()
return _write_api
async def write_points(points: Iterable[Point]) -> None:
wa = _wa()
pts = list(points)
await asyncio.to_thread(wa.write, settings.influx_bucket, settings.influx_org, pts)
async def flush() -> None:
"""Forza il flush del buffer batched. Da chiamare a fine training per
garantire che tutte le metriche raccolte siano persistite."""
if _write_api is None:
return
try:
await asyncio.to_thread(_write_api.flush)
except Exception:
pass
async def query_flux(flux: str) -> list[dict]:
c = client()
def _q():
tables = c.query_api().query(flux, org=settings.influx_org)
out = []
for table in tables:
for r in table.records:
out.append({
"time": r.get_time().isoformat() if r.get_time() else None,
"measurement": r.get_measurement(),
"field": r.get_field(),
"value": r.get_value(),
"tags": {k: v for k, v in r.values.items() if k.startswith("_") is False and k not in ("result", "table")},
})
return out
return await asyncio.to_thread(_q)

118
ml/core/minio_client.py Normal file
View File

@@ -0,0 +1,118 @@
"""Wrapper MinIO: bucket unico (settings.minio_bucket) con prefissi logici.
Prefissi usati:
datasets/<uuid>.<ext>
models/<model_id>/spec.yml
models/<model_id>/<version>/<patch>/... (artefatti training)
trainings/<training_id>/logs.jsonl
"""
from __future__ import annotations
import io
from datetime import timedelta
from typing import Iterable, Optional
from minio import Minio
from minio.error import S3Error
from core.config import settings
_client: Optional[Minio] = None
def client() -> Minio:
global _client
if _client is None:
_client = Minio(
f"{settings.minio_endpoint}:{settings.minio_port}",
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
secure=settings.minio_use_ssl,
)
return _client
def _bucket(b: Optional[str] = None) -> str:
return b or settings.minio_bucket
def ensure_bucket(bucket: Optional[str] = None) -> None:
name = _bucket(bucket)
c = client()
if not c.bucket_exists(name):
c.make_bucket(name)
def put_bytes(key: str, data: bytes, content_type: str = "application/octet-stream",
bucket: Optional[str] = None) -> None:
ensure_bucket(bucket)
client().put_object(
_bucket(bucket),
key,
io.BytesIO(data),
length=len(data),
content_type=content_type,
)
def put_stream(key: str, stream, length: int, content_type: str = "application/octet-stream",
bucket: Optional[str] = None) -> None:
ensure_bucket(bucket)
client().put_object(
_bucket(bucket), key, stream, length=length, content_type=content_type
)
def get_bytes(key: str, bucket: Optional[str] = None) -> bytes:
r = client().get_object(_bucket(bucket), key)
try:
return r.read()
finally:
r.close()
r.release_conn()
def remove(key: str, bucket: Optional[str] = None) -> None:
try:
client().remove_object(_bucket(bucket), key)
except S3Error:
pass
def remove_prefix(prefix: str, bucket: Optional[str] = None) -> int:
name = _bucket(bucket)
n = 0
for obj in client().list_objects(name, prefix=prefix, recursive=True):
try:
client().remove_object(name, obj.object_name)
n += 1
except S3Error:
pass
return n
def presigned_get(key: str, expires_seconds: int = 3600, bucket: Optional[str] = None) -> str:
return client().presigned_get_object(
_bucket(bucket), key, expires=timedelta(seconds=expires_seconds)
)
def list_prefix(prefix: str, bucket: Optional[str] = None) -> list[dict]:
out = []
for obj in client().list_objects(_bucket(bucket), prefix=prefix, recursive=True):
out.append({
"name": obj.object_name,
"size": obj.size,
"last_modified": obj.last_modified.isoformat() if obj.last_modified else None,
"etag": obj.etag,
})
return out
def check() -> bool:
try:
client().list_buckets()
return True
except Exception:
return False

90
ml/core/model_spec.py Normal file
View File

@@ -0,0 +1,90 @@
"""Parse e validazione del contratto `model.yml` nelle repo utente.
Schema sintetico (vedi piano):
name, type, version, python
train: {entrypoint, inputs, outputs, metrics}
test: {entrypoint, io, input_schema[], output_schema[]}
resources: {cpu, mem_mb, gpu}
"""
from __future__ import annotations
from typing import Any, Optional
import yaml
from pydantic import BaseModel, ValidationError
from core import gitea, redis_client
class _FieldSpec(BaseModel):
name: str
dtype: str
min: Optional[float] = None
max: Optional[float] = None
unit: Optional[str] = None
class _Train(BaseModel):
entrypoint: str
inputs: dict = {}
outputs: dict = {}
metrics: dict = {}
class _Test(BaseModel):
entrypoint: str
io: str = "stdio_json"
input_schema: list[_FieldSpec] = []
output_schema: list[_FieldSpec] = []
class ModelSpec(BaseModel):
name: str
type: str
version: str = "0.1.0"
python: str = "3.11"
train: _Train
test: Optional[_Test] = None
resources: dict = {}
def parse_yaml(content: bytes | str) -> dict:
"""Parsa stringa YAML → dict validato. Solleva ValueError su errore."""
if isinstance(content, bytes):
content = content.decode("utf-8")
try:
raw = yaml.safe_load(content) or {}
spec = ModelSpec(**raw)
return spec.model_dump()
except (yaml.YAMLError, ValidationError) as e:
raise ValueError(f"invalid model.yml: {e}") from e
async def fetch_and_parse_spec(owner_repo: str, ref: str) -> Optional[dict]:
"""Recupera model.yml dalla repo alla revisione e lo parsa.
Cache Redis `ml:modelspec:{repo}:{ref}` TTL 1h.
"""
cache_key = f"ml:modelspec:{owner_repo}:{ref}"
try:
cached = await redis_client.client().get(cache_key)
if cached:
import json
return json.loads(cached)
except Exception:
pass
try:
raw = await gitea.get_file_raw(owner_repo, ref, "model.yml")
except Exception:
try:
raw = await gitea.get_file_raw(owner_repo, ref, "model.yaml")
except Exception:
return None
spec = parse_yaml(raw)
try:
import json
await redis_client.client().set(cache_key, json.dumps(spec), ex=3600)
except Exception:
pass
return spec

29
ml/core/redis_client.py Normal file
View File

@@ -0,0 +1,29 @@
"""Client Redis asincrono (redis-py asyncio). Singleton semplice."""
from __future__ import annotations
from typing import Optional
import redis.asyncio as redis
from core.config import settings
_client: Optional[redis.Redis] = None
def client() -> redis.Redis:
global _client
if _client is None:
_client = redis.Redis(
host=settings.redis_host,
port=settings.redis_port,
decode_responses=True,
health_check_interval=30,
)
return _client
async def close() -> None:
global _client
if _client is not None:
await _client.aclose()
_client = None

54
ml/core/worker.py Normal file
View File

@@ -0,0 +1,54 @@
"""Worker loop: BRPOP da ml:queue:train e dispatch al docker_runner.
Parte N task asincroni concorrenti (settings.train_concurrency).
"""
from __future__ import annotations
import asyncio
import logging
from core import redis_client
from core.config import settings
from core.docker_runner import run_training_job
log = logging.getLogger(__name__)
_tasks: list[asyncio.Task] = []
async def _worker_loop(idx: int):
r = redis_client.client()
log.info("ml worker[%d] started", idx)
while True:
try:
res = await r.brpop("ml:queue:train", timeout=10)
except Exception as e:
log.warning("brpop error: %s", e)
await asyncio.sleep(2)
continue
if res is None:
continue
_, training_id = res
log.info("worker[%d] picked training %s", idx, training_id)
try:
await run_training_job(training_id)
except Exception:
log.exception("worker[%d] training %s crashed", idx, training_id)
def start_workers() -> None:
global _tasks
n = max(1, settings.train_concurrency)
for i in range(n):
_tasks.append(asyncio.create_task(_worker_loop(i)))
async def stop_workers() -> None:
for t in _tasks:
t.cancel()
for t in _tasks:
try:
await t
except Exception:
pass
_tasks.clear()