"""Runner Docker per train e test. train: - clone repo Gitea @ sha - prepara workdir /var/ml/tmp/{training_id} - scarica dataset da MinIO in workdir/data. - docker run meb-ml-runner con mount tmp, env, limits da model.yml - legge stdout JSON → Redis stream + Influx; docker stats ogni 5s - a fine: collect outputs, upload su MinIO prefix artifacts_prefix - UPDATE trainings test: - analogo ma sincrono, stdin JSON → stdout JSON """ from __future__ import annotations import asyncio import json import logging import os import shutil import subprocess import time import uuid from pathlib import Path from typing import Any, Optional import docker from influxdb_client import Point from core import db, gitea, influx_client, minio_client, redis_client from core.config import settings from core.model_spec import fetch_and_parse_spec log = logging.getLogger(__name__) _docker = None def _docker_client(): global _docker if _docker is None: _docker = docker.from_env() return _docker async def _emit(stream_key: str, payload: dict) -> None: try: await redis_client.client().xadd(stream_key, {"payload": json.dumps(payload)}, maxlen=10_000) except Exception as e: log.warning("xadd failed: %s", e) async def _clone_repo(owner_repo: str, sha: str, dest: Path) -> None: dest.mkdir(parents=True, exist_ok=True) url = gitea.clone_url(owner_repo) # clone shallow del branch/sha specifico # per evitare leak del token nei log, logghiamo solo host proc = await asyncio.create_subprocess_exec( "git", "clone", "--depth", "50", url, str(dest), stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) _, err = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"git clone failed: {err.decode(errors='replace')[:400]}") # checkout sha proc = await asyncio.create_subprocess_exec( "git", "-C", str(dest), "checkout", sha, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) _, err = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"git checkout failed: {err.decode(errors='replace')[:400]}") async def _download_dataset(dataset_id: str, dest: Path) -> str: row = await db.fetchrow( "SELECT file_key, format FROM datasets WHERE id = $1", uuid.UUID(dataset_id) ) if not row: raise RuntimeError("dataset not found") data = minio_client.get_bytes(row["file_key"], bucket="ml.datasets") ext = {"csv": "csv", "json": "json", "netcdf": "nc"}.get(row["format"], "bin") out = dest / f"data.{ext}" out.write_bytes(data) return str(out) def _stats_loop_sync(container, training_id: str, model_id: str, samples: list, stop_evt: asyncio.Event, loop: asyncio.AbstractEventLoop): """Sincrono, eseguito in thread. Ogni 5s legge docker stats → Influx + samples.""" while not stop_evt.is_set(): try: stats = container.stats(stream=False) # CPU% cpu_delta = stats["cpu_stats"]["cpu_usage"]["total_usage"] - stats["precpu_stats"]["cpu_usage"]["total_usage"] sys_delta = stats["cpu_stats"].get("system_cpu_usage", 0) - stats["precpu_stats"].get("system_cpu_usage", 0) online = stats["cpu_stats"].get("online_cpus") or len(stats["cpu_stats"]["cpu_usage"].get("percpu_usage") or [1]) cpu_pct = (cpu_delta / sys_delta) * online * 100.0 if sys_delta > 0 else 0.0 mem_mb = (stats["memory_stats"].get("usage") or 0) / (1024 * 1024) samples.append((cpu_pct, mem_mb)) point = ( Point("ml_training") .tag("training_id", training_id) .tag("model_id", model_id) .field("cpu_pct", float(cpu_pct)) .field("mem_mb", float(mem_mb)) ) asyncio.run_coroutine_threadsafe(influx_client.write_points([point]), loop) except Exception as e: log.warning("stats loop error: %s", e) time.sleep(5) async def _stream_container_logs(container, training_id: str, model_id: str, stream_key: str): """Legge stdout del container, pubblica righe JSON su Redis stream e Influx.""" def _iter(): return container.logs(stream=True, follow=True, stdout=True, stderr=True) loop = asyncio.get_event_loop() it = await loop.run_in_executor(None, _iter) while True: line = await loop.run_in_executor(None, next, it, None) if line is None: break try: text = line.decode("utf-8", errors="replace").rstrip("\n") except Exception: continue if not text: continue # righe non-JSON → log payload: dict if text.startswith("{") and text.endswith("}"): try: payload = json.loads(text) except json.JSONDecodeError: payload = {"type": "log", "level": "info", "message": text} else: payload = {"type": "log", "level": "info", "message": text} await _emit(stream_key, payload) if payload.get("type") == "metric": p = Point("ml_training").tag("training_id", training_id).tag("model_id", model_id) for k, v in payload.items(): if k == "type": continue if isinstance(v, (int, float)): p = p.field(k, float(v)) try: await influx_client.write_points([p]) except Exception as e: log.warning("influx write metric failed: %s", e) async def run_training_job(training_id: str) -> None: """Esegue un job di training end-to-end. Aggiorna Postgres e Redis state.""" r = redis_client.client() state_key = f"ml:train:{training_id}" stream_key = f"ml:train:{training_id}:events" tr = await db.fetchrow("SELECT * FROM trainings WHERE id = $1", uuid.UUID(training_id)) if not tr: log.error("training %s not found", training_id) return model = await db.fetchrow("SELECT * FROM models WHERE id = $1", tr["model_id"]) if not model: await db.execute( "UPDATE trainings SET status='failed', error=$2 WHERE id=$1", uuid.UUID(training_id), "model not found", ) return await db.execute( "UPDATE trainings SET status='running', started_at=NOW() WHERE id=$1", uuid.UUID(training_id), ) await r.hset(state_key, mapping={"status": "running", "progress": "0", "message": "starting"}) workdir = Path(settings.runner_tmp_dir) / training_id artifacts_prefix = f"models/{tr['model_id']}/{tr['version']}/{tr['patch']}" error: Optional[str] = None samples: list[tuple[float, float]] = [] try: workdir.mkdir(parents=True, exist_ok=True) await _emit(stream_key, {"type": "log", "level": "info", "message": "cloning repo"}) await _clone_repo(model["gitea_repo"], tr["patch"], workdir / "repo") await _emit(stream_key, {"type": "log", "level": "info", "message": "parsing model.yml"}) spec = await fetch_and_parse_spec(model["gitea_repo"], tr["patch"]) or {} train_spec = spec.get("train", {}) entrypoint = train_spec.get("entrypoint") or "python -m src.train" resources = spec.get("resources", {}) or {} await _emit(stream_key, {"type": "log", "level": "info", "message": "downloading dataset"}) dataset_path = await _download_dataset(str(tr["dataset_id"]), workdir) out_dir = workdir / "out" out_dir.mkdir(exist_ok=True) # docker run dc = _docker_client() await _emit(stream_key, {"type": "log", "level": "info", "message": "starting container"}) container = dc.containers.run( settings.runner_image, command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt 2>&1 || true && {entrypoint}"], detach=True, working_dir="/workdir/repo", environment={ "MEB_DATASET_PATH": f"/workdir/{Path(dataset_path).name}", "MEB_ARTIFACTS_DIR": "/workdir/out", "MEB_TRAINING_ID": training_id, }, volumes={str(workdir): {"bind": "/workdir", "mode": "rw"}}, network_mode="none", mem_limit=f"{int(resources.get('mem_mb', 2048))}m", nano_cpus=int(float(resources.get("cpu", 1)) * 1e9), read_only=False, tty=False, detach_mode=None, ) loop = asyncio.get_event_loop() stop_evt = asyncio.Event() stats_task = loop.run_in_executor( None, _stats_loop_sync, container, training_id, str(tr["model_id"]), samples, stop_evt, loop ) log_task = asyncio.create_task( _stream_container_logs(container, training_id, str(tr["model_id"]), stream_key) ) # attendi exit exit_code = await loop.run_in_executor(None, lambda: container.wait()["StatusCode"]) stop_evt.set() await log_task try: stats_task.cancel() except Exception: pass if exit_code != 0: error = f"container exited with code {exit_code}" # raccogli outputs results: dict = {} final_metrics_path = out_dir / "metrics.json" if final_metrics_path.exists(): try: results = json.loads(final_metrics_path.read_text()) except Exception: results = {"raw": final_metrics_path.read_text()[:10000]} # upload artefatti (tutta la cartella out/) for p in out_dir.rglob("*"): if p.is_file(): rel = p.relative_to(out_dir).as_posix() key = f"{artifacts_prefix}/{rel}" minio_client.put_bytes(key, p.read_bytes()) # upload logs jsonl dallo stream redis (copia su minio per persistenza) try: entries = await r.xrange(stream_key, min="-", max="+") lines = "\n".join(json.dumps({"id": i, **({"payload": json.loads(f.get("payload", "{}"))} if "payload" in f else f)}) for i, f in entries) minio_client.put_bytes(f"trainings/{training_id}/logs.jsonl", lines.encode("utf-8"), "application/x-ndjson") except Exception as e: log.warning("log archive failed: %s", e) cpu_avg = sum(s[0] for s in samples) / len(samples) if samples else 0.0 cpu_peak = max((s[0] for s in samples), default=0.0) mem_avg = sum(s[1] for s in samples) / len(samples) if samples else 0.0 mem_peak = max((s[1] for s in samples), default=0.0) resource_summary = { "cpu_avg": round(cpu_avg, 2), "cpu_peak": round(cpu_peak, 2), "mem_avg_mb": round(mem_avg, 2), "mem_peak_mb": round(mem_peak, 2), "samples": len(samples), } status = "failed" if error else "succeeded" await db.execute( """ UPDATE trainings SET status=$2, finished_at=NOW(), duration_ms=EXTRACT(EPOCH FROM (NOW() - started_at))*1000, artifacts_prefix=$3, results=$4::jsonb, resource_summary=$5::jsonb, error=$6 WHERE id=$1 """, uuid.UUID(training_id), status, artifacts_prefix, json.dumps(results), json.dumps(resource_summary), error, ) await r.hset(state_key, mapping={"status": status, "progress": "100", "message": error or "done"}) await _emit(stream_key, {"type": "end", "status": status, "error": error}) # Flush dei punti Influx accumulati durante il training (batched). await influx_client.flush() try: container.remove(force=True) except Exception: pass except Exception as e: log.exception("training %s failed: %s", training_id, e) await db.execute( "UPDATE trainings SET status='failed', finished_at=NOW(), error=$2 WHERE id=$1", uuid.UUID(training_id), str(e)[:1000], ) await r.hset(state_key, mapping={"status": "failed", "message": str(e)[:200]}) await _emit(stream_key, {"type": "end", "status": "failed", "error": str(e)[:400]}) finally: # cleanup workdir try: shutil.rmtree(workdir, ignore_errors=True) except Exception: pass async def run_test_once(training_id: str, inputs: dict) -> dict: """Esegue una singola predizione via container spawn.""" tr = await db.fetchrow( "SELECT t.*, m.gitea_repo FROM trainings t JOIN models m ON t.model_id = m.id WHERE t.id=$1", uuid.UUID(training_id), ) if not tr: raise RuntimeError("training not found") spec = await fetch_and_parse_spec(tr["gitea_repo"], tr["patch"]) or {} test_spec = spec.get("test") or {} entrypoint = test_spec.get("entrypoint") or "python -m src.predict" workdir = Path(settings.runner_tmp_dir) / f"test-{uuid.uuid4()}" workdir.mkdir(parents=True, exist_ok=True) try: await _clone_repo(tr["gitea_repo"], tr["patch"], workdir / "repo") # scarica artefatti if tr["artifacts_prefix"]: art_dir = workdir / "artifacts" art_dir.mkdir(exist_ok=True) for obj in minio_client.list_prefix(tr["artifacts_prefix"] + "/"): rel = obj["name"][len(tr["artifacts_prefix"]) + 1:] out_path = art_dir / rel out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_bytes(minio_client.get_bytes(obj["name"])) dc = _docker_client() payload = json.dumps({"inputs": inputs}).encode() container = dc.containers.run( settings.runner_image, command=["sh", "-c", f"cd /workdir/repo && pip install -q -r requirements.txt 2>&1 >/dev/null || true && {entrypoint}"], detach=True, working_dir="/workdir/repo", environment={ "MEB_ARTIFACTS_DIR": "/workdir/artifacts", "MEB_TRAINING_ID": training_id, }, volumes={str(workdir): {"bind": "/workdir", "mode": "ro"}}, network_mode="none", mem_limit="2048m", nano_cpus=int(1e9), stdin_open=True, tty=False, ) # scrivi input su stdin via attach socket sock = container.attach_socket(params={"stdin": 1, "stream": 1}) try: sock._sock.sendall(payload + b"\n") except Exception: pass try: sock.close() except Exception: pass loop = asyncio.get_event_loop() # stats peak peak_cpu = 0.0 peak_mem = 0.0 stop = False def _stats(): nonlocal peak_cpu, peak_mem, stop for st in container.stats(stream=True, decode=True): if stop: return try: cpu_delta = st["cpu_stats"]["cpu_usage"]["total_usage"] - st["precpu_stats"]["cpu_usage"]["total_usage"] sys_delta = st["cpu_stats"].get("system_cpu_usage", 0) - st["precpu_stats"].get("system_cpu_usage", 0) online = st["cpu_stats"].get("online_cpus") or 1 cpu_pct = (cpu_delta / sys_delta) * online * 100 if sys_delta > 0 else 0 mem_mb = (st["memory_stats"].get("usage") or 0) / (1024 * 1024) peak_cpu = max(peak_cpu, cpu_pct) peak_mem = max(peak_mem, mem_mb) except Exception: pass stats_fut = loop.run_in_executor(None, _stats) exit_info = await loop.run_in_executor(None, container.wait) stop = True logs = container.logs(stdout=True, stderr=False).decode("utf-8", errors="replace") try: container.remove(force=True) except Exception: pass outputs: dict = {} for line in logs.strip().splitlines(): line = line.strip() if line.startswith("{") and line.endswith("}"): try: obj = json.loads(line) if "outputs" in obj: outputs = obj["outputs"] break except json.JSONDecodeError: continue return { "outputs": outputs, "exit_code": exit_info.get("StatusCode"), "cpu_peak": round(peak_cpu, 2), "mem_peak_mb": round(peak_mem, 2), "raw_log": logs[-2000:], } finally: shutil.rmtree(workdir, ignore_errors=True)