/** * Rulesets manager (plugin side) — formato v2. * * Gestisce 5 tipi: logs | forecast_current | forecast_hourly | marine_current | marine_hourly * * Formato item (v2): * { path, meta: { name, unit, decimals? }, olds: [], enabled: true } * * Fonti: * 1. cache locale (data/rulesets.json) — sopravvive ai restart * 2. GET HTTP {API_URL}/rulesets//active — bootstrap o riconciliazione * 3. push WS realtime ({_t:'ruleset_update'}) — runtime updates * * Dopo applyRemote(type, ruleset): * - emette 'update' (type, new, prev) * - emette 'update:' (new, prev) * - invia ack al server (via realtime control message ruleset_ack) * * I consumer (logs.local, openmeteo) si sottoscrivono per applicare il cambio. * Quando arriva un update di tipo 'logs', il consumer triggera un session_reset * (cosi' Influx separa storico vecchio da nuovo schema). */ const fs = require('fs'); const fsp = require('fs').promises; const path = require('path'); const EventEmitter = require('events'); const { LOG_PATHS, FORECAST_CURRENT, FORECAST_HOURLY, MARINE_CURRENT, MARINE_HOURLY } = require('../rules'); const TYPES = ['logs', 'forecast_current', 'forecast_hourly', 'marine_current', 'marine_hourly']; const cacheFile = path.join(__dirname, '../../data/rulesets.json'); const emitter = new EventEmitter(); const API_URL = process.env.API_URL; // state: { type -> { id, version: {major,minor,patch,str}, content: [items], _default } } let state = {}; // =============== DEFAULTS (fallback iniziale, no DB / no cache) =============== /** Mapping legacy openmeteo code → SK path */ const LEGACY_SK_MAP = { 'temperature_2m': 'meb.forecast.temperature', 'wind_speed_10m': 'meb.forecast.wind.speed', 'wind_direction_10m': 'meb.forecast.wind.direction', 'wind_gusts_10m': 'meb.forecast.wind.gusts', 'precipitation': 'meb.forecast.precipitation', 'rain': 'meb.forecast.rain', 'relative_humidity_2m': 'meb.forecast.humidity', 'pressure_msl': 'meb.forecast.pressure', 'precipitation_probability': 'meb.forecast.precipitationProbability', 'cloud_cover': 'meb.forecast.cloudCover', 'wave_height': 'meb.waves.height', 'wave_direction': 'meb.waves.direction', 'wave_period': 'meb.waves.period', 'wave_peak_period': 'meb.waves.peakPeriod', 'ocean_current_velocity': 'meb.waves.currentVelocity', 'ocean_current_direction': 'meb.waves.currentDirection', }; function defaultItem(p, meta = {}) { return { path: p, meta, olds: [], enabled: true }; } function buildDefaults() { const mk = (items) => ({ id: null, version: { major: 1, minor: 0, patch: 0, str: '1.0.0' }, description: 'default', content: items, _default: true, }); return { logs: mk(LOG_PATHS.map(p => defaultItem(p, {}))), forecast_current: mk(FORECAST_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), forecast_hourly: mk(FORECAST_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), marine_current: mk(MARINE_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), marine_hourly: mk(MARINE_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), }; } // =============== CACHE I/O =============== async function ensureDir() { try { await fsp.mkdir(path.dirname(cacheFile), { recursive: true }); } catch {} } async function loadCache() { try { const raw = await fsp.readFile(cacheFile, 'utf-8'); const parsed = JSON.parse(raw); if (parsed && typeof parsed === 'object') return parsed; } catch {} return {}; } async function saveCache() { try { await ensureDir(); await fsp.writeFile(cacheFile, JSON.stringify(state, null, 2)); } catch (err) { console.warn('[RULESETS] save cache failed:', err.message); } } // =============== INIT + BOOTSTRAP =============== /** * Init: carica cache → applica defaults sui type mancanti. Non emette eventi. * Successivamente `bootstrapFromServer()` chiama gli endpoint per riconciliare. */ async function init() { const cached = await loadCache(); const defaults = buildDefaults(); state = {}; for (const t of TYPES) state[t] = cached[t] || defaults[t]; console.log('[RULESETS] init:', TYPES.map(t => `${t}@v${state[t]?.version?.str || '?'}${state[t]?._default ? '(d)' : ''}`).join(' ')); } /** * Tenta di scaricare la versione attiva di ogni tipo dal server. * Se ottenuta e nuova, la applica (emette gli eventi). */ async function bootstrapFromServer() { if (!API_URL) { console.warn('[RULESETS] API_URL non configurato, salto bootstrap'); return; } for (const type of TYPES) { try { const r = await fetch(`${API_URL}/rulesets/${type}/active`, { signal: AbortSignal.timeout(8000) }); if (!r.ok) { if (r.status !== 404) console.warn(`[RULESETS] ${type} bootstrap ${r.status}`); continue; } const remote = await r.json(); // formato server: { id, type, version:{...,str}, content:[...] } await applyRemote(type, remote, { source: 'bootstrap' }); } catch (err) { console.warn(`[RULESETS] bootstrap ${type} err:`, err.message); } } } // =============== ACCESSORS =============== function get(type) { return state[type] || null; } function getEnabledItems(type) { const rs = state[type]; if (!rs) return []; return (rs.content || []).filter(it => it.enabled !== false); } function getEnabledPaths(type) { return getEnabledItems(type).map(it => it.path).filter(Boolean); } function getPathMap(type) { const map = {}; for (const it of getEnabledItems(type)) { if (it.path && it.meta?.sk_path) map[it.path] = it.meta.sk_path; } return map; } function getMetaForPath(type, p) { return (state[type]?.content || []).find(it => it.path === p)?.meta || null; } function versionStr(type) { return state[type]?.version?.str || null; } function rulesetId(type) { return state[type]?.id || null; } // =============== APPLY =============== /** * Applica un ruleset ricevuto. Salva cache, emette update events, * e (se non e' bootstrap) manda ack al server. * * Idempotente per versione: ignora payload con version <= corrente. */ async function applyRemote(type, ruleset, { source = 'push' } = {}) { if (!TYPES.includes(type)) { console.warn(`[RULESETS] tipo sconosciuto: ${type}`); return false; } if (!ruleset || !Array.isArray(ruleset.content)) { console.warn(`[RULESETS] ruleset invalido per ${type}`); return false; } const prev = state[type]; // dedup per id+version: scarta replays if (prev && !prev._default && ruleset.id && prev.id === ruleset.id && prev.version?.str === ruleset.version?.str) { return false; } state[type] = { id: ruleset.id, version: ruleset.version, description: ruleset.description, content: ruleset.content, _default: false, }; await saveCache(); const prevV = prev?.version?.str || '∅'; const newV = ruleset.version?.str || '?'; console.log(`[RULESETS] ${type} ${prevV} → ${newV} (${ruleset.content.length} items, src=${source})`); emitter.emit('update', type, state[type], prev); emitter.emit(`update:${type}`, state[type], prev); // ack al server (skipped on bootstrap to avoid loop) if (source !== 'bootstrap' && ruleset.id) { try { const realtime = require('./realtime/core'); realtime.sendRaw({ _t: 'ruleset_ack', type, ruleset_id: ruleset.id, version: ruleset.version?.str || '?', }); } catch (err) { console.warn('[RULESETS] ack send failed:', err.message); } } return true; } function onUpdate(listener) { emitter.on('update', listener); return () => emitter.off('update', listener); } function onUpdateOf(type, listener) { emitter.on(`update:${type}`, listener); return () => emitter.off(`update:${type}`, listener); } module.exports = { TYPES, init, bootstrapFromServer, get, getEnabledItems, getEnabledPaths, getPathMap, getMetaForPath, versionStr, rulesetId, applyRemote, onUpdate, onUpdateOf, };