"""Worker loop: BRPOP da ml:queue:train e dispatch al docker_runner. Parte N task asincroni concorrenti (settings.train_concurrency). """ from __future__ import annotations import asyncio import logging from core import redis_client from core.config import settings from core.docker_runner import run_training_job log = logging.getLogger(__name__) _tasks: list[asyncio.Task] = [] async def _worker_loop(idx: int): r = redis_client.client() log.info("ml worker[%d] started", idx) while True: try: res = await r.brpop("ml:queue:train", timeout=10) except Exception as e: log.warning("brpop error: %s", e) await asyncio.sleep(2) continue if res is None: continue _, training_id = res log.info("worker[%d] picked training %s", idx, training_id) try: await run_training_job(training_id) except Exception: log.exception("worker[%d] training %s crashed", idx, training_id) def start_workers() -> None: global _tasks n = max(1, settings.train_concurrency) for i in range(n): _tasks.append(asyncio.create_task(_worker_loop(i))) async def stop_workers() -> None: for t in _tasks: t.cancel() for t in _tasks: try: await t except Exception: pass _tasks.clear()