Files
OLD-server-architecture/ml/core/docker_runner.py
Giuseppe Raffa 0ce879aa44 feat: Add new API endpoints and HTML pages for ML model management
- Implemented HTML pages for datasets, models, training, testing, and results.
- Created API endpoints for managing repositories, results, tests, and training sessions.
- Added functionality for streaming training progress via Server-Sent Events (SSE).
- Introduced a Dockerfile for the ML runner with necessary dependencies.
- Developed an SDK for user code execution within the runner container.
- Enhanced CSS styles for improved UI layout and navigation.
- Established a layout template for consistent HTML structure across pages.
- Added JavaScript for dynamic interactions on the models page.
- Implemented WebSocket handling for real-time communication with kiosk devices and controllers.
- Implemented model registration and management API at /api/models
- Added Gitea proxy API for repository interactions at /api/repos
- Created results API for listing and comparing training results at /api/results
- Developed training management API for enqueueing and retrieving training jobs at /api/trainings
- Introduced SSE endpoint for live training progress updates
- Added HTML pages for models, datasets, and training management
- Created a Dockerfile for the ML runner with necessary dependencies
- Developed SDK for user code execution within the runner container
- Enhanced CSS styles for improved UI/UX
- Implemented WebSocket communication for real-time device and controller interactions in the kiosk system
2026-04-28 09:24:38 +02:00

440 lines
17 KiB
Python

"""Runner Docker per train e test.
train:
  - clone the Gitea repo and check out the requested sha
  - prepare the workdir /var/ml/tmp/{training_id}
  - download the dataset from MinIO to workdir/data.<ext>
  - docker run meb-ml-runner with the tmp mount, env and limits from model.yml
  - read JSON from stdout → Redis stream + Influx; docker stats every 5s
  - at the end: collect the outputs, upload them to MinIO under artifacts_prefix
  - UPDATE trainings

test:
  - same flow but synchronous, stdin JSON → stdout JSON
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Optional
import docker
from influxdb_client import Point
from core import db, gitea, influx_client, minio_client, redis_client
from core.config import settings
from core.model_spec import fetch_and_parse_spec
log = logging.getLogger(__name__)
# Docker SDK client shared by the whole module; created on first use.
_docker = None


def _docker_client():
    """Return the module-wide Docker client, building it from the environment once."""
    global _docker
    client = _docker
    if client is None:
        client = docker.from_env()
        _docker = client
    return client
async def _emit(stream_key: str, payload: dict) -> None:
    """Append ``payload`` to the Redis stream ``stream_key``.

    The payload is stored JSON-encoded under the single field ``payload`` and
    the stream is capped with ``maxlen=10_000``. Publishing is best-effort:
    any failure is logged as a warning and never propagated to the caller.
    """
    try:
        conn = redis_client.client()
        fields = {"payload": json.dumps(payload)}
        await conn.xadd(stream_key, fields, maxlen=10_000)
    except Exception as exc:
        log.warning("xadd failed: %s", exc)
async def _clone_repo(owner_repo: str, sha: str, dest: Path) -> None:
    """Clone ``owner_repo`` from Gitea into ``dest`` and check out ``sha``.

    The clone is shallow (depth 50). If ``sha`` is not part of that shallow
    history the commit is fetched explicitly from ``origin`` and the checkout
    is retried on ``FETCH_HEAD``.

    Args:
        owner_repo: repository identifier passed to ``gitea.clone_url``.
        sha: revision to check out.
        dest: target directory; created if missing.

    Raises:
        RuntimeError: if the clone fails, or if the revision cannot be checked
            out even after the explicit fetch. The message carries at most
            400 characters of git's stderr with the clone URL redacted.
    """
    dest.mkdir(parents=True, exist_ok=True)
    url = gitea.clone_url(owner_repo)

    async def _git(*args: str):
        """Run ``git <args>`` and return ``(returncode, sanitized stderr)``."""
        proc = await asyncio.create_subprocess_exec(
            "git", *args,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        _, raw_err = await proc.communicate()
        text = raw_err.decode(errors="replace")
        # The error text ends up in exceptions that callers persist and
        # publish; the clone URL may embed credentials (TODO confirm against
        # gitea.clone_url), so it is redacted before truncation.
        if isinstance(url, str) and url:
            text = text.replace(url, "<clone-url>")
        return proc.returncode, text[:400]

    code, err = await _git("clone", "--depth", "50", url, str(dest))
    if code != 0:
        raise RuntimeError(f"git clone failed: {err}")

    code, err = await _git("-C", str(dest), "checkout", sha)
    if code != 0:
        # The revision may be older than the shallow window or live on another
        # branch: try to fetch exactly that commit. This only works when the
        # server allows fetching by object id; otherwise the original checkout
        # error is reported below.
        fetch_code, _ = await _git(
            "-C", str(dest), "fetch", "--depth", "1", "origin", sha
        )
        if fetch_code == 0:
            code, err = await _git("-C", str(dest), "checkout", "FETCH_HEAD")
    if code != 0:
        raise RuntimeError(f"git checkout failed: {err}")
async def _download_dataset(dataset_id: str, dest: Path) -> str:
    """Fetch a dataset from MinIO and store it as ``dest / data.<ext>``.

    The dataset row is looked up by ``dataset_id`` (parsed as a UUID); its
    ``file_key`` is read from the ``ml.datasets`` bucket. The extension is
    derived from the row's ``format``: ``csv``, ``json``, ``netcdf`` → ``nc``,
    anything else → ``bin``.

    Returns:
        The path of the written file, as a string.

    Raises:
        RuntimeError: if no dataset row matches ``dataset_id``.
    """
    record = await db.fetchrow(
        "SELECT file_key, format FROM datasets WHERE id = $1", uuid.UUID(dataset_id)
    )
    if not record:
        raise RuntimeError("dataset not found")

    blob = minio_client.get_bytes(record["file_key"], bucket="ml.datasets")

    fmt = record["format"]
    if fmt == "netcdf":
        suffix = "nc"
    elif fmt in ("csv", "json"):
        suffix = fmt
    else:
        suffix = "bin"

    target = dest / f"data.{suffix}"
    target.write_bytes(blob)
    return str(target)
def _stats_loop_sync(container, training_id: str, model_id: str, samples: list, stop_evt: asyncio.Event, loop: asyncio.AbstractEventLoop):
    """Sample ``docker stats`` for ``container`` until ``stop_evt`` is set.

    Synchronous; meant to run in a worker thread. Roughly every 5 seconds it
    reads one stats snapshot, appends ``(cpu_pct, mem_mb)`` to ``samples`` and
    schedules an Influx write of the same values on ``loop``.

    Args:
        container: Docker SDK container object exposing ``stats(stream=False)``.
        training_id: value of the ``training_id`` tag on the Influx point.
        model_id: value of the ``model_id`` tag on the Influx point.
        samples: list mutated in place with one tuple per successful sample.
        stop_evt: polled through ``is_set()``; setting it ends the loop.
        loop: event loop on which the Influx write coroutine is scheduled.
    """

    def _report_write_failure(fut) -> None:
        # The write runs on the event loop; without this callback its
        # exception would be dropped together with the discarded future.
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            log.warning("influx write stats failed: %s", exc)

    while not stop_evt.is_set():
        try:
            stats = container.stats(stream=False)
            # CPU%: container CPU delta over system CPU delta, scaled by CPU count.
            cpu_delta = stats["cpu_stats"]["cpu_usage"]["total_usage"] - stats["precpu_stats"]["cpu_usage"]["total_usage"]
            sys_delta = stats["cpu_stats"].get("system_cpu_usage", 0) - stats["precpu_stats"].get("system_cpu_usage", 0)
            online = stats["cpu_stats"].get("online_cpus") or len(stats["cpu_stats"]["cpu_usage"].get("percpu_usage") or [1])
            cpu_pct = (cpu_delta / sys_delta) * online * 100.0 if sys_delta > 0 else 0.0
            mem_mb = (stats["memory_stats"].get("usage") or 0) / (1024 * 1024)
            samples.append((cpu_pct, mem_mb))
            point = (
                Point("ml_training")
                .tag("training_id", training_id)
                .tag("model_id", model_id)
                .field("cpu_pct", float(cpu_pct))
                .field("mem_mb", float(mem_mb))
            )
            fut = asyncio.run_coroutine_threadsafe(influx_client.write_points([point]), loop)
            fut.add_done_callback(_report_write_failure)
        except Exception as e:
            log.warning("stats loop error: %s", e)
        # Wait ~5s in short slices so the thread exits promptly once the
        # caller sets stop_evt instead of lingering for a full interval.
        deadline = time.monotonic() + 5
        while not stop_evt.is_set() and time.monotonic() < deadline:
            time.sleep(0.25)
async def _stream_container_logs(container, training_id: str, model_id: str, stream_key: str):
    """Follow the container output and publish it line by line.

    Every non-empty line is emitted on the Redis stream ``stream_key``: a line
    that parses as a JSON object is published as is, anything else is wrapped
    as ``{"type": "log", "level": "info", "message": <line>}``. Objects whose
    ``type`` is ``"metric"`` are additionally written to Influx, one field per
    numeric value.

    The Docker log stream yields byte chunks that are not guaranteed to align
    with line boundaries, so chunks are buffered and split on newlines; the
    remaining tail is flushed when the stream ends.

    Args:
        container: Docker SDK container object exposing ``logs(...)``.
        training_id: value of the ``training_id`` tag on metric points.
        model_id: value of the ``model_id`` tag on metric points.
        stream_key: Redis stream receiving the events.
    """
    loop = asyncio.get_running_loop()
    # Upper bound for a line without a newline (e.g. carriage-return progress
    # bars); beyond it the buffer is flushed as one log entry.
    max_pending = 64 * 1024

    def _open_stream():
        return container.logs(stream=True, follow=True, stdout=True, stderr=True)

    async def _publish(text: str) -> None:
        payload: dict = {"type": "log", "level": "info", "message": text}
        if text.startswith("{") and text.endswith("}"):
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                payload = parsed
        await _emit(stream_key, payload)
        if payload.get("type") != "metric":
            return
        try:
            p = Point("ml_training").tag("training_id", training_id).tag("model_id", model_id)
            has_field = False
            for k, v in payload.items():
                if k == "type":
                    continue
                if isinstance(v, (int, float)):
                    p = p.field(k, float(v))
                    has_field = True
            # A point without fields carries no data; skip the write.
            if has_field:
                await influx_client.write_points([p])
        except Exception as e:
            log.warning("influx write metric failed: %s", e)

    async def _publish_raw(raw: bytes) -> None:
        text = raw.decode("utf-8", errors="replace").rstrip("\r")
        if text:
            await _publish(text)

    it = await loop.run_in_executor(None, _open_stream)
    pending = b""
    while True:
        # next() blocks on the Docker socket, so it runs in the executor;
        # the None default marks the end of the stream.
        chunk = await loop.run_in_executor(None, next, it, None)
        if chunk is None:
            break
        if not isinstance(chunk, (bytes, bytearray)):
            continue
        pending += bytes(chunk)
        *complete, pending = pending.split(b"\n")
        if len(pending) > max_pending:
            complete.append(pending)
            pending = b""
        for raw in complete:
            await _publish_raw(raw)
    if pending:
        await _publish_raw(pending)
async def run_training_job(training_id: str) -> None:
    """Run one training job end to end and record its outcome.

    Steps: load the ``trainings`` and ``models`` rows, mark the job running,
    clone the repo at the training's ``patch`` revision, parse the model spec,
    download the dataset, start the runner container, stream its logs and
    resource stats, then upload everything under ``out/`` to MinIO, archive
    the event stream and update Postgres and Redis with the final status.

    Any exception raised after the job was marked running is logged and
    recorded as a ``failed`` status; it is not re-raised. The container, the
    stats thread, buffered Influx points and the working directory are
    released in all cases, including failure.

    Args:
        training_id: UUID (as a string) of the row in ``trainings``.
    """
    r = redis_client.client()
    state_key = f"ml:train:{training_id}"
    stream_key = f"ml:train:{training_id}:events"
    tr = await db.fetchrow("SELECT * FROM trainings WHERE id = $1", uuid.UUID(training_id))
    if not tr:
        log.error("training %s not found", training_id)
        return
    model = await db.fetchrow("SELECT * FROM models WHERE id = $1", tr["model_id"])
    if not model:
        await db.execute(
            "UPDATE trainings SET status='failed', error=$2 WHERE id=$1",
            uuid.UUID(training_id), "model not found",
        )
        return
    await db.execute(
        "UPDATE trainings SET status='running', started_at=NOW() WHERE id=$1",
        uuid.UUID(training_id),
    )
    await r.hset(state_key, mapping={"status": "running", "progress": "0", "message": "starting"})
    workdir = Path(settings.runner_tmp_dir) / training_id
    artifacts_prefix = f"models/{tr['model_id']}/{tr['version']}/{tr['patch']}"
    error: Optional[str] = None
    samples: list[tuple[float, float]] = []
    # Tracked outside the try block so the finally clause can release them
    # whatever step failed.
    container = None
    log_task = None
    stop_evt = asyncio.Event()
    try:
        workdir.mkdir(parents=True, exist_ok=True)
        await _emit(stream_key, {"type": "log", "level": "info", "message": "cloning repo"})
        await _clone_repo(model["gitea_repo"], tr["patch"], workdir / "repo")
        await _emit(stream_key, {"type": "log", "level": "info", "message": "parsing model.yml"})
        spec = await fetch_and_parse_spec(model["gitea_repo"], tr["patch"]) or {}
        # `or {}` also covers sections that are present but null in the spec.
        train_spec = spec.get("train") or {}
        entrypoint = train_spec.get("entrypoint") or "python -m src.train"
        resources = spec.get("resources") or {}
        mem_mb = resources.get("mem_mb")
        if mem_mb is None:
            mem_mb = 2048
        cpu = resources.get("cpu")
        if cpu is None:
            cpu = 1
        await _emit(stream_key, {"type": "log", "level": "info", "message": "downloading dataset"})
        dataset_path = await _download_dataset(str(tr["dataset_id"]), workdir)
        out_dir = workdir / "out"
        out_dir.mkdir(exist_ok=True)

        # docker run
        dc = _docker_client()
        await _emit(stream_key, {"type": "log", "level": "info", "message": "starting container"})
        # Only arguments accepted by the Docker SDK are passed: it rejects
        # unknown keyword arguments with a TypeError.
        container = dc.containers.run(
            settings.runner_image,
            command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt 2>&1 || true && {entrypoint}"],
            detach=True,
            working_dir="/workdir/repo",
            environment={
                "MEB_DATASET_PATH": f"/workdir/{Path(dataset_path).name}",
                "MEB_ARTIFACTS_DIR": "/workdir/out",
                "MEB_TRAINING_ID": training_id,
            },
            volumes={str(workdir): {"bind": "/workdir", "mode": "rw"}},
            network_mode="none",
            mem_limit=f"{int(mem_mb)}m",
            nano_cpus=int(float(cpu) * 1e9),
            read_only=False,
            tty=False,
        )
        loop = asyncio.get_running_loop()
        stats_task = loop.run_in_executor(
            None, _stats_loop_sync, container, training_id, str(tr["model_id"]), samples, stop_evt, loop
        )
        log_task = asyncio.create_task(
            _stream_container_logs(container, training_id, str(tr["model_id"]), stream_key)
        )
        # Wait for the container to exit.
        exit_code = await loop.run_in_executor(None, lambda: container.wait()["StatusCode"])
        stop_evt.set()
        await log_task
        # The worker thread stops through stop_evt; cancelling only drops the
        # asyncio-side future.
        stats_task.cancel()
        if exit_code != 0:
            error = f"container exited with code {exit_code}"

        # Collect the outputs.
        results: dict = {}
        final_metrics_path = out_dir / "metrics.json"
        if final_metrics_path.exists():
            try:
                results = json.loads(final_metrics_path.read_text())
            except Exception:
                results = {"raw": final_metrics_path.read_text()[:10000]}

        # Upload the artifacts (the whole out/ folder).
        for p in out_dir.rglob("*"):
            if p.is_file():
                rel = p.relative_to(out_dir).as_posix()
                key = f"{artifacts_prefix}/{rel}"
                minio_client.put_bytes(key, p.read_bytes())

        # Copy the Redis event stream to MinIO as JSON lines for persistence.
        try:
            entries = await r.xrange(stream_key, min="-", max="+")
            archived = []
            for entry_id, fields in entries:
                if "payload" in fields:
                    record = {"id": entry_id, "payload": json.loads(fields["payload"])}
                else:
                    record = {"id": entry_id, **fields}
                archived.append(json.dumps(record))
            lines = "\n".join(archived)
            minio_client.put_bytes(f"trainings/{training_id}/logs.jsonl", lines.encode("utf-8"), "application/x-ndjson")
        except Exception as e:
            log.warning("log archive failed: %s", e)

        cpu_avg = sum(s[0] for s in samples) / len(samples) if samples else 0.0
        cpu_peak = max((s[0] for s in samples), default=0.0)
        mem_avg = sum(s[1] for s in samples) / len(samples) if samples else 0.0
        mem_peak = max((s[1] for s in samples), default=0.0)
        resource_summary = {
            "cpu_avg": round(cpu_avg, 2),
            "cpu_peak": round(cpu_peak, 2),
            "mem_avg_mb": round(mem_avg, 2),
            "mem_peak_mb": round(mem_peak, 2),
            "samples": len(samples),
        }
        status = "failed" if error else "succeeded"
        await db.execute(
            """
            UPDATE trainings SET
                status=$2,
                finished_at=NOW(),
                duration_ms=EXTRACT(EPOCH FROM (NOW() - started_at))*1000,
                artifacts_prefix=$3,
                results=$4::jsonb,
                resource_summary=$5::jsonb,
                error=$6
            WHERE id=$1
            """,
            uuid.UUID(training_id),
            status,
            artifacts_prefix,
            json.dumps(results),
            json.dumps(resource_summary),
            error,
        )
        await r.hset(state_key, mapping={"status": status, "progress": "100", "message": error or "done"})
        await _emit(stream_key, {"type": "end", "status": status, "error": error})
    except Exception as e:
        log.exception("training %s failed: %s", training_id, e)
        await db.execute(
            "UPDATE trainings SET status='failed', finished_at=NOW(), error=$2 WHERE id=$1",
            uuid.UUID(training_id), str(e)[:1000],
        )
        await r.hset(state_key, mapping={"status": "failed", "message": str(e)[:200]})
        await _emit(stream_key, {"type": "end", "status": "failed", "error": str(e)[:400]})
    finally:
        # Stop the stats thread and the log reader even when a step failed
        # before the normal shutdown sequence was reached.
        stop_evt.set()
        if log_task is not None and not log_task.done():
            log_task.cancel()
        if container is not None:
            try:
                container.remove(force=True)
            except Exception as exc:
                log.warning("container cleanup failed for training %s: %s", training_id, exc)
        # Flush the Influx points buffered during the training. Best-effort:
        # a flush error must not overwrite the status already recorded.
        try:
            await influx_client.flush()
        except Exception as exc:
            log.warning("influx flush failed: %s", exc)
        # Remove the working directory.
        shutil.rmtree(workdir, ignore_errors=True)
async def run_test_once(training_id: str, inputs: dict) -> dict:
    """Run a single prediction in a freshly spawned runner container.

    The repository is cloned at the training's ``patch`` revision, the
    training artifacts (if any) are downloaded next to it, and the test
    entrypoint from the model spec (default ``python -m src.predict``) is run
    with ``{"inputs": inputs}`` written as one JSON line on stdin. The first
    JSON object on stdout that has an ``outputs`` key provides the result.

    Args:
        training_id: UUID (as a string) of the training whose artifacts are used.
        inputs: JSON-serialisable inputs forwarded to the container.

    Returns:
        A dict with ``outputs``, ``exit_code``, ``cpu_peak``, ``mem_peak_mb``
        and ``raw_log`` (the last 2000 characters of stdout).

    Raises:
        RuntimeError: if the training does not exist or the clone fails.
    """
    tr = await db.fetchrow(
        "SELECT t.*, m.gitea_repo FROM trainings t JOIN models m ON t.model_id = m.id WHERE t.id=$1",
        uuid.UUID(training_id),
    )
    if not tr:
        raise RuntimeError("training not found")
    spec = await fetch_and_parse_spec(tr["gitea_repo"], tr["patch"]) or {}
    test_spec = spec.get("test") or {}
    entrypoint = test_spec.get("entrypoint") or "python -m src.predict"
    workdir = Path(settings.runner_tmp_dir) / f"test-{uuid.uuid4()}"
    workdir.mkdir(parents=True, exist_ok=True)
    # Tracked so the finally clause can remove it whatever step failed.
    container = None
    try:
        await _clone_repo(tr["gitea_repo"], tr["patch"], workdir / "repo")

        # Download the training artifacts.
        if tr["artifacts_prefix"]:
            art_dir = workdir / "artifacts"
            art_dir.mkdir(exist_ok=True)
            for obj in minio_client.list_prefix(tr["artifacts_prefix"] + "/"):
                rel = obj["name"][len(tr["artifacts_prefix"]) + 1:]
                out_path = art_dir / rel
                out_path.parent.mkdir(parents=True, exist_ok=True)
                out_path.write_bytes(minio_client.get_bytes(obj["name"]))

        dc = _docker_client()
        payload = json.dumps({"inputs": inputs}).encode()
        container = dc.containers.run(
            settings.runner_image,
            # stdout is redirected first, then stderr onto it, so pip output
            # is fully discarded and cannot pollute the stdout parsed below.
            command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt >/dev/null 2>&1 || true && {entrypoint}"],
            detach=True,
            working_dir="/workdir/repo",
            environment={
                "MEB_ARTIFACTS_DIR": "/workdir/artifacts",
                "MEB_TRAINING_ID": training_id,
            },
            volumes={str(workdir): {"bind": "/workdir", "mode": "ro"}},
            network_mode="none",
            mem_limit="2048m",
            nano_cpus=int(1e9),
            stdin_open=True,
            tty=False,
        )

        # Write the input on stdin through the attach socket.
        sock = container.attach_socket(params={"stdin": 1, "stream": 1})
        try:
            sock._sock.sendall(payload + b"\n")
        except Exception as exc:
            # Kept best-effort, but no longer silent: without its input the
            # container is likely to produce no outputs.
            log.warning("failed to send test input to container: %s", exc)
        finally:
            try:
                sock.close()
            except Exception as exc:
                log.debug("closing attach socket failed: %s", exc)

        loop = asyncio.get_running_loop()
        # Peak resource usage, updated by the stats thread.
        peak_cpu = 0.0
        peak_mem = 0.0
        stop = False

        def _stats():
            nonlocal peak_cpu, peak_mem, stop
            try:
                for st in container.stats(stream=True, decode=True):
                    if stop:
                        return
                    try:
                        cpu_delta = st["cpu_stats"]["cpu_usage"]["total_usage"] - st["precpu_stats"]["cpu_usage"]["total_usage"]
                        sys_delta = st["cpu_stats"].get("system_cpu_usage", 0) - st["precpu_stats"].get("system_cpu_usage", 0)
                        online = st["cpu_stats"].get("online_cpus") or 1
                        cpu_pct = (cpu_delta / sys_delta) * online * 100 if sys_delta > 0 else 0
                        mem_mb = (st["memory_stats"].get("usage") or 0) / (1024 * 1024)
                        peak_cpu = max(peak_cpu, cpu_pct)
                        peak_mem = max(peak_mem, mem_mb)
                    except Exception:
                        # Incomplete sample: skip it.
                        continue
            except Exception as exc:
                # The stream may break once the container is removed; this
                # thread is never awaited, so the error is handled here.
                log.debug("stats stream ended: %s", exc)

        stats_fut = loop.run_in_executor(None, _stats)
        exit_info = await loop.run_in_executor(None, container.wait)
        stop = True
        logs = container.logs(stdout=True, stderr=False).decode("utf-8", errors="replace")

        outputs: dict = {}
        for line in logs.strip().splitlines():
            line = line.strip()
            if line.startswith("{") and line.endswith("}"):
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if "outputs" in obj:
                    outputs = obj["outputs"]
                    break
        return {
            "outputs": outputs,
            "exit_code": exit_info.get("StatusCode"),
            "cpu_peak": round(peak_cpu, 2),
            "mem_peak_mb": round(peak_mem, 2),
            "raw_log": logs[-2000:],
        }
    finally:
        if container is not None:
            try:
                container.remove(force=True)
            except Exception as exc:
                log.warning("container cleanup failed for test of training %s: %s", training_id, exc)
        shutil.rmtree(workdir, ignore_errors=True)