From c2c15982269d1c6285391760ea7e992f9651f47b Mon Sep 17 00:00:00 2001 From: Giuseppe Raffa <77052701+sesee3@users.noreply.github.com> Date: Tue, 12 May 2026 10:17:54 +0200 Subject: [PATCH] feat: Implement rulesets and layout management for kiosk plugin - Added rulesets manager to handle various data types and updates via HTTP and WebSocket. - Introduced layout store for managing kiosk layouts with caching and server synchronization. - Enhanced dashboard and data routes to support new layout and ruleset features. - Updated kiosk HTML and JavaScript to utilize new layout rendering and data binding. - Removed obsolete map route and integrated map functionality into the new tile renderer. - Improved Telegram commands to reflect changes in data structure and logging. - Refactored weather fetching intervals to prevent multiple instances. - Added SSE stream for real-time layout updates in the kiosk. --- docker-compose.yml | 7 +- package.json | 4 +- plugin/config/configManager.js | 13 +- plugin/config/skSettings.js | 6 +- plugin/cores/aisstream.js | 0 plugin/cores/logs.local.js | 250 +++++++--------------- plugin/cores/openmeteo.js | 102 +++++---- plugin/cores/realtime/auth.js | 12 +- plugin/cores/realtime/core.js | 27 ++- plugin/cores/realtime/socket.js | 128 ++++++++--- plugin/cores/rulesets.js | 247 +++++++++++++++++++++ plugin/cores/weatherkit.js | 0 plugin/index.js | 31 ++- plugin/routes/collection/dashboard.js | 3 - plugin/routes/collection/data.js | 22 +- plugin/routes/collection/kiosk.js | 33 +++ plugin/routes/collection/map.js | 9 - plugin/routes/main.js | 2 - plugin/rules.js | 2 +- plugin/telegram/commands/data.js | 4 +- plugin/telegram/commands/logs.js | 4 +- plugin/tools/kiosk/core.js | 2 +- plugin/tools/kiosk/data-binder.js | 72 +++++++ plugin/tools/kiosk/kiosk.html | 41 +++- plugin/tools/kiosk/layout-client.js | 98 +++++++++ plugin/tools/kiosk/server-layout-store.js | 80 +++++++ plugin/tools/kiosk/tile-renderer.js | 188 ++++++++++++++++ 27 files changed, 1061 insertions(+), 326 deletions(-) delete mode 100644 plugin/cores/aisstream.js create mode 100644 plugin/cores/rulesets.js delete mode 100644 plugin/cores/weatherkit.js delete mode 100644 plugin/routes/collection/map.js create mode 100644 plugin/tools/kiosk/data-binder.js create mode 100644 plugin/tools/kiosk/layout-client.js create mode 100644 plugin/tools/kiosk/server-layout-store.js create mode 100644 plugin/tools/kiosk/tile-renderer.js diff --git a/docker-compose.yml b/docker-compose.yml index 5d7e7fe..52d21b8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: env_file: - .env environment: - - NODE_ENV=development # <--- Aggiunto per attivare l'hot-reload del nostro plugin + - NODE_ENV=development restart: unless-stopped ports: - "3001:3000" @@ -13,8 +13,9 @@ services: resources: limits: memory: 2G - cpus: '1.5' + cpus: "1.5" volumes: - /Users/sese/Local/dev/MEB/signalk/data:/home/node/.signalk:rw - /Users/sese/Local/dev/MEB/meb-plugin:/home/node/.signalk/node_modules/meb:rw - - /Users/sese/Local/dev/MEB/meb-plugin/data:/home/node/.signalk/node_modules/meb/data:rw \ No newline at end of file + - /Users/sese/Local/dev/MEB/meb-plugin/data:/home/node/.signalk/node_modules/meb/data:rw + - /Users/sese/Downloads/python-daly-bms-main/signalk-daly-bms:/home/node/.signalk/node_modules/signalk-daly-bms:rw \ No newline at end of file diff --git a/package.json b/package.json index 820b115..24b52f5 100644 --- a/package.json +++ b/package.json @@ -9,9 +9,11 @@ "signalk-plugin" ], "signalk-plugin-enabled-by-default": true, + "engines": { + "node": ">=18" + }, "dependencies": { "@msgpack/msgpack": "^3.1.3", - "axios": "^1.12.2", "express": "^5.2.1", "node-telegram-bot-api": "^0.66.0", "ws": "^8.19.0" diff --git a/plugin/config/configManager.js b/plugin/config/configManager.js index 5da7242..c2610c8 100644 --- a/plugin/config/configManager.js +++ b/plugin/config/configManager.js @@ -31,10 +31,18 @@ function getSensorCode() { } /** - * Ottiene il nome sensore + * Ottiene l'id testuale del sensore (registrato sul server con POST /connect/new). + * Fallback su SENSOR_ID env per setup standalone. + */ +function getSensorId() { + return pluginOptions.sensor_id || process.env.SENSOR_ID || ''; +} + +/** + * @deprecated usa getSensorId. Mantenuto solo per retro-compatibilita'. */ function getSensorName() { - return pluginOptions.sensor_name || ''; + return getSensorId(); } /** @@ -66,6 +74,7 @@ module.exports = { init, getTelegramToken, getSensorCode, + getSensorId, getSensorName, getSendInterval, getReconnectDelay, diff --git a/plugin/config/skSettings.js b/plugin/config/skSettings.js index 5a7107a..8f92830 100644 --- a/plugin/config/skSettings.js +++ b/plugin/config/skSettings.js @@ -31,10 +31,10 @@ module.exports = { description: 'Inserisci un codice identificativo per inviare i dati al server', default: '', }, - sensor_name: { + sensor_id: { type: 'string', - title: 'Nome Sensore', - description: 'Inserisci un nome per il tuo sensore, che verrà visualizzato nel server', + title: 'ID Sensore', + description: 'ID del sensore generato dal server al momento della registrazione (POST /connect/new)', default: '', }, sensor_interval: { diff --git a/plugin/cores/aisstream.js b/plugin/cores/aisstream.js deleted file mode 100644 index e69de29..0000000 diff --git a/plugin/cores/logs.local.js b/plugin/cores/logs.local.js index 26cc599..7f84aa1 100644 --- a/plugin/cores/logs.local.js +++ b/plugin/cores/logs.local.js @@ -1,250 +1,158 @@ +/** + * Logger locale + invio realtime. + * + * Scrive una riga ogni secondo in un CSV locale (1 sessione = 1 file) e + * contemporaneamente invia i field via WebSocket al realtime per InfluxDB. + * + * Path-list dinamica: i path letti ad ogni tick vengono presi dal ruleset + * 'logs' (cores/rulesets). Quando il ruleset cambia (push WS o cambio remoto): + * - la sessione corrente viene chiusa (CSV finalizzato) + * - si manda `session_reset` al server (nuovo session_id su Influx) + * - si apre un nuovo CSV con i nuovi path + * - lo storico Influx resta coerente: tag `session` cambia, tag + * `ruleset_version` riflette la nuova versione applicata. + */ const fs = require('fs').promises; const fsSync = require('fs'); const pth = require('path'); -const os = require('os'); const skFlow = require('../config/skFlow'); const realtime = require('./realtime/core'); +const rulesets = require('./rulesets'); const logsDirectory = pth.join(__dirname, '../../data/logs/'); - -// Intervallo di scrittura fisso: 1 secondo const WRITE_INTERVAL = 1000; -// Stato della sessione attiva let session = null; let writeInterval = null; - -// Paths da registrare (impostati da init) let logPaths = []; -/** - * Risolutori speciali per path che non sono direttamente accessibili via skFlow.get(). - * Per ogni path speciale, definisce una funzione che restituisce il valore. - */ const SPECIAL_RESOLVERS = { - 'navigation.position.latitude': () => { - const pos = skFlow.get('navigation.position'); - return pos?.latitude ?? null; - }, - 'navigation.position.longitude': () => { - const pos = skFlow.get('navigation.position'); - return pos?.longitude ?? null; - }, - 'system.uptime': () => Math.floor(process.uptime()) + 'navigation.position.latitude': () => skFlow.get('navigation.position')?.latitude ?? null, + 'navigation.position.longitude': () => skFlow.get('navigation.position')?.longitude ?? null, + 'system.uptime': () => Math.floor(process.uptime()), }; +function resolveValue(p) { + if (SPECIAL_RESOLVERS[p]) return SPECIAL_RESOLVERS[p](); + return skFlow.get(p); +} + /** - * Risolve il valore di un path, gestendo i casi speciali. - * @param {String} path - il path da risolvere - * @returns {*} il valore + * Init: prende i path iniziali dal ruleset attivo e si sottoscrive ai cambi. + * @param {Array} [fallback] - opzionale, lista path se non c'e' ruleset */ -function resolveValue(path) { - // Controlla se c'e un risolutore speciale - if (SPECIAL_RESOLVERS[path]) { - return SPECIAL_RESOLVERS[path](); +function init(fallback) { + logPaths = rulesets.getEnabledPaths('logs'); + if ((!logPaths || logPaths.length === 0) && Array.isArray(fallback)) { + logPaths = fallback; } - // Path standard: leggi dal databrowser Signal K - return skFlow.get(path); + console.log(`[LOGS] init: ${logPaths.length} path da ruleset v=${rulesets.versionStr('logs') || '?'}`); + + rulesets.onUpdateOf('logs', async (next) => { + const newPaths = (next.content || []).filter(it => it.enabled !== false).map(it => it.path); + console.log(`[LOGS] ruleset update: ${newPaths.length} path → restart recording`); + const wasRecording = !!session; + try { + if (wasRecording) await stopRecording(); + logPaths = newPaths; + if (realtime.isConnected()) realtime.sendRaw({ _t: 'session_reset' }); + if (wasRecording) await startRecording(); + } catch (err) { + console.error('[LOGS] restart on ruleset update failed:', err.message); + } + }); } -/** - * Inizializza i path da registrare. - * @param {Array} paths - array di path da rules.js LOG_PATHS - */ -function init(paths) { - logPaths = paths; - console.log(`[LOGS] Inizializzati ${paths.length} path`); -} - -/** - * Assicura che la cartella logs esista - */ async function ensureDir() { - try { - await fs.mkdir(logsDirectory, { recursive: true }); - } catch (e) {} + try { await fs.mkdir(logsDirectory, { recursive: true }); } catch (e) {} } -/** - * Avvia la registrazione: crea un nuovo file CSV e inizia a scrivere ogni secondo. - * @param {String} name - nome del file (opzionale, default: data/ora corrente) - */ async function startRecording(name) { - // Se c'e gia una sessione attiva, fermala prima - if (session) { - await stopRecording(); - } - - if (!name) { - const now = new Date(); - name = now.toISOString().replace(/[:.]/g, '-'); - } - - if (logPaths.length === 0) { - console.warn('[LOGS] Nessun path configurato, la registrazione non catturera dati'); - } + if (session) await stopRecording(); + if (!name) name = new Date().toISOString().replace(/[:.]/g, '-'); + if (logPaths.length === 0) console.warn('[LOGS] nessun path configurato'); await ensureDir(); - - // Header CSV: timestamp + tutti i path const header = ['timestamp', ...logPaths].join(',') + '\n'; const filePath = pth.join(logsDirectory, `${name}.csv`); await fs.writeFile(filePath, header); - session = { - name: name, - paths: logPaths, - startTime: new Date(), - elements: 0, - filePath: filePath - }; - - // Scrivi ogni secondo - writeInterval = setInterval(() => { - writeLog(); - }, WRITE_INTERVAL); - - console.log(`[LOGS] Registrazione avviata: ${name} (${logPaths.length} colonne, intervallo ${WRITE_INTERVAL}ms)`); + session = { name, paths: logPaths.slice(), startTime: new Date(), elements: 0, filePath }; + writeInterval = setInterval(() => { writeLog(); }, WRITE_INTERVAL); + console.log(`[LOGS] started: ${name} (${logPaths.length} cols)`); return session; } -/** - * Scrive una riga nel CSV e invia i dati al server via WebSocket. - */ async function writeLog() { if (!session) return; - try { const timestamp = new Date().toISOString(); - - // Risolvi tutti i valori const fields = {}; const csvValues = []; - - for (const path of session.paths) { - const val = resolveValue(path); - fields[path] = val; - - // Formatta per CSV - if (val === null || val === undefined) { - csvValues.push(''); - } else if (typeof val === 'object') { - csvValues.push(JSON.stringify(val).replace(/,/g, ';')); - } else { - csvValues.push(val); - } + for (const p of session.paths) { + const val = resolveValue(p); + fields[p] = val; + if (val === null || val === undefined) csvValues.push(''); + else if (typeof val === 'object') csvValues.push(JSON.stringify(val).replace(/,/g, ';')); + else csvValues.push(val); } - - // Scrivi riga CSV nel file locale const row = [timestamp, ...csvValues].join(',') + '\n'; await fs.appendFile(session.filePath, row); session.elements++; - - // Invia al server via WebSocket (se connesso) - if (realtime.isConnected()) { - realtime.send([Date.now(), 'logs', fields]); - } - - } catch (error) { - console.error('[LOGS] Errore scrittura:', error.message); + if (realtime.isConnected()) realtime.send([Date.now(), 'logs', fields]); + } catch (err) { + console.error('[LOGS] write error:', err.message); } } -/** - * Interrompe la registrazione e chiude il file. - */ async function stopRecording() { - if (writeInterval) { - clearInterval(writeInterval); - writeInterval = null; - } - + if (writeInterval) { clearInterval(writeInterval); writeInterval = null; } if (session) { - console.log(`[LOGS] Registrazione fermata: ${session.name} (${session.elements} righe)`); + console.log(`[LOGS] stopped ${session.name} (${session.elements} righe)`); session = null; } } -/** - * Ottieni i dati del file come stringa CSV. - * @param {String} name - il nome del file (senza estensione) - * @returns {String|null} - */ async function getLog(name) { - try { - const filePath = pth.join(logsDirectory, `${name}.csv`); - const content = await fs.readFile(filePath, 'utf-8'); - return content; - } catch (error) { - console.error('[LOGS] Errore lettura log:', error.message); - return null; - } + try { return await fs.readFile(pth.join(logsDirectory, `${name}.csv`), 'utf-8'); } + catch (err) { console.error('[LOGS] read err:', err.message); return null; } } -/** - * Ottieni il percorso del file CSV. - * @param {String} name - il nome del file (senza estensione) - * @returns {String|null} - */ function getLogFile(name) { - const filePath = pth.join(logsDirectory, `${name}.csv`); - if (fsSync.existsSync(filePath)) { - return filePath; - } - return null; + const p = pth.join(logsDirectory, `${name}.csv`); + return fsSync.existsSync(p) ? p : null; } -/** - * Ottieni la lista di tutti i file di log disponibili. - * @returns {Array} - */ async function listLogs() { await ensureDir(); try { const files = await fs.readdir(logsDirectory); - const csvFiles = files.filter(f => f.endsWith('.csv')); - - const result = []; - for (const file of csvFiles) { - const filePath = pth.join(logsDirectory, file); - const stat = await fs.stat(filePath); - result.push({ + const csv = files.filter(f => f.endsWith('.csv')); + const out = []; + for (const file of csv) { + const p = pth.join(logsDirectory, file); + const stat = await fs.stat(p); + out.push({ name: file.replace('.csv', ''), filename: file, size: (stat.size / (1024 * 1024)).toFixed(2), created: stat.birthtime, - modified: stat.mtime + modified: stat.mtime, }); } - - return result.sort((a, b) => b.modified - a.modified); - } catch (error) { - console.error('[LOGS] Errore lista log:', error.message); + return out.sort((a, b) => b.modified - a.modified); + } catch (err) { + console.error('[LOGS] list err:', err.message); return []; } } -/** - * Ottieni informazioni sulla sessione di registrazione attiva. - * @returns {Object|null} - */ function getSession() { if (!session) return null; return { - name: session.name, - paths: session.paths, - startTime: session.startTime, - elements: session.elements, - delay: WRITE_INTERVAL + name: session.name, paths: session.paths, startTime: session.startTime, + elements: session.elements, delay: WRITE_INTERVAL, }; } -module.exports = { - init, - startRecording, - stopRecording, - getLog, - getLogFile, - getSession, - listLogs -}; +module.exports = { init, startRecording, stopRecording, getLog, getLogFile, getSession, listLogs }; diff --git a/plugin/cores/openmeteo.js b/plugin/cores/openmeteo.js index 79c5626..54e38f8 100644 --- a/plugin/cores/openmeteo.js +++ b/plugin/cores/openmeteo.js @@ -1,41 +1,28 @@ const skFlow = require('../config/skFlow'); const realtimeCore = require('./realtime/core'); -const { - FORECAST_CURRENT, - FORECAST_HOURLY, - MARINE_CURRENT, - MARINE_HOURLY -} = require('../rules'); +const rulesets = require('./rulesets'); + +/** + * Helper: legge i codici openmeteo + il path SK destinato per i 4 tipi forecast/marine, + * leggendoli dal ruleset attivo. Fall back ai default in rules.js se il ruleset + * non e' ancora stato caricato. + */ +function paramsAndMap(type) { + const items = rulesets.getEnabledItems(type); + if (!items.length) return { codes: [], map: {} }; + const codes = items.map(it => it.path).filter(Boolean); + const map = {}; + for (const it of items) { + if (it.meta?.sk_path) map[it.path] = it.meta.sk_path; + } + return { codes, map }; +} const FETCH_TIMEOUT = 10000; const FORECAST_API = 'https://api.open-meteo.com/v1/forecast'; const MARINE_API = 'https://marine-api.open-meteo.com/v1/marine'; -/** - * Mapping da parametri API Open-Meteo a path Signal K. - * Questi path vengono pubblicati sul databrowser e letti dai log. - */ -const FORECAST_PATH_MAP = { - 'temperature_2m': 'meb.forecasts.temperature', - 'wind_speed_10m': 'meb.forecast.wind.speed', - 'wind_direction_10m': 'meb.forecast.wind.direction', - 'wind_gusts_10m': 'meb.forecast.wind.gusts', - 'precipitation': 'meb.forecast.precipitation', - 'rain': 'meb.forecast.rain', - 'relative_humidity_2m': 'meb.forecast.humidity', - 'pressure_msl': 'meb.forecast.pressure', - 'precipitation_probability':'meb.forecast.precipitationProbability', - 'cloud_cover': 'meb.forecast.cloudCover', -}; - -const MARINE_PATH_MAP = { - 'wave_height': 'meb.waves.height', - 'wave_direction': 'meb.waves.direction', - 'wave_period': 'meb.waves.period', - 'wave_peak_period': 'meb.waves.peakPeriod', - 'ocean_current_velocity': 'meb.waves.currentVelocity', - 'ocean_current_direction': 'meb.waves.currentDirection', -}; +// I path map sono ora ottenuti dal ruleset (rulesets.getPathMap('forecast_*'/'marine_*')) /** * Fetch JSON con timeout @@ -59,19 +46,20 @@ async function fetchJSON(url) { */ function publishCurrentToSignalK(forecastData, marineData) { const skData = {}; + const fMap = paramsAndMap('forecast_current').map; + const mMap = paramsAndMap('marine_current').map; if (forecastData?.current) { for (const [key, value] of Object.entries(forecastData.current)) { if (key === 'time' || key === 'interval') continue; - const skPath = FORECAST_PATH_MAP[key]; + const skPath = fMap[key]; if (skPath && value != null) skData[skPath] = value; } } - if (marineData?.current) { for (const [key, value] of Object.entries(marineData.current)) { if (key === 'time' || key === 'interval') continue; - const skPath = MARINE_PATH_MAP[key]; + const skPath = mMap[key]; if (skPath && value != null) skData[skPath] = value; } } @@ -79,7 +67,7 @@ function publishCurrentToSignalK(forecastData, marineData) { if (Object.keys(skData).length > 0) { skData['meb.weather.timestamp'] = Date.now(); skFlow.publish(skData); - console.log(`[OPENMETEO] Pubblicati ${Object.keys(skData).length} valori su Signal K`); + console.log(`[OPENMETEO] pubblicati ${Object.keys(skData).length} valori su Signal K`); } } @@ -89,19 +77,20 @@ function publishCurrentToSignalK(forecastData, marineData) { */ function sendCurrentToRealtime(forecastData, marineData) { const fields = {}; + const fMap = paramsAndMap('forecast_current').map; + const mMap = paramsAndMap('marine_current').map; if (forecastData?.current) { for (const [key, value] of Object.entries(forecastData.current)) { if (key === 'time' || key === 'interval') continue; - const skPath = FORECAST_PATH_MAP[key]; + const skPath = fMap[key]; if (skPath && value != null) fields[skPath] = value; } } - if (marineData?.current) { for (const [key, value] of Object.entries(marineData.current)) { if (key === 'time' || key === 'interval') continue; - const skPath = MARINE_PATH_MAP[key]; + const skPath = mMap[key]; if (skPath && value != null) fields[skPath] = value; } } @@ -123,6 +112,8 @@ function sendForecastBatchToRealtime(forecastData, marineData) { const times = forecastHourly?.time || marineHourly?.time; const points = []; + const fMap = paramsAndMap('forecast_hourly').map; + const mMap = paramsAndMap('marine_hourly').map; for (let i = 0; i < times.length; i++) { const ts = new Date(times[i]).getTime(); @@ -131,15 +122,14 @@ function sendForecastBatchToRealtime(forecastData, marineData) { if (forecastHourly) { for (const [key, values] of Object.entries(forecastHourly)) { if (key === 'time') continue; - const skPath = FORECAST_PATH_MAP[key]; + const skPath = fMap[key]; if (skPath && values?.[i] != null) fields[skPath] = values[i]; } } - if (marineHourly) { for (const [key, values] of Object.entries(marineHourly)) { if (key === 'time') continue; - const skPath = MARINE_PATH_MAP[key]; + const skPath = mMap[key]; if (skPath && values?.[i] != null) fields[skPath] = values[i]; } } @@ -166,27 +156,30 @@ async function fetchCurrentWeather(location) { return; } - if (FORECAST_CURRENT.length === 0 && MARINE_CURRENT.length === 0) { + const forecastCurrent = paramsAndMap('forecast_current').codes; + const marineCurrent = paramsAndMap('marine_current').codes; + + if (forecastCurrent.length === 0 && marineCurrent.length === 0) { console.warn('[OPENMETEO] Nessun parametro current configurato'); return; } - console.log(`[OPENMETEO] Fetch current — forecast: ${FORECAST_CURRENT.length} params, marine: ${MARINE_CURRENT.length} params`); + console.log(`[OPENMETEO] Fetch current — forecast:${forecastCurrent.length} marine:${marineCurrent.length}`); let forecastData = null, marineData = null; try { const promises = []; - if (FORECAST_CURRENT.length > 0) { - const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${FORECAST_CURRENT.join(',')}`; + if (forecastCurrent.length > 0) { + const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${forecastCurrent.join(',')}`; promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => { console.error(`[OPENMETEO] Errore forecast current: ${e.message}`); })); } - if (MARINE_CURRENT.length > 0) { - const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${MARINE_CURRENT.join(',')}&models=ecmwf_wam`; + if (marineCurrent.length > 0) { + const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${marineCurrent.join(',')}&models=ecmwf_wam`; promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => { console.error(`[OPENMETEO] Errore marine current: ${e.message}`); })); @@ -211,27 +204,30 @@ async function fetchHourlyForecasts(location) { return; } - if (FORECAST_HOURLY.length === 0 && MARINE_HOURLY.length === 0) { + const forecastHourly = paramsAndMap('forecast_hourly').codes; + const marineHourly = paramsAndMap('marine_hourly').codes; + + if (forecastHourly.length === 0 && marineHourly.length === 0) { console.warn('[OPENMETEO] Nessun parametro hourly configurato'); return; } - console.log(`[OPENMETEO] Fetch hourly 7gg — forecast: ${FORECAST_HOURLY.length} params, marine: ${MARINE_HOURLY.length} params`); + console.log(`[OPENMETEO] Fetch hourly 7gg — forecast:${forecastHourly.length} marine:${marineHourly.length}`); let forecastData = null, marineData = null; try { const promises = []; - if (FORECAST_HOURLY.length > 0) { - const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${FORECAST_HOURLY.join(',')}&forecast_days=7`; + if (forecastHourly.length > 0) { + const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${forecastHourly.join(',')}&forecast_days=7`; promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => { console.error(`[OPENMETEO] Errore forecast hourly: ${e.message}`); })); } - if (MARINE_HOURLY.length > 0) { - const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${MARINE_HOURLY.join(',')}&forecast_days=7&models=ecmwf_wam`; + if (marineHourly.length > 0) { + const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${marineHourly.join(',')}&forecast_days=7&models=ecmwf_wam`; promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => { console.error(`[OPENMETEO] Errore marine hourly: ${e.message}`); })); diff --git a/plugin/cores/realtime/auth.js b/plugin/cores/realtime/auth.js index 56e2f5e..92166d1 100644 --- a/plugin/cores/realtime/auth.js +++ b/plugin/cores/realtime/auth.js @@ -8,10 +8,10 @@ const REALTIME_URL = process.env.REALTIME_URL; */ async function authenticate() { const SENSOR_CODE = configManager.getSensorCode(); - const SENSOR_NAME = configManager.getSensorName(); + const SENSOR_ID = configManager.getSensorId(); - if (!REALTIME_URL || !SENSOR_CODE || !SENSOR_NAME) { - console.error('[REALTIME|AUTH] REALTIME_URL, SENSOR_CODE o SENSOR_NAME non configurati'); + if (!REALTIME_URL || !SENSOR_CODE || !SENSOR_ID) { + console.error('[REALTIME|AUTH] REALTIME_URL, SENSOR_CODE o SENSOR_ID non configurati'); return null; } @@ -20,7 +20,7 @@ async function authenticate() { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ - name: SENSOR_NAME, + id: SENSOR_ID, code: SENSOR_CODE }) }); @@ -38,8 +38,8 @@ async function authenticate() { return null; } - console.log(`[REALTIME|AUTH] Autenticato: ${SENSOR_NAME}, token valido 5s`); - return { socketToken: data.t }; + console.log(`[REALTIME|AUTH] Autenticato: ${SENSOR_ID}`); + return { socketToken: data.t, sensorId: SENSOR_ID }; } catch (error) { console.error(`[REALTIME|AUTH] error: ${error.message}`); diff --git a/plugin/cores/realtime/core.js b/plugin/cores/realtime/core.js index 587edfc..1ada933 100644 --- a/plugin/cores/realtime/core.js +++ b/plugin/cores/realtime/core.js @@ -22,21 +22,32 @@ async function init() { async function connectToServer() { if (isShuttingDown) return; - console.log('CONNECTING......') + console.log('[REALTIME] connecting...'); - const result = await auth.authenticate(); - console.log('AUTH RESULT:', result); + let result; + try { + result = await auth.authenticate(); + } catch (err) { + console.error('[REALTIME] auth error:', err.message); + scheduleReconnect(); + return; + } if (!result) { scheduleReconnect(); return; } - const connected = await socket.connect(result.socketToken, () => { - if (!isShuttingDown) { - scheduleReconnect(); - } - }); + let connected = false; + try { + connected = await socket.connect(result.socketToken, () => { + if (!isShuttingDown) { + scheduleReconnect(); + } + }); + } catch (err) { + console.error('[REALTIME] socket connect error:', err.message); + } if (!connected) { scheduleReconnect(); diff --git a/plugin/cores/realtime/socket.js b/plugin/cores/realtime/socket.js index 207dbeb..d0b0b83 100644 --- a/plugin/cores/realtime/socket.js +++ b/plugin/cores/realtime/socket.js @@ -1,17 +1,40 @@ const WebSocket = require('ws'); const os = require('os'); -const { encode } = require('@msgpack/msgpack'); +const { encode, decode } = require('@msgpack/msgpack'); const SOCKET_URL = process.env.REALTIME_SOCKET_URL; let ws = null; let onDisconnect = null; +let currentSessionId = null; + +const QUEUE_MAX = 500; +const pendingQueue = []; + +function flushQueue() { + if (!ws || ws.readyState !== WebSocket.OPEN) return; + while (pendingQueue.length > 0) { + const buf = pendingQueue.shift(); + try { ws.send(buf); } + catch (err) { + console.error('[REALTIME|WS] Errore flush:', err.message); + return; + } + } +} + +function enqueue(buf) { + if (pendingQueue.length >= QUEUE_MAX) { + pendingQueue.shift(); + } + pendingQueue.push(buf); +} /** * Apre una connessione WebSocket al server realtime usando il token temporaneo. * @param {string} socketToken - Token temporaneo ottenuto da auth.authenticate() * @param {Function} onClose - Callback chiamata quando la connessione si chiude - * @returns {Promise} true se la connessione è riuscita + * @returns {Promise} true se la connessione e' riuscita */ function connect(socketToken, onClose) { return new Promise((resolve) => { @@ -32,23 +55,57 @@ function connect(socketToken, onClose) { ws.on('open', () => { console.log('[REALTIME|WS] Connesso'); - // Invia init con system uptime - const initPayload = { - _t: 'init', - uptime: Math.floor(os.uptime()) - }; + const initPayload = { _t: 'init', uptime: Math.floor(os.uptime()) }; ws.send(encode(initPayload)); console.log('[REALTIME|WS] Init inviato:', initPayload); + flushQueue(); resolve(true); }); - ws.on('message', () => { - // Il server non invia messaggi ai sensori per ora + ws.on('message', (raw) => { + let msg; + try { msg = decode(raw); } + catch (err) { + console.warn('[REALTIME|WS] payload non msgpack:', err.message); + return; + } + + if (msg && typeof msg === 'object') { + if (msg._t === 'hello') { + currentSessionId = msg.sessionId || null; + console.log(`[REALTIME|WS] hello dal server (session=${currentSessionId})`); + return; + } + if (msg._t === 'session_id') { + currentSessionId = msg.sessionId || null; + console.log(`[REALTIME|WS] session_reset ack: ${msg.prev} -> ${msg.sessionId}`); + return; + } + if (msg._t === 'ruleset_update') { + console.log(`[REALTIME|WS] ruleset_update type=${msg.type} v=${msg.ruleset?.version?.str || '?'}`); + try { + require('../rulesets').applyRemote(msg.type, msg.ruleset, { source: 'push' }); + } catch (err) { console.warn('[REALTIME|WS] ruleset apply error:', err.message); } + return; + } + if (msg._t === 'kiosk_layout_update') { + console.log(`[REALTIME|WS] kiosk_layout_update v=${msg.layout?.version || '?'}`); + try { + require('../../tools/kiosk/server-layout-store').applyRemote(msg.layout); + } catch (err) { console.warn('[REALTIME|WS] layout apply error:', err.message); } + return; + } + if (msg._t === 'error') { + console.warn('[REALTIME|WS] error dal server:', msg.message); + return; + } + if (msg._t === 'ack') { + return; // diagnostica, no-op + } + } }); - ws.on('ping', () => { - // ws risponde automaticamente con pong - }); + ws.on('ping', () => { /* ws library risponde con pong automaticamente */ }); ws.on('error', (err) => { console.error(`[REALTIME|WS] Errore: ${err.message}`); @@ -58,6 +115,7 @@ function connect(socketToken, onClose) { ws.on('close', (code) => { console.log(`[REALTIME|WS] Disconnesso (code: ${code})`); ws = null; + currentSessionId = null; if (onDisconnect) onDisconnect(); }); }); @@ -68,47 +126,51 @@ function connect(socketToken, onClose) { * @param {Array} data - Array nel formato [timestamp, measurement, fields] */ function send(data) { - if (!ws || ws.readyState !== WebSocket.OPEN) return; - + let buf; try { const [timestamp, measurement, fields] = data; const packet = { ts: timestamp, _m: measurement, ...fields }; - ws.send(encode(packet)); + buf = encode(packet); } catch (err) { - console.error('[REALTIME|WS] Errore invio:', err.message); + console.error('[REALTIME|WS] Errore encode:', err.message); + return; } + if (!ws || ws.readyState !== WebSocket.OPEN) { + enqueue(buf); + return; + } + try { ws.send(buf); } + catch (err) { console.error('[REALTIME|WS] Errore invio:', err.message); } } -/** - * Invia un oggetto raw al server, codificato in msgpack. - * A differenza di send(), non fa transform [ts, measurement, fields]. - * @param {Object} obj - Oggetto da inviare direttamente - */ function sendRaw(obj) { - if (!ws || ws.readyState !== WebSocket.OPEN) return; - try { - ws.send(encode(obj)); - } catch (err) { - console.error('[REALTIME|WS] Errore invio raw:', err.message); + let buf; + try { buf = encode(obj); } + catch (err) { + console.error('[REALTIME|WS] Errore encode raw:', err.message); + return; } + if (!ws || ws.readyState !== WebSocket.OPEN) { + enqueue(buf); + return; + } + try { ws.send(buf); } + catch (err) { console.error('[REALTIME|WS] Errore invio raw:', err.message); } } -/** - * @returns {boolean} true se la connessione è attiva - */ function isConnected() { return ws !== null && ws.readyState === WebSocket.OPEN; } -/** - * Chiude la connessione WebSocket. - */ +function getSessionId() { return currentSessionId; } + function close() { onDisconnect = null; if (ws) { ws.close(); ws = null; + currentSessionId = null; } } -module.exports = { connect, send, sendRaw, isConnected, close }; +module.exports = { connect, send, sendRaw, isConnected, getSessionId, close }; diff --git a/plugin/cores/rulesets.js b/plugin/cores/rulesets.js new file mode 100644 index 0000000..0c1d201 --- /dev/null +++ b/plugin/cores/rulesets.js @@ -0,0 +1,247 @@ +/** + * Rulesets manager (plugin side) — formato v2. + * + * Gestisce 5 tipi: logs | forecast_current | forecast_hourly | marine_current | marine_hourly + * + * Formato item (v2): + * { path, meta: { name, unit, decimals? }, olds: [], enabled: true } + * + * Fonti: + * 1. cache locale (data/rulesets.json) — sopravvive ai restart + * 2. GET HTTP {API_URL}/rulesets//active — bootstrap o riconciliazione + * 3. push WS realtime ({_t:'ruleset_update'}) — runtime updates + * + * Dopo applyRemote(type, ruleset): + * - emette 'update' (type, new, prev) + * - emette 'update:' (new, prev) + * - invia ack al server (via realtime control message ruleset_ack) + * + * I consumer (logs.local, openmeteo) si sottoscrivono per applicare il cambio. + * Quando arriva un update di tipo 'logs', il consumer triggera un session_reset + * (cosi' Influx separa storico vecchio da nuovo schema). + */ + +const fs = require('fs'); +const fsp = require('fs').promises; +const path = require('path'); +const EventEmitter = require('events'); + +const { + LOG_PATHS, + FORECAST_CURRENT, + FORECAST_HOURLY, + MARINE_CURRENT, + MARINE_HOURLY +} = require('../rules'); + +const TYPES = ['logs', 'forecast_current', 'forecast_hourly', 'marine_current', 'marine_hourly']; +const cacheFile = path.join(__dirname, '../../data/rulesets.json'); +const emitter = new EventEmitter(); + +const API_URL = process.env.API_URL; + +// state: { type -> { id, version: {major,minor,patch,str}, content: [items], _default } } +let state = {}; + +// =============== DEFAULTS (fallback iniziale, no DB / no cache) =============== + +/** Mapping legacy openmeteo code → SK path */ +const LEGACY_SK_MAP = { + 'temperature_2m': 'meb.forecast.temperature', + 'wind_speed_10m': 'meb.forecast.wind.speed', + 'wind_direction_10m': 'meb.forecast.wind.direction', + 'wind_gusts_10m': 'meb.forecast.wind.gusts', + 'precipitation': 'meb.forecast.precipitation', + 'rain': 'meb.forecast.rain', + 'relative_humidity_2m': 'meb.forecast.humidity', + 'pressure_msl': 'meb.forecast.pressure', + 'precipitation_probability': 'meb.forecast.precipitationProbability', + 'cloud_cover': 'meb.forecast.cloudCover', + 'wave_height': 'meb.waves.height', + 'wave_direction': 'meb.waves.direction', + 'wave_period': 'meb.waves.period', + 'wave_peak_period': 'meb.waves.peakPeriod', + 'ocean_current_velocity': 'meb.waves.currentVelocity', + 'ocean_current_direction': 'meb.waves.currentDirection', +}; + +function defaultItem(p, meta = {}) { + return { path: p, meta, olds: [], enabled: true }; +} + +function buildDefaults() { + const mk = (items) => ({ + id: null, + version: { major: 1, minor: 0, patch: 0, str: '1.0.0' }, + description: 'default', + content: items, + _default: true, + }); + return { + logs: mk(LOG_PATHS.map(p => defaultItem(p, {}))), + forecast_current: mk(FORECAST_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), + forecast_hourly: mk(FORECAST_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), + marine_current: mk(MARINE_CURRENT.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), + marine_hourly: mk(MARINE_HOURLY.map(c => defaultItem(c, { sk_path: LEGACY_SK_MAP[c] }))), + }; +} + +// =============== CACHE I/O =============== + +async function ensureDir() { + try { await fsp.mkdir(path.dirname(cacheFile), { recursive: true }); } catch {} +} + +async function loadCache() { + try { + const raw = await fsp.readFile(cacheFile, 'utf-8'); + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === 'object') return parsed; + } catch {} + return {}; +} + +async function saveCache() { + try { await ensureDir(); await fsp.writeFile(cacheFile, JSON.stringify(state, null, 2)); } + catch (err) { console.warn('[RULESETS] save cache failed:', err.message); } +} + +// =============== INIT + BOOTSTRAP =============== + +/** + * Init: carica cache → applica defaults sui type mancanti. Non emette eventi. + * Successivamente `bootstrapFromServer()` chiama gli endpoint per riconciliare. + */ +async function init() { + const cached = await loadCache(); + const defaults = buildDefaults(); + state = {}; + for (const t of TYPES) state[t] = cached[t] || defaults[t]; + console.log('[RULESETS] init:', + TYPES.map(t => `${t}@v${state[t]?.version?.str || '?'}${state[t]?._default ? '(d)' : ''}`).join(' ')); +} + +/** + * Tenta di scaricare la versione attiva di ogni tipo dal server. + * Se ottenuta e nuova, la applica (emette gli eventi). + */ +async function bootstrapFromServer() { + if (!API_URL) { + console.warn('[RULESETS] API_URL non configurato, salto bootstrap'); + return; + } + for (const type of TYPES) { + try { + const r = await fetch(`${API_URL}/rulesets/${type}/active`, { signal: AbortSignal.timeout(8000) }); + if (!r.ok) { if (r.status !== 404) console.warn(`[RULESETS] ${type} bootstrap ${r.status}`); continue; } + const remote = await r.json(); + // formato server: { id, type, version:{...,str}, content:[...] } + await applyRemote(type, remote, { source: 'bootstrap' }); + } catch (err) { + console.warn(`[RULESETS] bootstrap ${type} err:`, err.message); + } + } +} + +// =============== ACCESSORS =============== + +function get(type) { return state[type] || null; } + +function getEnabledItems(type) { + const rs = state[type]; + if (!rs) return []; + return (rs.content || []).filter(it => it.enabled !== false); +} + +function getEnabledPaths(type) { + return getEnabledItems(type).map(it => it.path).filter(Boolean); +} + +function getPathMap(type) { + const map = {}; + for (const it of getEnabledItems(type)) { + if (it.path && it.meta?.sk_path) map[it.path] = it.meta.sk_path; + } + return map; +} + +function getMetaForPath(type, p) { + return (state[type]?.content || []).find(it => it.path === p)?.meta || null; +} + +function versionStr(type) { return state[type]?.version?.str || null; } +function rulesetId(type) { return state[type]?.id || null; } + +// =============== APPLY =============== + +/** + * Applica un ruleset ricevuto. Salva cache, emette update events, + * e (se non e' bootstrap) manda ack al server. + * + * Idempotente per versione: ignora payload con version <= corrente. + */ +async function applyRemote(type, ruleset, { source = 'push' } = {}) { + if (!TYPES.includes(type)) { + console.warn(`[RULESETS] tipo sconosciuto: ${type}`); + return false; + } + if (!ruleset || !Array.isArray(ruleset.content)) { + console.warn(`[RULESETS] ruleset invalido per ${type}`); + return false; + } + const prev = state[type]; + // dedup per id+version: scarta replays + if (prev && !prev._default && ruleset.id && prev.id === ruleset.id + && prev.version?.str === ruleset.version?.str) { + return false; + } + state[type] = { + id: ruleset.id, + version: ruleset.version, + description: ruleset.description, + content: ruleset.content, + _default: false, + }; + await saveCache(); + const prevV = prev?.version?.str || '∅'; + const newV = ruleset.version?.str || '?'; + console.log(`[RULESETS] ${type} ${prevV} → ${newV} (${ruleset.content.length} items, src=${source})`); + + emitter.emit('update', type, state[type], prev); + emitter.emit(`update:${type}`, state[type], prev); + + // ack al server (skipped on bootstrap to avoid loop) + if (source !== 'bootstrap' && ruleset.id) { + try { + const realtime = require('./realtime/core'); + realtime.sendRaw({ + _t: 'ruleset_ack', + type, + ruleset_id: ruleset.id, + version: ruleset.version?.str || '?', + }); + } catch (err) { + console.warn('[RULESETS] ack send failed:', err.message); + } + } + return true; +} + +function onUpdate(listener) { emitter.on('update', listener); return () => emitter.off('update', listener); } +function onUpdateOf(type, listener) { emitter.on(`update:${type}`, listener); return () => emitter.off(`update:${type}`, listener); } + +module.exports = { + TYPES, + init, + bootstrapFromServer, + get, + getEnabledItems, + getEnabledPaths, + getPathMap, + getMetaForPath, + versionStr, + rulesetId, + applyRemote, + onUpdate, + onUpdateOf, +}; diff --git a/plugin/cores/weatherkit.js b/plugin/cores/weatherkit.js deleted file mode 100644 index e69de29..0000000 diff --git a/plugin/index.js b/plugin/index.js index f43dd1e..a328b92 100644 --- a/plugin/index.js +++ b/plugin/index.js @@ -6,6 +6,8 @@ const skFlow = require('./config/skFlow.js') const telegram = require('./telegram/core.js') const recorder = require('./cores/logs.local.js') const realtime = require('./cores/realtime/core.js') +const rulesets = require('./cores/rulesets.js') +const layoutStore = require('./tools/kiosk/server-layout-store.js') const { LOG_PATHS } = require('./rules') module.exports = function(app) { @@ -17,6 +19,10 @@ module.exports = function(app) { plugin.description = 'MEB custom plugin'; plugin.version = '1.5.0'; + let weatherIntervalId = null; + let forecastIntervalId = null; + let positionWaitIntervalId = null; + plugin.start = async function(options) { // Inizializza il gestore della configurazione con le opzioni del plugin @@ -43,12 +49,22 @@ module.exports = function(app) { // Inizializza il modulo per la pubblicazione dei dati Signal K skFlow.init(app); + // Carica i rulesets (cache su disco + defaults), poi avvia consumer + await rulesets.init(); + + // Layout kiosk (cache su disco) + await layoutStore.init(); + // Inizializza il bot Telegram telegram.init(); - // Avvia la connessione realtime al server + // Avvia la connessione realtime al server (ricevera' ruleset_update via WS) realtime.init(); + // In parallelo bootstrap HTTP delle versioni attive (puo' fallire silenzioso se server down) + rulesets.bootstrapFromServer().catch(err => console.warn('[INDEX] bootstrap rulesets:', err.message)); + layoutStore.bootstrapFromServer().catch(err => console.warn('[INDEX] bootstrap layout:', err.message)); + // Inizializza e avvia subito la registrazione log (1 riga/secondo) recorder.init(LOG_PATHS); try { @@ -83,8 +99,9 @@ module.exports = function(app) { // Intervalli: current ogni 5 min, hourly ogni 1 ora const startWeatherIntervals = () => { - setInterval(fetchWeather, 5 * 60 * 1000); - setInterval(fetchForecasts, 60 * 60 * 1000); + if (weatherIntervalId || forecastIntervalId) return; + weatherIntervalId = setInterval(fetchWeather, 5 * 60 * 1000); + forecastIntervalId = setInterval(fetchForecasts, 60 * 60 * 1000); }; // Aspetta la posizione GPS, poi avvia il fetch meteo @@ -93,10 +110,11 @@ module.exports = function(app) { await openmeteo.fetchAll(position); startWeatherIntervals(); } else { - const waitForPosition = setInterval(async () => { + positionWaitIntervalId = setInterval(async () => { const pos = skFlow.get('navigation.position'); if (pos) { - clearInterval(waitForPosition); + clearInterval(positionWaitIntervalId); + positionWaitIntervalId = null; await openmeteo.fetchAll(pos); startWeatherIntervals(); } @@ -106,6 +124,9 @@ module.exports = function(app) { } plugin.stop = function() { + if (weatherIntervalId) { clearInterval(weatherIntervalId); weatherIntervalId = null; } + if (forecastIntervalId) { clearInterval(forecastIntervalId); forecastIntervalId = null; } + if (positionWaitIntervalId) { clearInterval(positionWaitIntervalId); positionWaitIntervalId = null; } recorder.stopRecording(); realtime.stop(); app.debug('Plugin stopped'); diff --git a/plugin/routes/collection/dashboard.js b/plugin/routes/collection/dashboard.js index 5d8fcd7..694ed25 100644 --- a/plugin/routes/collection/dashboard.js +++ b/plugin/routes/collection/dashboard.js @@ -10,7 +10,4 @@ router.get('/', (req, res) => { res.sendFile(path.join(kioskPath, 'dashboard.html')); }); - -router.get('/api/', (req, res) => {}); - module.exports = router; \ No newline at end of file diff --git a/plugin/routes/collection/data.js b/plugin/routes/collection/data.js index 503bfd9..757a523 100644 --- a/plugin/routes/collection/data.js +++ b/plugin/routes/collection/data.js @@ -4,21 +4,23 @@ const db = require('../../config/skFlow') const config = require('../../config/configManager.js') router.get('/', (req, res) => { - const { path } = req.query; - const data = db.get(path); - res.json(data); + const { path, source } = req.query; + if (source) { + return res.json(db.getBySource(source)); + } + res.json(db.get(path)); }); -router.get('/', (req, res) => { - const { source } = req.query; - const data = db.getBySource(source); - res.json(data); -}); +const maskToken = (t) => { + if (!t) return null; + const s = String(t); + return s.length > 8 ? `${s.slice(0, 5)}…${s.slice(-3)}` : '***'; +}; router.get('/info', (req, res) => { const info = { - - telegram: config.getTelegramToken(), + telegram_configured: Boolean(config.getTelegramToken()), + telegram_token_preview: maskToken(config.getTelegramToken()), sensor: { name: config.getSensorName(), diff --git a/plugin/routes/collection/kiosk.js b/plugin/routes/collection/kiosk.js index d7c95a3..a6b1e37 100644 --- a/plugin/routes/collection/kiosk.js +++ b/plugin/routes/collection/kiosk.js @@ -3,16 +3,48 @@ const express = require('express'); const path = require('path'); const fs = require('fs'); const configManager = require('../../config/configManager.js'); +const layoutStore = require('../../tools/kiosk/server-layout-store.js'); const kioskPath = path.join(__dirname, '../../tools/kiosk'); const htmlFile = path.join(kioskPath, 'kiosk.html'); +// API: layout corrente +router.get('/layout', (req, res) => { + const l = layoutStore.get(); + if (!l) return res.status(404).json({ error: 'no layout' }); + res.json(l); +}); + +// SSE stream per gli update del layout (live, niente polling) +router.get('/stream', (req, res) => { + res.set({ + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + res.flushHeaders?.(); + + // invia subito il layout corrente + const cur = layoutStore.get(); + if (cur) res.write(`event: layout\ndata: ${JSON.stringify(cur)}\n\n`); + + const off = layoutStore.onUpdate((layout) => { + try { res.write(`event: layout\ndata: ${JSON.stringify(layout)}\n\n`); } catch {} + }); + + const ping = setInterval(() => { try { res.write(': ping\n\n'); } catch {} }, 25000); + + req.on('close', () => { clearInterval(ping); off(); }); +}); + router.use('/', express.static(kioskPath)); router.get('/', (req, res) => { const apiUrl = process.env.API_URL || 'https://api.mebboat.it'; const realtimeUrl = process.env.REALTIME_URL || 'https://realtime.mebboat.it'; const realtimeWsUrl = process.env.REALTIME_SOCKET_URL || 'wss://realtime.mebboat.it'; + const mapboxKey = process.env.MAPBOX_API_KEY || ''; const sensorCode = configManager.getSensorCode(); const sensorName = configManager.getSensorName(); @@ -21,6 +53,7 @@ router.get('/', (req, res) => { + `; diff --git a/plugin/routes/collection/map.js b/plugin/routes/collection/map.js deleted file mode 100644 index 513f78d..0000000 --- a/plugin/routes/collection/map.js +++ /dev/null @@ -1,9 +0,0 @@ -const router = require('express').Router(); -const path = require('path') -//Endpoints per controllare lo stato di un servizio di mappe da implementare poi.. - -router.get('/', (req, res) => { - res.sendFile(path.join(__dirname, '../tools/map/map.html')); -}); - -module.exports = router; \ No newline at end of file diff --git a/plugin/routes/main.js b/plugin/routes/main.js index cb6ed19..a0e317b 100644 --- a/plugin/routes/main.js +++ b/plugin/routes/main.js @@ -1,14 +1,12 @@ // Il file generale che raggruppa le api const router = require('express').Router(); const cloudRoutes = require('./collection/cloud') -const mapRoutes = require('./collection/map') const dataRoutes = require('./collection/data') const recRoutes = require('./collection/rec') const dashboard = require('./collection/dashboard') const kiosk = require('./collection/kiosk') router.use('/cloud', cloudRoutes) -router.use('/map', mapRoutes) router.use('/data', dataRoutes) router.use('/rec', recRoutes) diff --git a/plugin/rules.js b/plugin/rules.js index 3a24da0..12bac47 100644 --- a/plugin/rules.js +++ b/plugin/rules.js @@ -40,7 +40,7 @@ const MARINE_HOURLY = [ ]; const LOG_PATHS = [ - 'meb.forecasts.temperature', + 'meb.forecast.temperature', 'meb.forecast.wind.direction', 'meb.forecast.wind.speed', 'meb.waves.direction', diff --git a/plugin/telegram/commands/data.js b/plugin/telegram/commands/data.js index be9e80c..40971e9 100644 --- a/plugin/telegram/commands/data.js +++ b/plugin/telegram/commands/data.js @@ -39,7 +39,7 @@ module.exports = { } // Mare - const marine = skFlow.getWithFilter('meb.marine'); + const marine = skFlow.getWithFilter('meb.waves'); text += '\n*Dati Meteo del mare*\n\n'; if (marine && Object.keys(marine).length > 0) { for (const [path, value] of Object.entries(marine)) { @@ -50,7 +50,7 @@ module.exports = { text += 'Nessun dato disponibile.\n'; } - bot.sendMessage(chatId, text, { + return bot.sendMessage(chatId, text, { parse_mode: 'Markdown', reply_to_message_id: msg.message_id, reply_markup: liveMarkup(msg.message_id, 'data') diff --git a/plugin/telegram/commands/logs.js b/plugin/telegram/commands/logs.js index f0c4a9e..37360d0 100644 --- a/plugin/telegram/commands/logs.js +++ b/plugin/telegram/commands/logs.js @@ -8,7 +8,7 @@ module.exports = { const logs = await recorder.listLogs(); if (!logs || logs.length === 0) { - bot.sendMessage(chatId, 'Nessun file di log disponibile.', { + await bot.sendMessage(chatId, 'Nessun file di log disponibile.', { reply_to_message_id: msg.message_id, reply_markup: closeButton(msg.message_id) }); @@ -44,7 +44,7 @@ module.exports = { // Aggiungi il bottone chiudi keyboard.push([{ text: '<- Chiudi', callback_data: `close:${msg.message_id}` }]); - bot.sendMessage(chatId, text, { + await bot.sendMessage(chatId, text, { parse_mode: 'Markdown', reply_to_message_id: msg.message_id, reply_markup: { inline_keyboard: keyboard } diff --git a/plugin/tools/kiosk/core.js b/plugin/tools/kiosk/core.js index 9250dc7..6c7c80f 100644 --- a/plugin/tools/kiosk/core.js +++ b/plugin/tools/kiosk/core.js @@ -4,7 +4,7 @@ const paths = [ ] window.skPaths = paths; -mapboxgl.accessToken = 'pk.eyJ1Ijoic2VzZWUzIiwiYSI6ImNtZ2dydndkMDBsNjUya3NjeW91dW41MzcifQ.M2qxj0wL1W7plRzIataojQ'; +mapboxgl.accessToken = document.querySelector('meta[name="mapbox-key"]')?.content || ''; let map = null; let boatMark = null; diff --git a/plugin/tools/kiosk/data-binder.js b/plugin/tools/kiosk/data-binder.js new file mode 100644 index 0000000..9c78005 --- /dev/null +++ b/plugin/tools/kiosk/data-binder.js @@ -0,0 +1,72 @@ +/** + * data-binder.js — apre una singola sottoscrizione SignalK delta locale + * e smista i valori ai tile per path. + * + * Espone window.dataBinder con: + * subscribe(path, fn) → unsubscribe() + * getLatest(path) → ultimo valore visto (o null) + * onConnState(fn) → notifica connesso/disconnesso + */ +(function () { + const subs = new Map(); // path -> Set + const latest = new Map(); // path -> { value, ts } + const stateListeners = new Set(); + let ws = null; + let reconnectTimer = null; + + function notify(path, value) { + latest.set(path, { value, ts: Date.now() }); + const ss = subs.get(path); + if (!ss) return; + for (const fn of ss) { try { fn(value); } catch (e) { console.warn(e); } } + } + + function notifyState(s) { for (const fn of stateListeners) { try { fn(s); } catch (_) {} } } + + function connect() { + const url = `ws://${location.host}/signalk/v1/stream?subscribe=all`; + try { ws = new WebSocket(url); } + catch (err) { console.error('[BINDER] WS create:', err); scheduleReconnect(); return; } + + ws.onopen = () => { notifyState({ connected: true }); console.log('[BINDER] WS connected'); }; + ws.onerror = (e) => console.warn('[BINDER] WS error', e); + ws.onclose = () => { notifyState({ connected: false }); scheduleReconnect(); }; + ws.onmessage = (ev) => { + let msg; + try { msg = JSON.parse(ev.data); } catch { return; } + if (!msg.updates) return; + for (const u of msg.updates) { + if (!u.values) continue; + for (const v of u.values) { + if (!v.path) continue; + notify(v.path, v.value); + // espandi position in lat/lon per chi si iscrive a quelli + if (v.path === 'navigation.position' && v.value && typeof v.value === 'object') { + if (v.value.latitude != null) notify('navigation.position.latitude', v.value.latitude); + if (v.value.longitude != null) notify('navigation.position.longitude', v.value.longitude); + } + } + } + }; + } + + function scheduleReconnect() { + if (reconnectTimer) return; + reconnectTimer = setTimeout(() => { reconnectTimer = null; connect(); }, 3000); + } + + function subscribe(path, fn) { + if (!subs.has(path)) subs.set(path, new Set()); + subs.get(path).add(fn); + const last = latest.get(path); + if (last) { try { fn(last.value); } catch (_) {} } + return () => subs.get(path)?.delete(fn); + } + + function getLatest(path) { return latest.get(path)?.value ?? null; } + function onConnState(fn) { stateListeners.add(fn); return () => stateListeners.delete(fn); } + + connect(); + + window.dataBinder = { subscribe, getLatest, onConnState }; +})(); diff --git a/plugin/tools/kiosk/kiosk.html b/plugin/tools/kiosk/kiosk.html index 856459b..573becb 100644 --- a/plugin/tools/kiosk/kiosk.html +++ b/plugin/tools/kiosk/kiosk.html @@ -3,22 +3,41 @@ -Kiosk +MEB Kiosk + -
+
boot…
- - + + + + + + diff --git a/plugin/tools/kiosk/layout-client.js b/plugin/tools/kiosk/layout-client.js new file mode 100644 index 0000000..da5fb4e --- /dev/null +++ b/plugin/tools/kiosk/layout-client.js @@ -0,0 +1,98 @@ +/** + * layout-client.js — apre l'EventSource /meb/kiosk/stream del plugin, riceve + * il layout corrente (e i successivi update), e ridisegna la griglia. + * + * Il plugin NON edita: si limita a mostrare. La logica di binding ai dati + * SignalK (local stream) e' in data-binder.js; il render dei tile e' in + * tile-renderer.js. + */ +(function () { + const grid = document.getElementById('grid'); + const chip = document.getElementById('statusChip'); + + let activeUnbinds = []; // funzioni di unbind per i tile correnti + let currentLayout = null; + + function setStatus(text, isErr = false) { + chip.textContent = text; + chip.classList.toggle('err', isErr); + } + + function clearGrid() { + for (const off of activeUnbinds) { try { off(); } catch (_) {} } + activeUnbinds = []; + grid.innerHTML = ''; + } + + function applyLayout(layoutRow) { + const content = layoutRow?.content; + if (!content || !Array.isArray(content.tiles)) { + console.warn('[KIOSK] layout vuoto/non valido'); + return; + } + clearGrid(); + currentLayout = layoutRow; + + // grid CSS variables + grid.style.setProperty('--cols', String(content.grid?.cols ?? 12)); + grid.style.setProperty('--rows', String(content.grid?.rows ?? 8)); + grid.style.setProperty('--gap', String(content.grid?.gap ?? 4) + 'px'); + + // theme (opzionale) + if (content.theme === 'light') { + document.body.style.background = '#f4f5f7'; + document.body.style.color = '#111'; + } else { + document.body.style.background = '#0b1220'; + document.body.style.color = '#fff'; + } + + for (const tile of content.tiles) { + const el = document.createElement('div'); + el.className = `tile tile-${tile.type}`; + el.style.gridColumn = `${tile.x + 1} / span ${tile.w}`; + el.style.gridRow = `${tile.y + 1} / span ${tile.h}`; + grid.appendChild(el); + + const r = window.tileRenderer.get(tile.type); + if (!r) { el.innerHTML = `
tipo sconosciuto: ${tile.type}
`; continue; } + try { + r.mount(el, tile); + const off = r.bind(el, tile, window.dataBinder); + if (off) activeUnbinds.push(off); + } catch (err) { + console.error('[KIOSK] render', tile.id, err); + el.innerHTML = `
errore render ${tile.id}
`; + } + } + + setStatus(`v${layoutRow.version || '?'} · ${content.tiles.length} tiles`); + } + + // ====== Stream del plugin (SSE) ====== + function openStream() { + const es = new EventSource('/meb/kiosk/stream'); + es.addEventListener('layout', (ev) => { + try { applyLayout(JSON.parse(ev.data)); } + catch (err) { console.warn('[KIOSK] layout parse:', err); } + }); + es.onerror = () => setStatus('stream disconnesso', true); + es.onopen = () => setStatus('stream ok'); + } + + // status connessione SignalK + if (window.dataBinder) { + window.dataBinder.onConnState(({ connected }) => { + if (!connected) setStatus('signalk disconnesso', true); + else if (currentLayout) setStatus(`v${currentLayout.version || '?'} · live`); + }); + } + + // bootstrap: prima fetch one-shot del layout (per non aspettare il primo evento SSE), + // poi apri lo stream per gli update successivi + fetch('/meb/kiosk/layout') + .then(r => r.ok ? r.json() : null) + .then(l => { if (l) applyLayout(l); }) + .catch(() => {}) + .finally(openStream); +})(); diff --git a/plugin/tools/kiosk/server-layout-store.js b/plugin/tools/kiosk/server-layout-store.js new file mode 100644 index 0000000..7e9fbb5 --- /dev/null +++ b/plugin/tools/kiosk/server-layout-store.js @@ -0,0 +1,80 @@ +/** + * Store del kiosk layout lato plugin. + * + * - cache su disco: data/kiosk-layout.json + * - bootstrap HTTP: GET {API_URL}/kiosklayouts/sensor//active + * - apply push WS: invocato da socket.js quando arriva _t:'kiosk_layout_update' + * - SSE stream: espone un emitter consumato dalla route /meb/kiosk/stream + * + * Il plugin NON sceglie il layout: lo riceve dal server e lo mostra. + */ +const fs = require('fs').promises; +const path = require('path'); +const EventEmitter = require('events'); +const configManager = require('../../config/configManager'); + +const cacheFile = path.join(__dirname, '../../../data/kiosk-layout.json'); +const emitter = new EventEmitter(); + +let current = null; + +async function ensureDir() { + try { await fs.mkdir(path.dirname(cacheFile), { recursive: true }); } catch {} +} + +async function loadCache() { + try { + const raw = await fs.readFile(cacheFile, 'utf-8'); + return JSON.parse(raw); + } catch { return null; } +} + +async function saveCache() { + if (!current) return; + try { await ensureDir(); await fs.writeFile(cacheFile, JSON.stringify(current, null, 2)); } + catch (err) { console.warn('[KIOSK|STORE] save cache failed:', err.message); } +} + +async function init() { + current = await loadCache(); + if (current) { + console.log(`[KIOSK|STORE] cache caricata v=${current.version || '?'} (${current.content?.tiles?.length || 0} tiles)`); + } else { + console.log('[KIOSK|STORE] nessun layout in cache'); + } +} + +async function bootstrapFromServer() { + const API_URL = process.env.API_URL; + if (!API_URL) { console.warn('[KIOSK|STORE] API_URL non configurato'); return; } + const sensorId = configManager.getSensorName() || process.env.SENSOR_ID; + if (!sensorId) { console.warn('[KIOSK|STORE] SENSOR_ID non configurato'); return; } + try { + const r = await fetch(`${API_URL}/kiosklayouts/sensor/${encodeURIComponent(sensorId)}/active`, { + signal: AbortSignal.timeout(8000) + }); + if (!r.ok) { if (r.status !== 404) console.warn('[KIOSK|STORE] bootstrap', r.status); return; } + const layout = await r.json(); + await applyRemote(layout); + } catch (err) { + console.warn('[KIOSK|STORE] bootstrap err:', err.message); + } +} + +/** + * Applica un layout (dedup per id+version), salva cache, emette 'update'. + */ +async function applyRemote(layout) { + if (!layout || !layout.content) return false; + if (current && current.id === layout.id && current.version === layout.version) return false; + current = layout; + await saveCache(); + console.log(`[KIOSK|STORE] applicato v=${layout.version || '?'} (${layout.content.tiles?.length || 0} tiles)`); + emitter.emit('update', current); + return true; +} + +function get() { return current; } +function onUpdate(fn) { emitter.on('update', fn); return () => emitter.off('update', fn); } + +module.exports = { init, bootstrapFromServer, applyRemote, get, onUpdate }; diff --git a/plugin/tools/kiosk/tile-renderer.js b/plugin/tools/kiosk/tile-renderer.js new file mode 100644 index 0000000..0aa0771 --- /dev/null +++ b/plugin/tools/kiosk/tile-renderer.js @@ -0,0 +1,188 @@ +/** + * tile-renderer.js — registra renderer per ogni tipo di tile. + * + * Ogni renderer espone: + * mount(el, tile) → setup DOM iniziale + * bind(el, tile, binder) → sottoscrizione ai path → ritorna unbind() + * + * Tipi supportati: value | gauge | map + */ +(function () { + const renderers = {}; + + function register(type, r) { renderers[type] = r; } + function get(type) { return renderers[type]; } + + function format(value, { decimals = 1 } = {}) { + if (value == null || Number.isNaN(value)) return '—'; + if (typeof value === 'number') { + const factor = Math.pow(10, decimals); + return (Math.round(value * factor) / factor).toString(); + } + return String(value); + } + + // ============================== VALUE ================================== + register('value', { + mount(el, tile) { + const d = tile.display || {}; + el.innerHTML = ` +
${escapeHtml(d.title || tile.source?.path || '')}
+
+ + ${d.unit ? `${escapeHtml(d.unit)}` : ''} +
`; + const val = el.querySelector('[data-val]'); + if (d.font) val.style.fontFamily = d.font; + if (d.fontSize) val.style.fontSize = d.fontSize; + if (d.color) val.style.color = d.color; + if (d.bg) el.style.background = d.bg; + }, + bind(el, tile, binder) { + const num = el.querySelector('[data-num]'); + const off = binder.subscribe(tile.source.path, (v) => { + num.textContent = format(v, { decimals: tile.display?.decimals ?? 1 }); + el.classList.remove('stale'); + }); + const staleTimer = setInterval(() => { + const l = binder.getLatest(tile.source.path); + if (!l && !el.classList.contains('stale')) el.classList.add('stale'); + }, 5000); + return () => { off(); clearInterval(staleTimer); }; + } + }); + + // ============================== GAUGE ================================== + // SVG gauge semplice (arco 270°, lancetta). Range = tile.range = [min, max]. + register('gauge', { + mount(el, tile) { + const d = tile.display || {}; + el.innerHTML = ` +
${escapeHtml(d.title || tile.source?.path || '')}
+ + + + + + + ${escapeHtml(d.unit || '')} + `; + if (d.bg) el.style.background = d.bg; + }, + bind(el, tile, binder) { + const [min, max] = tile.range || [0, 100]; + const arc = el.querySelector('[data-arc]'); + const needle = el.querySelector('[data-needle]'); + const num = el.querySelector('[data-num]'); + // pre-compute arc length (perimetro semicerchio R=80) + const totalLen = 376.99; // ~ 80 * Math.PI * 1.5 (270deg) + arc.setAttribute('stroke-dasharray', `${totalLen} ${totalLen}`); + const off = binder.subscribe(tile.source.path, (v) => { + const num_v = (v == null || Number.isNaN(+v)) ? null : Number(v); + num.textContent = format(num_v, { decimals: tile.display?.decimals ?? 1 }); + if (num_v == null) return; + const ratio = Math.max(0, Math.min(1, (num_v - min) / (max - min))); + arc.setAttribute('stroke-dashoffset', String(totalLen * (1 - ratio))); + // needle: da -135° a +135° + const deg = -135 + 270 * ratio; + needle.setAttribute('transform', `rotate(${deg} 100 120)`); + el.classList.remove('stale'); + }); + return off; + } + }); + + // ============================== MAP ==================================== + let mapboxLoadingPromise = null; + function loadMapbox() { + if (window.mapboxgl) return Promise.resolve(); + if (mapboxLoadingPromise) return mapboxLoadingPromise; + mapboxLoadingPromise = new Promise((resolve, reject) => { + const s = document.createElement('script'); + s.src = 'https://api.mapbox.com/mapbox-gl-js/v3.6.0/mapbox-gl.js'; + s.onload = resolve; s.onerror = reject; + document.head.appendChild(s); + }); + return mapboxLoadingPromise; + } + + register('map', { + mount(el, tile) { + const d = tile.display || {}; + el.innerHTML = ` +
${escapeHtml(d.title || 'Mappa')}
+
+
`; + if (d.bg) el.style.background = d.bg; + }, + bind(el, tile, binder) { + const mapHost = el.querySelector('[data-map]'); + const overlays = el.querySelector('[data-overlays]'); + const extras = Array.isArray(tile.extras) ? tile.extras : []; + + // pre-popola overlay + const extraEls = extras.map((ex, i) => { + const span = document.createElement('span'); + span.dataset.idx = i; + span.innerHTML = `${escapeHtml(ex.label || ex.path)}: ${escapeHtml(ex.unit || '')}`; + overlays.appendChild(span); + return span.querySelector('[data-ex]'); + }); + + let map = null, marker = null; + const mapboxKey = document.querySelector('meta[name="mapbox-key"]')?.content || ''; + const offs = []; + + loadMapbox().then(() => { + if (!window.mapboxgl) return; + window.mapboxgl.accessToken = mapboxKey; + map = new window.mapboxgl.Map({ + container: mapHost, + style: { + version: 8, + sources: { + osm: { type:'raster', tiles:['https://a.tile.openstreetmap.org/{z}/{x}/{y}.png'], tileSize:256 }, + openseamap: { type:'raster', tiles:['https://tiles.openseamap.org/seamark/{z}/{x}/{y}.png'], tileSize:256 }, + }, + layers: [ + { id:'osm-layer', type:'raster', source:'osm', minzoom:0, maxzoom:22 }, + { id:'sea-layer', type:'raster', source:'openseamap', minzoom:0, maxzoom:18 }, + ], + }, + center: [0, 0], zoom: 2, + }); + marker = new window.mapboxgl.Marker({ color: tile.display?.color || '#ef4444' }) + .setLngLat([0, 0]).addTo(map); + }).catch(err => console.warn('[KIOSK|MAP] mapbox load failed:', err)); + + let curLat = null, curLon = null; + const update = () => { + if (curLat == null || curLon == null || !map || !marker) return; + marker.setLngLat([curLon, curLat]); + map.flyTo({ center: [curLon, curLat], zoom: Math.max(13, map.getZoom()) }); + }; + + offs.push(binder.subscribe(tile.source.latPath, v => { curLat = Number(v); update(); })); + offs.push(binder.subscribe(tile.source.lonPath, v => { curLon = Number(v); update(); })); + + // extras + extras.forEach((ex, i) => { + offs.push(binder.subscribe(ex.path, v => { + extraEls[i].textContent = format(v, { decimals: ex.decimals ?? 1 }); + })); + }); + + return () => { + offs.forEach(o => o && o()); + if (map) { try { map.remove(); } catch (_) {} map = null; } + }; + } + }); + + function escapeHtml(s) { + return String(s ?? '').replace(/[&<>"']/g, c => + ({'&':'&','<':'<','>':'>','"':'"',"'":'''}[c])); + } + + window.tileRenderer = { register, get }; +})();