feat: Implement rulesets and layout management for kiosk plugin

- Added rulesets manager to handle various data types and updates via HTTP and WebSocket.
- Introduced layout store for managing kiosk layouts with caching and server synchronization.
- Enhanced dashboard and data routes to support new layout and ruleset features.
- Updated kiosk HTML and JavaScript to utilize new layout rendering and data binding.
- Removed obsolete map route and integrated map functionality into the new tile renderer.
- Improved Telegram commands to reflect changes in data structure and logging.
- Refactored weather fetching intervals to prevent multiple instances.
- Added SSE stream for real-time layout updates in the kiosk.
This commit is contained in:
Giuseppe Raffa
2026-05-12 10:17:54 +02:00
parent bb8d267cd4
commit c2c1598226
27 changed files with 1061 additions and 326 deletions

View File

@@ -1,250 +1,158 @@
/**
* Logger locale + invio realtime.
*
* Scrive una riga ogni secondo in un CSV locale (1 sessione = 1 file) e
* contemporaneamente invia i field via WebSocket al realtime per InfluxDB.
*
* Path-list dinamica: i path letti ad ogni tick vengono presi dal ruleset
* 'logs' (cores/rulesets). Quando il ruleset cambia (push WS o cambio remoto):
* - la sessione corrente viene chiusa (CSV finalizzato)
* - si manda `session_reset` al server (nuovo session_id su Influx)
* - si apre un nuovo CSV con i nuovi path
* - lo storico Influx resta coerente: tag `session` cambia, tag
* `ruleset_version` riflette la nuova versione applicata.
*/
const fs = require('fs').promises;
const fsSync = require('fs');
const pth = require('path');
const os = require('os');
const skFlow = require('../config/skFlow');
const realtime = require('./realtime/core');
const rulesets = require('./rulesets');
const logsDirectory = pth.join(__dirname, '../../data/logs/');
// Intervallo di scrittura fisso: 1 secondo
const WRITE_INTERVAL = 1000;
// Stato della sessione attiva
let session = null;
let writeInterval = null;
// Paths da registrare (impostati da init)
let logPaths = [];
/**
* Risolutori speciali per path che non sono direttamente accessibili via skFlow.get().
* Per ogni path speciale, definisce una funzione che restituisce il valore.
*/
const SPECIAL_RESOLVERS = {
'navigation.position.latitude': () => {
const pos = skFlow.get('navigation.position');
return pos?.latitude ?? null;
},
'navigation.position.longitude': () => {
const pos = skFlow.get('navigation.position');
return pos?.longitude ?? null;
},
'system.uptime': () => Math.floor(process.uptime())
'navigation.position.latitude': () => skFlow.get('navigation.position')?.latitude ?? null,
'navigation.position.longitude': () => skFlow.get('navigation.position')?.longitude ?? null,
'system.uptime': () => Math.floor(process.uptime()),
};
function resolveValue(p) {
if (SPECIAL_RESOLVERS[p]) return SPECIAL_RESOLVERS[p]();
return skFlow.get(p);
}
/**
* Risolve il valore di un path, gestendo i casi speciali.
* @param {String} path - il path da risolvere
* @returns {*} il valore
* Init: prende i path iniziali dal ruleset attivo e si sottoscrive ai cambi.
* @param {Array<String>} [fallback] - opzionale, lista path se non c'e' ruleset
*/
function resolveValue(path) {
// Controlla se c'e un risolutore speciale
if (SPECIAL_RESOLVERS[path]) {
return SPECIAL_RESOLVERS[path]();
function init(fallback) {
logPaths = rulesets.getEnabledPaths('logs');
if ((!logPaths || logPaths.length === 0) && Array.isArray(fallback)) {
logPaths = fallback;
}
// Path standard: leggi dal databrowser Signal K
return skFlow.get(path);
console.log(`[LOGS] init: ${logPaths.length} path da ruleset v=${rulesets.versionStr('logs') || '?'}`);
rulesets.onUpdateOf('logs', async (next) => {
const newPaths = (next.content || []).filter(it => it.enabled !== false).map(it => it.path);
console.log(`[LOGS] ruleset update: ${newPaths.length} path → restart recording`);
const wasRecording = !!session;
try {
if (wasRecording) await stopRecording();
logPaths = newPaths;
if (realtime.isConnected()) realtime.sendRaw({ _t: 'session_reset' });
if (wasRecording) await startRecording();
} catch (err) {
console.error('[LOGS] restart on ruleset update failed:', err.message);
}
});
}
/**
* Inizializza i path da registrare.
* @param {Array<String>} paths - array di path da rules.js LOG_PATHS
*/
function init(paths) {
logPaths = paths;
console.log(`[LOGS] Inizializzati ${paths.length} path`);
}
/**
* Assicura che la cartella logs esista
*/
async function ensureDir() {
try {
await fs.mkdir(logsDirectory, { recursive: true });
} catch (e) {}
try { await fs.mkdir(logsDirectory, { recursive: true }); } catch (e) {}
}
/**
* Avvia la registrazione: crea un nuovo file CSV e inizia a scrivere ogni secondo.
* @param {String} name - nome del file (opzionale, default: data/ora corrente)
*/
async function startRecording(name) {
// Se c'e gia una sessione attiva, fermala prima
if (session) {
await stopRecording();
}
if (!name) {
const now = new Date();
name = now.toISOString().replace(/[:.]/g, '-');
}
if (logPaths.length === 0) {
console.warn('[LOGS] Nessun path configurato, la registrazione non catturera dati');
}
if (session) await stopRecording();
if (!name) name = new Date().toISOString().replace(/[:.]/g, '-');
if (logPaths.length === 0) console.warn('[LOGS] nessun path configurato');
await ensureDir();
// Header CSV: timestamp + tutti i path
const header = ['timestamp', ...logPaths].join(',') + '\n';
const filePath = pth.join(logsDirectory, `${name}.csv`);
await fs.writeFile(filePath, header);
session = {
name: name,
paths: logPaths,
startTime: new Date(),
elements: 0,
filePath: filePath
};
// Scrivi ogni secondo
writeInterval = setInterval(() => {
writeLog();
}, WRITE_INTERVAL);
console.log(`[LOGS] Registrazione avviata: ${name} (${logPaths.length} colonne, intervallo ${WRITE_INTERVAL}ms)`);
session = { name, paths: logPaths.slice(), startTime: new Date(), elements: 0, filePath };
writeInterval = setInterval(() => { writeLog(); }, WRITE_INTERVAL);
console.log(`[LOGS] started: ${name} (${logPaths.length} cols)`);
return session;
}
/**
* Scrive una riga nel CSV e invia i dati al server via WebSocket.
*/
async function writeLog() {
if (!session) return;
try {
const timestamp = new Date().toISOString();
// Risolvi tutti i valori
const fields = {};
const csvValues = [];
for (const path of session.paths) {
const val = resolveValue(path);
fields[path] = val;
// Formatta per CSV
if (val === null || val === undefined) {
csvValues.push('');
} else if (typeof val === 'object') {
csvValues.push(JSON.stringify(val).replace(/,/g, ';'));
} else {
csvValues.push(val);
}
for (const p of session.paths) {
const val = resolveValue(p);
fields[p] = val;
if (val === null || val === undefined) csvValues.push('');
else if (typeof val === 'object') csvValues.push(JSON.stringify(val).replace(/,/g, ';'));
else csvValues.push(val);
}
// Scrivi riga CSV nel file locale
const row = [timestamp, ...csvValues].join(',') + '\n';
await fs.appendFile(session.filePath, row);
session.elements++;
// Invia al server via WebSocket (se connesso)
if (realtime.isConnected()) {
realtime.send([Date.now(), 'logs', fields]);
}
} catch (error) {
console.error('[LOGS] Errore scrittura:', error.message);
if (realtime.isConnected()) realtime.send([Date.now(), 'logs', fields]);
} catch (err) {
console.error('[LOGS] write error:', err.message);
}
}
/**
* Interrompe la registrazione e chiude il file.
*/
async function stopRecording() {
if (writeInterval) {
clearInterval(writeInterval);
writeInterval = null;
}
if (writeInterval) { clearInterval(writeInterval); writeInterval = null; }
if (session) {
console.log(`[LOGS] Registrazione fermata: ${session.name} (${session.elements} righe)`);
console.log(`[LOGS] stopped ${session.name} (${session.elements} righe)`);
session = null;
}
}
/**
* Ottieni i dati del file come stringa CSV.
* @param {String} name - il nome del file (senza estensione)
* @returns {String|null}
*/
async function getLog(name) {
try {
const filePath = pth.join(logsDirectory, `${name}.csv`);
const content = await fs.readFile(filePath, 'utf-8');
return content;
} catch (error) {
console.error('[LOGS] Errore lettura log:', error.message);
return null;
}
try { return await fs.readFile(pth.join(logsDirectory, `${name}.csv`), 'utf-8'); }
catch (err) { console.error('[LOGS] read err:', err.message); return null; }
}
/**
* Ottieni il percorso del file CSV.
* @param {String} name - il nome del file (senza estensione)
* @returns {String|null}
*/
function getLogFile(name) {
const filePath = pth.join(logsDirectory, `${name}.csv`);
if (fsSync.existsSync(filePath)) {
return filePath;
}
return null;
const p = pth.join(logsDirectory, `${name}.csv`);
return fsSync.existsSync(p) ? p : null;
}
/**
* Ottieni la lista di tutti i file di log disponibili.
* @returns {Array}
*/
async function listLogs() {
await ensureDir();
try {
const files = await fs.readdir(logsDirectory);
const csvFiles = files.filter(f => f.endsWith('.csv'));
const result = [];
for (const file of csvFiles) {
const filePath = pth.join(logsDirectory, file);
const stat = await fs.stat(filePath);
result.push({
const csv = files.filter(f => f.endsWith('.csv'));
const out = [];
for (const file of csv) {
const p = pth.join(logsDirectory, file);
const stat = await fs.stat(p);
out.push({
name: file.replace('.csv', ''),
filename: file,
size: (stat.size / (1024 * 1024)).toFixed(2),
created: stat.birthtime,
modified: stat.mtime
modified: stat.mtime,
});
}
return result.sort((a, b) => b.modified - a.modified);
} catch (error) {
console.error('[LOGS] Errore lista log:', error.message);
return out.sort((a, b) => b.modified - a.modified);
} catch (err) {
console.error('[LOGS] list err:', err.message);
return [];
}
}
/**
* Ottieni informazioni sulla sessione di registrazione attiva.
* @returns {Object|null}
*/
function getSession() {
if (!session) return null;
return {
name: session.name,
paths: session.paths,
startTime: session.startTime,
elements: session.elements,
delay: WRITE_INTERVAL
name: session.name, paths: session.paths, startTime: session.startTime,
elements: session.elements, delay: WRITE_INTERVAL,
};
}
module.exports = {
init,
startRecording,
stopRecording,
getLog,
getLogFile,
getSession,
listLogs
};
module.exports = { init, startRecording, stopRecording, getLog, getLogFile, getSession, listLogs };

View File

@@ -1,41 +1,28 @@
const skFlow = require('../config/skFlow');
const realtimeCore = require('./realtime/core');
const {
FORECAST_CURRENT,
FORECAST_HOURLY,
MARINE_CURRENT,
MARINE_HOURLY
} = require('../rules');
const rulesets = require('./rulesets');
/**
* Helper: legge i codici openmeteo + il path SK destinato per i 4 tipi forecast/marine,
* leggendoli dal ruleset attivo. Fall back ai default in rules.js se il ruleset
* non e' ancora stato caricato.
*/
function paramsAndMap(type) {
const items = rulesets.getEnabledItems(type);
if (!items.length) return { codes: [], map: {} };
const codes = items.map(it => it.path).filter(Boolean);
const map = {};
for (const it of items) {
if (it.meta?.sk_path) map[it.path] = it.meta.sk_path;
}
return { codes, map };
}
const FETCH_TIMEOUT = 10000;
const FORECAST_API = 'https://api.open-meteo.com/v1/forecast';
const MARINE_API = 'https://marine-api.open-meteo.com/v1/marine';
/**
* Mapping da parametri API Open-Meteo a path Signal K.
* Questi path vengono pubblicati sul databrowser e letti dai log.
*/
const FORECAST_PATH_MAP = {
'temperature_2m': 'meb.forecasts.temperature',
'wind_speed_10m': 'meb.forecast.wind.speed',
'wind_direction_10m': 'meb.forecast.wind.direction',
'wind_gusts_10m': 'meb.forecast.wind.gusts',
'precipitation': 'meb.forecast.precipitation',
'rain': 'meb.forecast.rain',
'relative_humidity_2m': 'meb.forecast.humidity',
'pressure_msl': 'meb.forecast.pressure',
'precipitation_probability':'meb.forecast.precipitationProbability',
'cloud_cover': 'meb.forecast.cloudCover',
};
const MARINE_PATH_MAP = {
'wave_height': 'meb.waves.height',
'wave_direction': 'meb.waves.direction',
'wave_period': 'meb.waves.period',
'wave_peak_period': 'meb.waves.peakPeriod',
'ocean_current_velocity': 'meb.waves.currentVelocity',
'ocean_current_direction': 'meb.waves.currentDirection',
};
// I path map sono ora ottenuti dal ruleset (rulesets.getPathMap('forecast_*'/'marine_*'))
/**
* Fetch JSON con timeout
@@ -59,19 +46,20 @@ async function fetchJSON(url) {
*/
function publishCurrentToSignalK(forecastData, marineData) {
const skData = {};
const fMap = paramsAndMap('forecast_current').map;
const mMap = paramsAndMap('marine_current').map;
if (forecastData?.current) {
for (const [key, value] of Object.entries(forecastData.current)) {
if (key === 'time' || key === 'interval') continue;
const skPath = FORECAST_PATH_MAP[key];
const skPath = fMap[key];
if (skPath && value != null) skData[skPath] = value;
}
}
if (marineData?.current) {
for (const [key, value] of Object.entries(marineData.current)) {
if (key === 'time' || key === 'interval') continue;
const skPath = MARINE_PATH_MAP[key];
const skPath = mMap[key];
if (skPath && value != null) skData[skPath] = value;
}
}
@@ -79,7 +67,7 @@ function publishCurrentToSignalK(forecastData, marineData) {
if (Object.keys(skData).length > 0) {
skData['meb.weather.timestamp'] = Date.now();
skFlow.publish(skData);
console.log(`[OPENMETEO] Pubblicati ${Object.keys(skData).length} valori su Signal K`);
console.log(`[OPENMETEO] pubblicati ${Object.keys(skData).length} valori su Signal K`);
}
}
@@ -89,19 +77,20 @@ function publishCurrentToSignalK(forecastData, marineData) {
*/
function sendCurrentToRealtime(forecastData, marineData) {
const fields = {};
const fMap = paramsAndMap('forecast_current').map;
const mMap = paramsAndMap('marine_current').map;
if (forecastData?.current) {
for (const [key, value] of Object.entries(forecastData.current)) {
if (key === 'time' || key === 'interval') continue;
const skPath = FORECAST_PATH_MAP[key];
const skPath = fMap[key];
if (skPath && value != null) fields[skPath] = value;
}
}
if (marineData?.current) {
for (const [key, value] of Object.entries(marineData.current)) {
if (key === 'time' || key === 'interval') continue;
const skPath = MARINE_PATH_MAP[key];
const skPath = mMap[key];
if (skPath && value != null) fields[skPath] = value;
}
}
@@ -123,6 +112,8 @@ function sendForecastBatchToRealtime(forecastData, marineData) {
const times = forecastHourly?.time || marineHourly?.time;
const points = [];
const fMap = paramsAndMap('forecast_hourly').map;
const mMap = paramsAndMap('marine_hourly').map;
for (let i = 0; i < times.length; i++) {
const ts = new Date(times[i]).getTime();
@@ -131,15 +122,14 @@ function sendForecastBatchToRealtime(forecastData, marineData) {
if (forecastHourly) {
for (const [key, values] of Object.entries(forecastHourly)) {
if (key === 'time') continue;
const skPath = FORECAST_PATH_MAP[key];
const skPath = fMap[key];
if (skPath && values?.[i] != null) fields[skPath] = values[i];
}
}
if (marineHourly) {
for (const [key, values] of Object.entries(marineHourly)) {
if (key === 'time') continue;
const skPath = MARINE_PATH_MAP[key];
const skPath = mMap[key];
if (skPath && values?.[i] != null) fields[skPath] = values[i];
}
}
@@ -166,27 +156,30 @@ async function fetchCurrentWeather(location) {
return;
}
if (FORECAST_CURRENT.length === 0 && MARINE_CURRENT.length === 0) {
const forecastCurrent = paramsAndMap('forecast_current').codes;
const marineCurrent = paramsAndMap('marine_current').codes;
if (forecastCurrent.length === 0 && marineCurrent.length === 0) {
console.warn('[OPENMETEO] Nessun parametro current configurato');
return;
}
console.log(`[OPENMETEO] Fetch current — forecast: ${FORECAST_CURRENT.length} params, marine: ${MARINE_CURRENT.length} params`);
console.log(`[OPENMETEO] Fetch current — forecast:${forecastCurrent.length} marine:${marineCurrent.length}`);
let forecastData = null, marineData = null;
try {
const promises = [];
if (FORECAST_CURRENT.length > 0) {
const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&current=${FORECAST_CURRENT.join(',')}`;
if (forecastCurrent.length > 0) {
const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&current=${forecastCurrent.join(',')}`;
promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => {
console.error(`[OPENMETEO] Errore forecast current: ${e.message}`);
}));
}
if (MARINE_CURRENT.length > 0) {
const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&current=${MARINE_CURRENT.join(',')}&models=ecmwf_wam`;
if (marineCurrent.length > 0) {
const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&current=${marineCurrent.join(',')}&models=ecmwf_wam`;
promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => {
console.error(`[OPENMETEO] Errore marine current: ${e.message}`);
}));
@@ -211,27 +204,30 @@ async function fetchHourlyForecasts(location) {
return;
}
if (FORECAST_HOURLY.length === 0 && MARINE_HOURLY.length === 0) {
const forecastHourly = paramsAndMap('forecast_hourly').codes;
const marineHourly = paramsAndMap('marine_hourly').codes;
if (forecastHourly.length === 0 && marineHourly.length === 0) {
console.warn('[OPENMETEO] Nessun parametro hourly configurato');
return;
}
console.log(`[OPENMETEO] Fetch hourly 7gg — forecast: ${FORECAST_HOURLY.length} params, marine: ${MARINE_HOURLY.length} params`);
console.log(`[OPENMETEO] Fetch hourly 7gg — forecast:${forecastHourly.length} marine:${marineHourly.length}`);
let forecastData = null, marineData = null;
try {
const promises = [];
if (FORECAST_HOURLY.length > 0) {
const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${FORECAST_HOURLY.join(',')}&forecast_days=7`;
if (forecastHourly.length > 0) {
const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${forecastHourly.join(',')}&forecast_days=7`;
promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => {
console.error(`[OPENMETEO] Errore forecast hourly: ${e.message}`);
}));
}
if (MARINE_HOURLY.length > 0) {
const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${MARINE_HOURLY.join(',')}&forecast_days=7&models=ecmwf_wam`;
if (marineHourly.length > 0) {
const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${marineHourly.join(',')}&forecast_days=7&models=ecmwf_wam`;
promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => {
console.error(`[OPENMETEO] Errore marine hourly: ${e.message}`);
}));

View File

@@ -8,10 +8,10 @@ const REALTIME_URL = process.env.REALTIME_URL;
*/
async function authenticate() {
const SENSOR_CODE = configManager.getSensorCode();
const SENSOR_NAME = configManager.getSensorName();
const SENSOR_ID = configManager.getSensorId();
if (!REALTIME_URL || !SENSOR_CODE || !SENSOR_NAME) {
console.error('[REALTIME|AUTH] REALTIME_URL, SENSOR_CODE o SENSOR_NAME non configurati');
if (!REALTIME_URL || !SENSOR_CODE || !SENSOR_ID) {
console.error('[REALTIME|AUTH] REALTIME_URL, SENSOR_CODE o SENSOR_ID non configurati');
return null;
}
@@ -20,7 +20,7 @@ async function authenticate() {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
name: SENSOR_NAME,
id: SENSOR_ID,
code: SENSOR_CODE
})
});
@@ -38,8 +38,8 @@ async function authenticate() {
return null;
}
console.log(`[REALTIME|AUTH] Autenticato: ${SENSOR_NAME}, token valido 5s`);
return { socketToken: data.t };
console.log(`[REALTIME|AUTH] Autenticato: ${SENSOR_ID}`);
return { socketToken: data.t, sensorId: SENSOR_ID };
} catch (error) {
console.error(`[REALTIME|AUTH] error: ${error.message}`);

View File

@@ -22,21 +22,32 @@ async function init() {
async function connectToServer() {
if (isShuttingDown) return;
console.log('CONNECTING......')
console.log('[REALTIME] connecting...');
const result = await auth.authenticate();
console.log('AUTH RESULT:', result);
let result;
try {
result = await auth.authenticate();
} catch (err) {
console.error('[REALTIME] auth error:', err.message);
scheduleReconnect();
return;
}
if (!result) {
scheduleReconnect();
return;
}
const connected = await socket.connect(result.socketToken, () => {
if (!isShuttingDown) {
scheduleReconnect();
}
});
let connected = false;
try {
connected = await socket.connect(result.socketToken, () => {
if (!isShuttingDown) {
scheduleReconnect();
}
});
} catch (err) {
console.error('[REALTIME] socket connect error:', err.message);
}
if (!connected) {
scheduleReconnect();

View File

@@ -1,17 +1,40 @@
const WebSocket = require('ws');
const os = require('os');
const { encode } = require('@msgpack/msgpack');
const { encode, decode } = require('@msgpack/msgpack');
const SOCKET_URL = process.env.REALTIME_SOCKET_URL;
let ws = null;
let onDisconnect = null;
let currentSessionId = null;
const QUEUE_MAX = 500;
const pendingQueue = [];
function flushQueue() {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
while (pendingQueue.length > 0) {
const buf = pendingQueue.shift();
try { ws.send(buf); }
catch (err) {
console.error('[REALTIME|WS] Errore flush:', err.message);
return;
}
}
}
function enqueue(buf) {
if (pendingQueue.length >= QUEUE_MAX) {
pendingQueue.shift();
}
pendingQueue.push(buf);
}
/**
* Apre una connessione WebSocket al server realtime usando il token temporaneo.
* @param {string} socketToken - Token temporaneo ottenuto da auth.authenticate()
* @param {Function} onClose - Callback chiamata quando la connessione si chiude
* @returns {Promise<boolean>} true se la connessione è riuscita
* @returns {Promise<boolean>} true se la connessione e' riuscita
*/
function connect(socketToken, onClose) {
return new Promise((resolve) => {
@@ -32,23 +55,57 @@ function connect(socketToken, onClose) {
ws.on('open', () => {
console.log('[REALTIME|WS] Connesso');
// Invia init con system uptime
const initPayload = {
_t: 'init',
uptime: Math.floor(os.uptime())
};
const initPayload = { _t: 'init', uptime: Math.floor(os.uptime()) };
ws.send(encode(initPayload));
console.log('[REALTIME|WS] Init inviato:', initPayload);
flushQueue();
resolve(true);
});
ws.on('message', () => {
// Il server non invia messaggi ai sensori per ora
ws.on('message', (raw) => {
let msg;
try { msg = decode(raw); }
catch (err) {
console.warn('[REALTIME|WS] payload non msgpack:', err.message);
return;
}
if (msg && typeof msg === 'object') {
if (msg._t === 'hello') {
currentSessionId = msg.sessionId || null;
console.log(`[REALTIME|WS] hello dal server (session=${currentSessionId})`);
return;
}
if (msg._t === 'session_id') {
currentSessionId = msg.sessionId || null;
console.log(`[REALTIME|WS] session_reset ack: ${msg.prev} -> ${msg.sessionId}`);
return;
}
if (msg._t === 'ruleset_update') {
console.log(`[REALTIME|WS] ruleset_update type=${msg.type} v=${msg.ruleset?.version?.str || '?'}`);
try {
require('../rulesets').applyRemote(msg.type, msg.ruleset, { source: 'push' });
} catch (err) { console.warn('[REALTIME|WS] ruleset apply error:', err.message); }
return;
}
if (msg._t === 'kiosk_layout_update') {
console.log(`[REALTIME|WS] kiosk_layout_update v=${msg.layout?.version || '?'}`);
try {
require('../../tools/kiosk/server-layout-store').applyRemote(msg.layout);
} catch (err) { console.warn('[REALTIME|WS] layout apply error:', err.message); }
return;
}
if (msg._t === 'error') {
console.warn('[REALTIME|WS] error dal server:', msg.message);
return;
}
if (msg._t === 'ack') {
return; // diagnostica, no-op
}
}
});
ws.on('ping', () => {
// ws risponde automaticamente con pong
});
ws.on('ping', () => { /* ws library risponde con pong automaticamente */ });
ws.on('error', (err) => {
console.error(`[REALTIME|WS] Errore: ${err.message}`);
@@ -58,6 +115,7 @@ function connect(socketToken, onClose) {
ws.on('close', (code) => {
console.log(`[REALTIME|WS] Disconnesso (code: ${code})`);
ws = null;
currentSessionId = null;
if (onDisconnect) onDisconnect();
});
});
@@ -68,47 +126,51 @@ function connect(socketToken, onClose) {
* @param {Array} data - Array nel formato [timestamp, measurement, fields]
*/
function send(data) {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
let buf;
try {
const [timestamp, measurement, fields] = data;
const packet = { ts: timestamp, _m: measurement, ...fields };
ws.send(encode(packet));
buf = encode(packet);
} catch (err) {
console.error('[REALTIME|WS] Errore invio:', err.message);
console.error('[REALTIME|WS] Errore encode:', err.message);
return;
}
if (!ws || ws.readyState !== WebSocket.OPEN) {
enqueue(buf);
return;
}
try { ws.send(buf); }
catch (err) { console.error('[REALTIME|WS] Errore invio:', err.message); }
}
/**
* Invia un oggetto raw al server, codificato in msgpack.
* A differenza di send(), non fa transform [ts, measurement, fields].
* @param {Object} obj - Oggetto da inviare direttamente
*/
function sendRaw(obj) {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
try {
ws.send(encode(obj));
} catch (err) {
console.error('[REALTIME|WS] Errore invio raw:', err.message);
let buf;
try { buf = encode(obj); }
catch (err) {
console.error('[REALTIME|WS] Errore encode raw:', err.message);
return;
}
if (!ws || ws.readyState !== WebSocket.OPEN) {
enqueue(buf);
return;
}
try { ws.send(buf); }
catch (err) { console.error('[REALTIME|WS] Errore invio raw:', err.message); }
}
/**
* @returns {boolean} true se la connessione è attiva
*/
function isConnected() {
return ws !== null && ws.readyState === WebSocket.OPEN;
}
/**
* Chiude la connessione WebSocket.
*/
function getSessionId() { return currentSessionId; }
function close() {
onDisconnect = null;
if (ws) {
ws.close();
ws = null;
currentSessionId = null;
}
}
module.exports = { connect, send, sendRaw, isConnected, close };
module.exports = { connect, send, sendRaw, isConnected, getSessionId, close };

247
plugin/cores/rulesets.js Normal file
View File

@@ -0,0 +1,247 @@
/**
* Rulesets manager (plugin side) — formato v2.
*
* Gestisce 5 tipi: logs | forecast_current | forecast_hourly | marine_current | marine_hourly
*
* Formato item (v2):
* { path, meta: { name, unit, decimals? }, olds: [], enabled: true }
*
* Fonti:
* 1. cache locale (data/rulesets.json) — sopravvive ai restart
* 2. GET HTTP {API_URL}/rulesets/<type>/active — bootstrap o riconciliazione
* 3. push WS realtime ({_t:'ruleset_update'}) — runtime updates
*
* Dopo applyRemote(type, ruleset):
* - emette 'update' (type, new, prev)
* - emette 'update:<type>' (new, prev)
* - invia ack al server (via realtime control message ruleset_ack)
*
* I consumer (logs.local, openmeteo) si sottoscrivono per applicare il cambio.
* Quando arriva un update di tipo 'logs', il consumer triggera un session_reset
* (cosi' Influx separa storico vecchio da nuovo schema).
*/
const fs = require('fs');
const fsp = require('fs').promises;
const path = require('path');
const EventEmitter = require('events');
const {
LOG_PATHS,
FORECAST_CURRENT,
FORECAST_HOURLY,
MARINE_CURRENT,
MARINE_HOURLY
} = require('../rules');
const TYPES = ['logs', 'forecast_current', 'forecast_hourly', 'marine_current', 'marine_hourly'];
const cacheFile = path.join(__dirname, '../../data/rulesets.json');
const emitter = new EventEmitter();
const API_URL = process.env.API_URL;
// state: { type -> { id, version: {major,minor,patch,str}, content: [items], _default } }
let state = {};
// =============== DEFAULTS (fallback iniziale, no DB / no cache) ===============
/** Mapping legacy openmeteo code → SK path */
const LEGACY_SK_MAP = {
'temperature_2m': 'meb.forecast.temperature',
'wind_speed_10m': 'meb.forecast.wind.speed',
'wind_direction_10m': 'meb.forecast.wind.direction',
'wind_gusts_10m': 'meb.forecast.wind.gusts',
'precipitation': 'meb.forecast.precipitation',
'rain': 'meb.forecast.rain',
'relative_humidity_2m': 'meb.forecast.humidity',
'pressure_msl': 'meb.forecast.pressure',
'precipitation_probability': 'meb.forecast.precipitationProbability',
'cloud_cover': 'meb.forecast.cloudCover',
'wave_height': 'meb.waves.height',
'wave_direction': 'meb.waves.direction',
'wave_period': 'meb.waves.period',
'wave_peak_period': 'meb.waves.peakPeriod',
'ocean_current_velocity': 'meb.waves.currentVelocity',
'ocean_current_direction': 'meb.waves.currentDirection',
};
function defaultItem(p, meta = {}) {
return { path: p, meta, olds: [], enabled: true };
}
function buildDefaults() {
const mk = (items) => ({
id: null,
version: { major: 1, minor: 0, patch: 0, str: '1.0.0' },
description: 'default',
content: items,
_default: true,
});
return {
logs: mk(LOG_PATHS.map(p => defaultItem(p, {}))),
forecast_current: mk(FORECAST_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))),
forecast_hourly: mk(FORECAST_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))),
marine_current: mk(MARINE_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))),
marine_hourly: mk(MARINE_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))),
};
}
// =============== CACHE I/O ===============
async function ensureDir() {
try { await fsp.mkdir(path.dirname(cacheFile), { recursive: true }); } catch {}
}
async function loadCache() {
try {
const raw = await fsp.readFile(cacheFile, 'utf-8');
const parsed = JSON.parse(raw);
if (parsed && typeof parsed === 'object') return parsed;
} catch {}
return {};
}
async function saveCache() {
try { await ensureDir(); await fsp.writeFile(cacheFile, JSON.stringify(state, null, 2)); }
catch (err) { console.warn('[RULESETS] save cache failed:', err.message); }
}
// =============== INIT + BOOTSTRAP ===============
/**
* Init: carica cache → applica defaults sui type mancanti. Non emette eventi.
* Successivamente `bootstrapFromServer()` chiama gli endpoint per riconciliare.
*/
async function init() {
const cached = await loadCache();
const defaults = buildDefaults();
state = {};
for (const t of TYPES) state[t] = cached[t] || defaults[t];
console.log('[RULESETS] init:',
TYPES.map(t => `${t}@v${state[t]?.version?.str || '?'}${state[t]?._default ? '(d)' : ''}`).join(' '));
}
/**
* Tenta di scaricare la versione attiva di ogni tipo dal server.
* Se ottenuta e nuova, la applica (emette gli eventi).
*/
async function bootstrapFromServer() {
if (!API_URL) {
console.warn('[RULESETS] API_URL non configurato, salto bootstrap');
return;
}
for (const type of TYPES) {
try {
const r = await fetch(`${API_URL}/rulesets/${type}/active`, { signal: AbortSignal.timeout(8000) });
if (!r.ok) { if (r.status !== 404) console.warn(`[RULESETS] ${type} bootstrap ${r.status}`); continue; }
const remote = await r.json();
// formato server: { id, type, version:{...,str}, content:[...] }
await applyRemote(type, remote, { source: 'bootstrap' });
} catch (err) {
console.warn(`[RULESETS] bootstrap ${type} err:`, err.message);
}
}
}
// =============== ACCESSORS ===============
function get(type) { return state[type] || null; }
function getEnabledItems(type) {
const rs = state[type];
if (!rs) return [];
return (rs.content || []).filter(it => it.enabled !== false);
}
function getEnabledPaths(type) {
return getEnabledItems(type).map(it => it.path).filter(Boolean);
}
function getPathMap(type) {
const map = {};
for (const it of getEnabledItems(type)) {
if (it.path && it.meta?.sk_path) map[it.path] = it.meta.sk_path;
}
return map;
}
function getMetaForPath(type, p) {
return (state[type]?.content || []).find(it => it.path === p)?.meta || null;
}
function versionStr(type) { return state[type]?.version?.str || null; }
function rulesetId(type) { return state[type]?.id || null; }
// =============== APPLY ===============
/**
* Applica un ruleset ricevuto. Salva cache, emette update events,
* e (se non e' bootstrap) manda ack al server.
*
* Idempotente per versione: ignora payload con version <= corrente.
*/
async function applyRemote(type, ruleset, { source = 'push' } = {}) {
if (!TYPES.includes(type)) {
console.warn(`[RULESETS] tipo sconosciuto: ${type}`);
return false;
}
if (!ruleset || !Array.isArray(ruleset.content)) {
console.warn(`[RULESETS] ruleset invalido per ${type}`);
return false;
}
const prev = state[type];
// dedup per id+version: scarta replays
if (prev && !prev._default && ruleset.id && prev.id === ruleset.id
&& prev.version?.str === ruleset.version?.str) {
return false;
}
state[type] = {
id: ruleset.id,
version: ruleset.version,
description: ruleset.description,
content: ruleset.content,
_default: false,
};
await saveCache();
const prevV = prev?.version?.str || '∅';
const newV = ruleset.version?.str || '?';
console.log(`[RULESETS] ${type} ${prevV}${newV} (${ruleset.content.length} items, src=${source})`);
emitter.emit('update', type, state[type], prev);
emitter.emit(`update:${type}`, state[type], prev);
// ack al server (skipped on bootstrap to avoid loop)
if (source !== 'bootstrap' && ruleset.id) {
try {
const realtime = require('./realtime/core');
realtime.sendRaw({
_t: 'ruleset_ack',
type,
ruleset_id: ruleset.id,
version: ruleset.version?.str || '?',
});
} catch (err) {
console.warn('[RULESETS] ack send failed:', err.message);
}
}
return true;
}
function onUpdate(listener) { emitter.on('update', listener); return () => emitter.off('update', listener); }
function onUpdateOf(type, listener) { emitter.on(`update:${type}`, listener); return () => emitter.off(`update:${type}`, listener); }
module.exports = {
TYPES,
init,
bootstrapFromServer,
get,
getEnabledItems,
getEnabledPaths,
getPathMap,
getMetaForPath,
versionStr,
rulesetId,
applyRemote,
onUpdate,
onUpdateOf,
};