diff --git a/api/sql/rules_schema.sql b/api/sql/rules_schema.sql new file mode 100644 index 0000000..78b9a49 --- /dev/null +++ b/api/sql/rules_schema.sql @@ -0,0 +1,171 @@ +-- ============================================================ +-- Schema completo per il database "rules" +-- ============================================================ + +-- ============ DATA / BROWSER ============ + +CREATE TABLE IF NOT EXISTS dataread ( + id CHAR(8) PRIMARY KEY, + version VARCHAR(20) NOT NULL, + tags TEXT[] DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT false, + archived BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS datareaditems ( + id BIGSERIAL PRIMARY KEY, + rule_id CHAR(8) NOT NULL REFERENCES dataread(id) ON DELETE CASCADE, + category VARCHAR(50) NOT NULL, + path VARCHAR(200) NOT NULL, + unit VARCHAR(20), + enabled BOOLEAN NOT NULL DEFAULT true, + UNIQUE(rule_id, path) +); + +CREATE INDEX IF NOT EXISTS idx_datareaditems_rule ON datareaditems(rule_id); + +-- ============ WEATHER (current, ogni 5 min) ============ + +CREATE TABLE IF NOT EXISTS weather ( + id CHAR(8) PRIMARY KEY, + version VARCHAR(20) NOT NULL, + description TEXT, + tags TEXT[] DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT false, + archived BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS weatheritems ( + id BIGSERIAL PRIMARY KEY, + rule_id CHAR(8) NOT NULL REFERENCES weather(id) ON DELETE CASCADE, + group_name VARCHAR(50) NOT NULL, + ref VARCHAR(50) NOT NULL, + name VARCHAR(50) NOT NULL, + unit VARCHAR(20) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT true, + UNIQUE(rule_id, ref) +); + +CREATE INDEX IF NOT EXISTS idx_weatheritems_rule ON weatheritems(rule_id); + +-- ============ LATERFORECASTS (hourly 7gg, ogni 1 ora) ============ + +CREATE TABLE IF NOT EXISTS laterforecasts ( + id CHAR(8) PRIMARY KEY, + version VARCHAR(20) NOT NULL, + description TEXT, + tags TEXT[] DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT false, + archived BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS laterforecastitems ( + id BIGSERIAL PRIMARY KEY, + rule_id CHAR(8) NOT NULL REFERENCES laterforecasts(id) ON DELETE CASCADE, + group_name VARCHAR(50) NOT NULL, + ref VARCHAR(50) NOT NULL, + name VARCHAR(50) NOT NULL, + unit VARCHAR(20) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT true, + UNIQUE(rule_id, ref) +); + +CREATE INDEX IF NOT EXISTS idx_laterforecastitems_rule ON laterforecastitems(rule_id); + +-- ============ LOGS ============ + +CREATE TABLE IF NOT EXISTS logs ( + id CHAR(8) PRIMARY KEY, + version VARCHAR(20) NOT NULL, + description TEXT, + tags TEXT[] DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT false, + archived BOOLEAN NOT NULL DEFAULT false, + browser_rule_id CHAR(8) REFERENCES dataread(id), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS logitems ( + id BIGSERIAL PRIMARY KEY, + rule_id CHAR(8) NOT NULL REFERENCES logs(id) ON DELETE CASCADE, + path VARCHAR(200) NOT NULL, + ref VARCHAR(4) NOT NULL, + unit VARCHAR(20) NOT NULL, + measurement VARCHAR(50) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT true, + UNIQUE(rule_id, ref), + UNIQUE(rule_id, path), + CHECK (LENGTH(ref) <= 4) +); + +CREATE INDEX IF NOT EXISTS idx_logitems_rule ON logitems(rule_id); + +-- ============ KIOSK ============ + +CREATE TABLE IF NOT EXISTS kiosktemplates ( + id CHAR(8) PRIMARY KEY NOT NULL, + name VARCHAR(50) NOT NULL, + tags TEXT[] DEFAULT '{}', + active BOOLEAN NOT NULL DEFAULT false, + archived BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS kioskelements ( + id BIGSERIAL PRIMARY KEY, + template_id CHAR(8) NOT NULL REFERENCES kiosktemplates(id) ON DELETE CASCADE, + font INT NOT NULL, + label VARCHAR(100) NOT NULL, + x INT NOT NULL, + y INT NOT NULL, + width INT NOT NULL, + height INT NOT NULL, + color VARCHAR(20) NOT NULL, + UNIQUE(template_id) +); + +CREATE INDEX IF NOT EXISTS idx_kioskelements_template ON kioskelements(template_id); + +-- ============ VINCOLI: una sola rule attiva per tipo ============ + +CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_dataread + ON dataread(active) WHERE active = true AND archived = false; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_weather + ON weather(active) WHERE active = true AND archived = false; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_laterforecasts + ON laterforecasts(active) WHERE active = true AND archived = false; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_logs + ON logs(active) WHERE active = true AND archived = false; + +-- ============ FIX per schema esistente ============ + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'datareaditems' AND column_name = 'enables' + ) THEN + ALTER TABLE datareaditems RENAME COLUMN enables TO enabled; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'weather' AND column_name = 'description' + ) THEN + ALTER TABLE weather ADD COLUMN description TEXT; + END IF; +END $$; diff --git a/api/src/routes/params.sensor.js b/api/src/routes/params.sensor.js index d48ce79..8348043 100644 --- a/api/src/routes/params.sensor.js +++ b/api/src/routes/params.sensor.js @@ -2,23 +2,18 @@ const router = require('express').Router(); const crypto = require('crypto'); const { query } = require('../storage/postgres'); -const sets = ['forecasts', 'sensors']; +const sets = ['forecasts', 'sensors', 'marine']; function hashSensorCode(code) { return crypto.createHash('sha256').update(code).digest('hex'); } /** - * GET /params/sensor/:sensorCode/active?set=sensors - * Autenticazione tramite SENSOR_CODE (stesso meccanismo di realtime) + * Middleware: valida sensor code e verifica che il sensore sia attivo. + * Salva sensor.id in req.sensorId. */ -router.get('/:sensorCode/active', async (req, res) => { +async function authenticateSensor(req, res, next) { const { sensorCode } = req.params; - const { set } = req.query; - - if (!set || !sets.includes(set)) - return res.status(400).json({ error: 'SET parameter invalid' }); - try { const hashed = hashSensorCode(sensorCode); const sensor = await query( @@ -30,11 +25,29 @@ router.get('/:sensorCode/active', async (req, res) => { if (!sensor.rows[0]) { return res.status(401).json({ error: 'Sensor code not valid' }); } - if (!sensor.rows[0].active) { return res.status(403).json({ error: 'Sensor is not active' }); } + req.sensorId = sensor.rows[0].id; + next(); + } catch (err) { + console.error('[PARAMS/SENSOR] Auth error:', err.message); + res.status(500).json({ error: 'Internal server error' }); + } +} + +/** + * GET /params/sensor/:sensorCode/active?set=sensors + * Ritorna il set di parametri attivo (forecasts, sensors, marine) + */ +router.get('/:sensorCode/active', authenticateSensor, async (req, res) => { + const { set } = req.query; + + if (!set || !sets.includes(set)) + return res.status(400).json({ error: 'SET parameter invalid' }); + + try { const result = await query( `SELECT * FROM ${set} WHERE active = true LIMIT 1`, [], @@ -48,4 +61,55 @@ router.get('/:sensorCode/active', async (req, res) => { } }); +// --- Mapping tipo rules → tabelle --- +const RULES_TYPE_MAP = { + weather: { rules: 'weather', items: 'weatheritems' }, + laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' }, + data: { rules: 'dataread', items: 'datareaditems' }, + logs: { rules: 'logs', items: 'logitems' } +}; + +/** + * GET /params/sensor/:sensorCode/rules?type=weather + * Ritorna la rule attiva con items per il tipo specificato. + * Autenticazione tramite sensor code (non richiede JWT/API key). + */ +router.get('/:sensorCode/rules', authenticateSensor, async (req, res) => { + const { type } = req.query; + + if (!type || !RULES_TYPE_MAP[type]) { + return res.status(400).json({ error: `invalid type, must be one of: ${Object.keys(RULES_TYPE_MAP).join(', ')}` }); + } + + const tables = RULES_TYPE_MAP[type]; + + try { + const { rows: ruleRows } = await query( + `SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, + [], 'rules' + ); + + if (ruleRows.length === 0) { + return res.status(404).json({ error: `no active ${type} rule found` }); + } + + const rule = ruleRows[0]; + const { rows: items } = await query( + `SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`, + [rule.id], 'rules' + ); + + res.json({ + id: rule.id, + version: rule.version, + description: rule.description, + tags: rule.tags, + items + }); + } catch (err) { + console.error('[PARAMS/SENSOR] Rules error:', err.message); + res.status(500).json({ error: 'Internal server error' }); + } +}); + module.exports = router; diff --git a/api/src/routes/rules.js b/api/src/routes/rules.js index 5b17cf6..e356de4 100644 --- a/api/src/routes/rules.js +++ b/api/src/routes/rules.js @@ -4,9 +4,10 @@ const { encode } = require('@msgpack/msgpack'); // Mapping tipo → tabelle const TYPE_MAP = { - weather: { rules: 'weather', items: 'weatheritems' }, - data: { rules: 'dataread', items: 'datareaditems' }, - logs: { rules: 'logs', items: 'logitems' } + weather: { rules: 'weather', items: 'weatheritems' }, + laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' }, + data: { rules: 'dataread', items: 'datareaditems' }, + logs: { rules: 'logs', items: 'logitems' } }; const VALID_TYPES = Object.keys(TYPE_MAP); @@ -184,7 +185,7 @@ router.patch('/update', async (req, res) => { }); // --- ID Generation --- -const TYPE_PREFIX = { weather: 'w', data: 'd', logs: 'l' }; +const TYPE_PREFIX = { weather: 'w', laterforecasts: 'f', data: 'd', logs: 'l' }; function generateId(type) { const prefix = TYPE_PREFIX[type] || 'x'; @@ -196,6 +197,7 @@ function generateId(type) { // --- ITEM FIELD DEFINITIONS per tipo --- const ITEM_FIELDS = { weather: ['group_name', 'ref', 'name', 'unit', 'enabled'], + laterforecasts: ['group_name', 'ref', 'name', 'unit', 'enabled'], data: ['category', 'path', 'unit', 'enabled'], logs: ['path', 'ref', 'unit', 'measurement', 'enabled'] }; @@ -203,6 +205,7 @@ const ITEM_FIELDS = { // Campi rule aggiornabili const RULE_UPDATE_FIELDS = { weather: ['version', 'tags', 'description'], + laterforecasts: ['version', 'tags', 'description'], data: ['version', 'tags'], logs: ['version', 'tags', 'description', 'browser_rule_id'] }; @@ -494,4 +497,66 @@ router.patch('/:type/:id/items/:itemId/toggle', async (req, res) => { } }); +/** + * POST /rules/force-update — Forza l'invio delle rules attive a tutti i sensori connessi. + * Legge le 3 rules attive dal DB, poi chiama il realtime server che le pushta via WebSocket. + */ +router.post('/force-update', async (req, res) => { + try { + const payload = {}; + + for (const [type, tables] of Object.entries(TYPE_MAP)) { + const { rows: ruleRows } = await db.query( + `SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, + [], 'rules' + ); + + if (ruleRows.length === 0) continue; + + const rule = ruleRows[0]; + const { rows: items } = await db.query( + `SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`, + [rule.id], 'rules' + ); + + payload[type] = { + id: rule.id, + version: rule.version, + description: rule.description, + tags: rule.tags, + items + }; + } + + if (Object.keys(payload).length === 0) { + return res.status(404).json({ error: 'no active rules found' }); + } + + // Invia al realtime server per il push ai sensori connessi + const REALTIME_URL = process.env.REALTIME_INTERNAL_URL || 'http://realtime:3000'; + const API_KEY = process.env.INTERNAL_API_KEY; + + const rtRes = await fetch(`${REALTIME_URL}/push-rules`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': API_KEY + }, + body: JSON.stringify(payload) + }); + + if (!rtRes.ok) { + const err = await rtRes.json().catch(() => ({})); + console.error('[RULES] Force-update: realtime error', err); + return res.status(502).json({ error: 'realtime server error', detail: err.error }); + } + + const result = await rtRes.json(); + res.json({ status: 'ok', pushed: Object.keys(payload), sensors: result.sensors || 0 }); + } catch (err) { + console.error('Error force-updating rules', err); + res.status(500).json({ error: 'internal server error' }); + } +}); + module.exports = router; diff --git a/console/src/pages/rulesets.html b/console/src/pages/rulesets.html index 508c215..150df62 100644 --- a/console/src/pages/rulesets.html +++ b/console/src/pages/rulesets.html @@ -18,6 +18,7 @@

Rulesets

+
@@ -39,6 +40,7 @@
+
@@ -136,6 +138,12 @@ const ITEM_SCHEMA = { { key: 'name', label: 'Nome', cls: 'wide' }, { key: 'unit', label: 'Unita', cls: 'narrow' }, ], + laterforecasts: [ + { key: 'group_name', label: 'Gruppo', cls: 'medium' }, + { key: 'ref', label: 'Ref', cls: 'medium' }, + { key: 'name', label: 'Nome', cls: 'wide' }, + { key: 'unit', label: 'Unita', cls: 'narrow' }, + ], data: [ { key: 'category', label: 'Categoria', cls: 'medium' }, { key: 'path', label: 'Path', cls: 'wide' }, @@ -149,7 +157,7 @@ const ITEM_SCHEMA = { ], }; -const HAS_DESC = { weather: true, data: false, logs: true }; +const HAS_DESC = { weather: true, laterforecasts: true, data: false, logs: true }; // ========== API helpers ========== @@ -583,6 +591,30 @@ function flash(text, elId = 'savingIndicator') { setTimeout(() => el.classList.remove('visible'), 1500); } +// ========== Force Update ========== + +document.getElementById('forceUpdateBtn').onclick = async () => { + const btn = document.getElementById('forceUpdateBtn'); + btn.disabled = true; + btn.textContent = 'Invio in corso...'; + try { + const res = await api('POST', '/rules/force-update'); + btn.textContent = `Inviato a ${res.sensors || 0} sensori`; + flash('Update inviato'); + setTimeout(() => { + btn.textContent = 'Forza Update Sensori'; + btn.disabled = false; + }, 3000); + } catch (err) { + console.error('Error force-updating:', err); + btn.textContent = 'Errore!'; + setTimeout(() => { + btn.textContent = 'Forza Update Sensori'; + btn.disabled = false; + }, 3000); + } +}; + // ========== Init ========== document.addEventListener('DOMContentLoaded', () => loadRules()); diff --git a/console/src/static/styles/rulesets.css b/console/src/static/styles/rulesets.css index 0b3b55a..8642521 100644 --- a/console/src/static/styles/rulesets.css +++ b/console/src/static/styles/rulesets.css @@ -135,6 +135,33 @@ background-position: right 10px center; } +/* Force Update button */ +.rs-force-btn { + padding: 8px 20px; + border: 1px solid #f59e0b; + border-radius: 10px; + background: #fffbeb; + color: #b45309; + font-size: 0.85rem; + font-weight: 600; + cursor: pointer; + transition: all 0.2s ease; + font-family: inherit; +} + +.rs-force-btn:hover:not(:disabled) { + background: #fef3c7; + border-color: #d97706; + transform: translateY(-1px); + box-shadow: 0 4px 12px rgba(245, 158, 11, 0.2); +} + +.rs-force-btn:disabled { + opacity: 0.7; + cursor: not-allowed; + transform: none; +} + .rs-new-btn { padding: 8px 20px; border: none; diff --git a/realtime/src/index.js b/realtime/src/index.js index 78bb823..0d38ee0 100644 --- a/realtime/src/index.js +++ b/realtime/src/index.js @@ -44,6 +44,29 @@ app.use('/connect', require('./routes/connect')); app.use('/sensors', require('./routes/sensors')); app.use('/sessions', require('./routes/sessions')); +/** + * POST /push-rules — Riceve rules attive dall'API e le pusha a tutti i sensori connessi. + * Autenticato con x-api-key (service-to-service). + */ +app.post('/push-rules', (req, res) => { + const apiKey = req.headers['x-api-key']; + if (!apiKey || apiKey !== process.env.INTERNAL_API_KEY) { + return res.status(401).json({ error: 'unauthorized' }); + } + + const payload = req.body; + if (!payload || Object.keys(payload).length === 0) { + return res.status(400).json({ error: 'empty payload' }); + } + + // Wrappa con _t per identificare il tipo di messaggio nel plugin + const message = { _t: 'rules_update', ...payload }; + const sensors = wsHandler.pushToAllSensors(message); + + console.log(`[PUSH-RULES] Inviato a ${sensors} sensori:`, Object.keys(payload)); + res.json({ status: 'ok', sensors }); +}); + const server = app.listen(3000, '0.0.0.0', () => { console.log(`Realtime started`); }); diff --git a/realtime/src/store/influx.js b/realtime/src/store/influx.js index 16e903c..98f26f4 100644 --- a/realtime/src/store/influx.js +++ b/realtime/src/store/influx.js @@ -13,6 +13,7 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', { batchSize: 50, }); +// Mapping legacy per sensor_data (logs telemetry) const fieldMap = { t: 'temperature', h: 'humidity', @@ -24,6 +25,10 @@ const fieldMap = { lon: 'longitude', }; +/** + * Scrive dati telemetria sensore (logs) con mapping campi abbreviati. + * Measurement: sensor_data + */ function writeSensorData(fields, sensor, session, timestamp) { const point = new Point('sensor_data') .tag('sensor', sensor) @@ -39,6 +44,46 @@ function writeSensorData(fields, sensor, session, timestamp) { writeApi.writePoint(point); } +/** + * Scrive dati generici (weather, forecast, ecc.) senza mapping. + * I campi vengono scritti con il nome originale (ref da Open-Meteo). + * @param {string} measurement - nome della measurement InfluxDB (es. 'weather_current', 'weather_forecast') + * @param {Object} fields - campi { ref: value } + * @param {string} sensor - nome del sensore + * @param {string} session - id sessione + * @param {number} timestamp - timestamp unix ms + */ +function writeGenericData(measurement, fields, sensor, session, timestamp) { + const point = new Point(measurement) + .tag('sensor', sensor) + .tag('session', session) + .timestamp(timestamp); + + for (const [key, value] of Object.entries(fields)) { + if (value === null || value === undefined) continue; + if (typeof value === 'number') { + point.floatField(key, value); + } else if (typeof value === 'string') { + point.stringField(key, value); + } + } + + writeApi.writePoint(point); +} + +/** + * Scrive un batch di punti forecast (previsioni orarie). + * Ogni punto ha il proprio timestamp. + * @param {Array} points - array di [timestamp_ms, { ref: value, ... }] + * @param {string} sensor - nome del sensore + * @param {string} session - id sessione + */ +function writeForecastBatch(points, sensor, session) { + for (const [ts, fields] of points) { + writeGenericData('weather_forecast', fields, sensor, session, ts); + } +} + async function queryHistory(sensor, session, since) { const queryApi = client.getQueryApi(org); const query = ` @@ -62,4 +107,4 @@ async function queryHistory(sensor, session, since) { }); } -module.exports = { writeSensorData, queryHistory }; +module.exports = { writeSensorData, writeGenericData, writeForecastBatch, queryHistory }; diff --git a/realtime/src/ws/handler.js b/realtime/src/ws/handler.js index 85670cd..c2a3d0a 100644 --- a/realtime/src/ws/handler.js +++ b/realtime/src/ws/handler.js @@ -1,7 +1,7 @@ const { WebSocketServer } = require('ws'); const { decode } = require('@msgpack/msgpack'); const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis'); -const { writeSensorData, queryHistory } = require('../store/influx'); +const { writeSensorData, writeGenericData, writeForecastBatch, queryHistory } = require('../store/influx'); // In-memory registries const sensorWatchers = new Map(); // sensorName → Set (watchers) @@ -129,8 +129,19 @@ function handleSensorConnection(ws) { const { ts, _m, ...fields } = packet; - // Usa sessionLabel (puo' cambiare a runtime dalla console) - writeSensorData(fields, sensorName, ws.sessionLabel, ts); + // Route per tipo di measurement + if (_m === 'weather') { + // Dati meteo current — salva con measurement generico + writeGenericData('weather_current', fields, sensorName, ws.sessionLabel, ts); + } else if (_m === 'forecast_batch') { + // Batch previsioni orarie — fields è un array [[ts, {fields}], ...] + if (Array.isArray(fields.points)) { + writeForecastBatch(fields.points, sensorName, ws.sessionLabel); + } + } else { + // Dati telemetria sensore (logs) — mapping abbreviato + writeSensorData(fields, sensorName, ws.sessionLabel, ts); + } // Broadcast to watchers const watchers = sensorWatchers.get(sensorName); @@ -239,4 +250,27 @@ function handleWatcherConnection(ws) { }); } -module.exports = { setup, connectedSensors }; +/** + * Invia un messaggio a tutti i sensori connessi. + * Usato dal push-rules endpoint per forzare l'aggiornamento delle rules. + * @param {Object} payload - Il payload da inviare (verrà wrappato con _t) + * @returns {number} Numero di sensori a cui il messaggio è stato inviato + */ +function pushToAllSensors(payload) { + const { encode } = require('@msgpack/msgpack'); + let count = 0; + for (const [sensorName, ws] of connectedSensors.entries()) { + if (ws.readyState === ws.OPEN) { + try { + ws.send(encode(payload)); + console.log(`[PUSH] Rules update inviato a ${sensorName}`); + count++; + } catch (err) { + console.error(`[PUSH] Errore invio a ${sensorName}:`, err.message); + } + } + } + return count; +} + +module.exports = { setup, connectedSensors, pushToAllSensors };