From 5912c00a825e97e0f959f417735ee775dbc16499 Mon Sep 17 00:00:00 2001 From: Giuseppe Raffa <77052701+sesee3@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:27:27 +0200 Subject: [PATCH] refactor: remove rules endpoint and related logic - Deleted the rules routes and associated logic from the API. - Removed rules-related functionality from params.sensor.js. - Updated dashboard and rulesets HTML to remove references to rulesets. - Removed force update button and related functionality from rulesets page. - Cleaned up styles related to the force update button. - Removed unused WebSocket client example. - Updated realtime server to eliminate rules pushing logic. - Refactored WebSocket handler to streamline data processing. --- api/sql/rules_schema.sql | 171 -------- api/src/index.js | 4 - api/src/routes/params.sensor.js | 51 --- api/src/routes/rules.js | 562 ------------------------- console/src/pages/dashboard.html | 4 +- console/src/pages/rulesets.html | 25 -- console/src/static/styles/rulesets.css | 27 -- realtime/client-example.js | 78 ---- realtime/src/index.js | 23 - realtime/src/store/influx.js | 50 +-- realtime/src/ws/handler.js | 134 +----- 11 files changed, 32 insertions(+), 1097 deletions(-) delete mode 100644 api/sql/rules_schema.sql delete mode 100644 api/src/routes/rules.js delete mode 100644 realtime/client-example.js diff --git a/api/sql/rules_schema.sql b/api/sql/rules_schema.sql deleted file mode 100644 index 78b9a49..0000000 --- a/api/sql/rules_schema.sql +++ /dev/null @@ -1,171 +0,0 @@ --- ============================================================ --- Schema completo per il database "rules" --- ============================================================ - --- ============ DATA / BROWSER ============ - -CREATE TABLE IF NOT EXISTS dataread ( - id CHAR(8) PRIMARY KEY, - version VARCHAR(20) NOT NULL, - tags TEXT[] DEFAULT '{}', - active BOOLEAN NOT NULL DEFAULT false, - archived BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP NOT NULL DEFAULT NOW() -); - -CREATE TABLE IF NOT EXISTS datareaditems ( - id BIGSERIAL PRIMARY KEY, - rule_id CHAR(8) NOT NULL REFERENCES dataread(id) ON DELETE CASCADE, - category VARCHAR(50) NOT NULL, - path VARCHAR(200) NOT NULL, - unit VARCHAR(20), - enabled BOOLEAN NOT NULL DEFAULT true, - UNIQUE(rule_id, path) -); - -CREATE INDEX IF NOT EXISTS idx_datareaditems_rule ON datareaditems(rule_id); - --- ============ WEATHER (current, ogni 5 min) ============ - -CREATE TABLE IF NOT EXISTS weather ( - id CHAR(8) PRIMARY KEY, - version VARCHAR(20) NOT NULL, - description TEXT, - tags TEXT[] DEFAULT '{}', - active BOOLEAN NOT NULL DEFAULT false, - archived BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP NOT NULL DEFAULT NOW() -); - -CREATE TABLE IF NOT EXISTS weatheritems ( - id BIGSERIAL PRIMARY KEY, - rule_id CHAR(8) NOT NULL REFERENCES weather(id) ON DELETE CASCADE, - group_name VARCHAR(50) NOT NULL, - ref VARCHAR(50) NOT NULL, - name VARCHAR(50) NOT NULL, - unit VARCHAR(20) NOT NULL, - enabled BOOLEAN NOT NULL DEFAULT true, - UNIQUE(rule_id, ref) -); - -CREATE INDEX IF NOT EXISTS idx_weatheritems_rule ON weatheritems(rule_id); - --- ============ LATERFORECASTS (hourly 7gg, ogni 1 ora) ============ - -CREATE TABLE IF NOT EXISTS laterforecasts ( - id CHAR(8) PRIMARY KEY, - version VARCHAR(20) NOT NULL, - description TEXT, - tags TEXT[] DEFAULT '{}', - active BOOLEAN NOT NULL DEFAULT false, - archived BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP NOT NULL DEFAULT NOW() -); - -CREATE TABLE IF NOT EXISTS laterforecastitems ( - id BIGSERIAL PRIMARY KEY, - rule_id CHAR(8) NOT NULL REFERENCES laterforecasts(id) ON DELETE CASCADE, - group_name VARCHAR(50) NOT NULL, - ref VARCHAR(50) NOT NULL, - name VARCHAR(50) NOT NULL, - unit VARCHAR(20) NOT NULL, - enabled BOOLEAN NOT NULL DEFAULT true, - UNIQUE(rule_id, ref) -); - -CREATE INDEX IF NOT EXISTS idx_laterforecastitems_rule ON laterforecastitems(rule_id); - --- ============ LOGS ============ - -CREATE TABLE IF NOT EXISTS logs ( - id CHAR(8) PRIMARY KEY, - version VARCHAR(20) NOT NULL, - description TEXT, - tags TEXT[] DEFAULT '{}', - active BOOLEAN NOT NULL DEFAULT false, - archived BOOLEAN NOT NULL DEFAULT false, - browser_rule_id CHAR(8) REFERENCES dataread(id), - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP NOT NULL DEFAULT NOW() -); - -CREATE TABLE IF NOT EXISTS logitems ( - id BIGSERIAL PRIMARY KEY, - rule_id CHAR(8) NOT NULL REFERENCES logs(id) ON DELETE CASCADE, - path VARCHAR(200) NOT NULL, - ref VARCHAR(4) NOT NULL, - unit VARCHAR(20) NOT NULL, - measurement VARCHAR(50) NOT NULL, - enabled BOOLEAN NOT NULL DEFAULT true, - UNIQUE(rule_id, ref), - UNIQUE(rule_id, path), - CHECK (LENGTH(ref) <= 4) -); - -CREATE INDEX IF NOT EXISTS idx_logitems_rule ON logitems(rule_id); - --- ============ KIOSK ============ - -CREATE TABLE IF NOT EXISTS kiosktemplates ( - id CHAR(8) PRIMARY KEY NOT NULL, - name VARCHAR(50) NOT NULL, - tags TEXT[] DEFAULT '{}', - active BOOLEAN NOT NULL DEFAULT false, - archived BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - updated_at TIMESTAMP NOT NULL DEFAULT NOW() -); - -CREATE TABLE IF NOT EXISTS kioskelements ( - id BIGSERIAL PRIMARY KEY, - template_id CHAR(8) NOT NULL REFERENCES kiosktemplates(id) ON DELETE CASCADE, - font INT NOT NULL, - label VARCHAR(100) NOT NULL, - x INT NOT NULL, - y INT NOT NULL, - width INT NOT NULL, - height INT NOT NULL, - color VARCHAR(20) NOT NULL, - UNIQUE(template_id) -); - -CREATE INDEX IF NOT EXISTS idx_kioskelements_template ON kioskelements(template_id); - --- ============ VINCOLI: una sola rule attiva per tipo ============ - -CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_dataread - ON dataread(active) WHERE active = true AND archived = false; - -CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_weather - ON weather(active) WHERE active = true AND archived = false; - -CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_laterforecasts - ON laterforecasts(active) WHERE active = true AND archived = false; - -CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_logs - ON logs(active) WHERE active = true AND archived = false; - --- ============ FIX per schema esistente ============ - -DO $$ -BEGIN - IF EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'datareaditems' AND column_name = 'enables' - ) THEN - ALTER TABLE datareaditems RENAME COLUMN enables TO enabled; - END IF; -END $$; - -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'weather' AND column_name = 'description' - ) THEN - ALTER TABLE weather ADD COLUMN description TEXT; - END IF; -END $$; diff --git a/api/src/index.js b/api/src/index.js index 885547b..28557a8 100644 --- a/api/src/index.js +++ b/api/src/index.js @@ -96,10 +96,6 @@ app.use('/params', paramsRoutes) const settingsRoutes = require('./routes/settings') app.use('/settings', settingsRoutes) -const rulesRoutes = require('./routes/rules') -app.use('/rules', rulesRoutes) - - app.listen(PORT, '0.0.0.0', () => { console.log(`Started on port ${PORT}`); }); diff --git a/api/src/routes/params.sensor.js b/api/src/routes/params.sensor.js index 8348043..984f231 100644 --- a/api/src/routes/params.sensor.js +++ b/api/src/routes/params.sensor.js @@ -61,55 +61,4 @@ router.get('/:sensorCode/active', authenticateSensor, async (req, res) => { } }); -// --- Mapping tipo rules → tabelle --- -const RULES_TYPE_MAP = { - weather: { rules: 'weather', items: 'weatheritems' }, - laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' }, - data: { rules: 'dataread', items: 'datareaditems' }, - logs: { rules: 'logs', items: 'logitems' } -}; - -/** - * GET /params/sensor/:sensorCode/rules?type=weather - * Ritorna la rule attiva con items per il tipo specificato. - * Autenticazione tramite sensor code (non richiede JWT/API key). - */ -router.get('/:sensorCode/rules', authenticateSensor, async (req, res) => { - const { type } = req.query; - - if (!type || !RULES_TYPE_MAP[type]) { - return res.status(400).json({ error: `invalid type, must be one of: ${Object.keys(RULES_TYPE_MAP).join(', ')}` }); - } - - const tables = RULES_TYPE_MAP[type]; - - try { - const { rows: ruleRows } = await query( - `SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, - [], 'rules' - ); - - if (ruleRows.length === 0) { - return res.status(404).json({ error: `no active ${type} rule found` }); - } - - const rule = ruleRows[0]; - const { rows: items } = await query( - `SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`, - [rule.id], 'rules' - ); - - res.json({ - id: rule.id, - version: rule.version, - description: rule.description, - tags: rule.tags, - items - }); - } catch (err) { - console.error('[PARAMS/SENSOR] Rules error:', err.message); - res.status(500).json({ error: 'Internal server error' }); - } -}); - module.exports = router; diff --git a/api/src/routes/rules.js b/api/src/routes/rules.js deleted file mode 100644 index e356de4..0000000 --- a/api/src/routes/rules.js +++ /dev/null @@ -1,562 +0,0 @@ -const router = require('express').Router(); -const db = require('../storage/postgres'); -const { encode } = require('@msgpack/msgpack'); - -// Mapping tipo → tabelle -const TYPE_MAP = { - weather: { rules: 'weather', items: 'weatheritems' }, - laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' }, - data: { rules: 'dataread', items: 'datareaditems' }, - logs: { rules: 'logs', items: 'logitems' } -}; - -const VALID_TYPES = Object.keys(TYPE_MAP); - -/** - * GET /rules — Lista tutte le rules (id, version, active) per ogni tipo - */ -router.get('/', async (req, res) => { - try { - const result = {}; - for (const [type, tables] of Object.entries(TYPE_MAP)) { - const { rows } = await db.query( - `SELECT * FROM ${tables.rules} ORDER BY created_at DESC`, - [], 'rules' - ); - result[type] = rows; - } - res.json(result); - } catch (err) { - console.error('Error fetching rules', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * GET /rules/versions — Ritorna solo id+version delle 3 rules attive (check rapido) - */ -router.get('/versions', async (req, res) => { - try { - const versions = {}; - for (const [type, tables] of Object.entries(TYPE_MAP)) { - const { rows } = await db.query( - `SELECT id, version FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, - [], 'rules' - ); - versions[type] = rows[0] || null; - } - res.json(versions); - } catch (err) { - console.error('Error fetching versions', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * GET /rules/active?type=weather — Ritorna la rule attiva completa con items - * Supporta Accept: application/msgpack per formato compatto - */ -router.get('/active', async (req, res) => { - const { type } = req.query; - if (!type || !VALID_TYPES.includes(type)) { - return res.status(400).json({ error: `invalid type, must be one of: ${VALID_TYPES.join(', ')}` }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rows: ruleRows } = await db.query( - `SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, - [], 'rules' - ); - - if (ruleRows.length === 0) { - return res.status(404).json({ error: `no active ${type} rule found` }); - } - - const rule = ruleRows[0]; - - const { rows: items } = await db.query( - `SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`, - [rule.id], 'rules' - ); - - const payload = { - id: rule.id, - version: rule.version, - description: rule.description, - tags: rule.tags, - items - }; - - // Se il client accetta msgpack, rispondi in binario - if (req.accepts('application/msgpack')) { - res.set('Content-Type', 'application/msgpack'); - return res.send(Buffer.from(encode(payload))); - } - - res.json(payload); - } catch (err) { - console.error('Error fetching active rule', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * GET /rules/:type/:id — Dettaglio di una rule specifica con items - */ -router.get('/:type/:id', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rows: ruleRows } = await db.query( - `SELECT * FROM ${tables.rules} WHERE id = $1`, - [id], 'rules' - ); - - if (ruleRows.length === 0) { - return res.status(404).json({ error: 'rule not found' }); - } - - const rule = ruleRows[0]; - const { rows: items } = await db.query( - `SELECT * FROM ${tables.items} WHERE rule_id = $1`, - [rule.id], 'rules' - ); - - res.json({ ...rule, items }); - } catch (err) { - console.error('Error fetching rule', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PATCH /rules/update?type=weather&from=1.0.0 - * Ritorna le rules con versione > from (per aggiornamenti incrementali) - */ -router.patch('/update', async (req, res) => { - const { type, from } = req.query; - - if (!type || !VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - if (!from) { - return res.status(400).json({ error: 'missing from parameter' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rows } = await db.query( - `SELECT * FROM ${tables.rules} WHERE version > $1 AND archived = false ORDER BY version ASC`, - [from], 'rules' - ); - - if (rows.length === 0) { - return res.status(404).json({ error: 'no rules found with version greater than specified' }); - } - - // Per ogni rule, allegare gli items - const results = []; - for (const rule of rows) { - const { rows: items } = await db.query( - `SELECT * FROM ${tables.items} WHERE rule_id = $1`, - [rule.id], 'rules' - ); - results.push({ ...rule, items }); - } - - if (req.accepts('application/msgpack')) { - res.set('Content-Type', 'application/msgpack'); - return res.send(Buffer.from(encode(results))); - } - - res.json(results); - } catch (err) { - console.error('Error fetching rules update', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -// --- ID Generation --- -const TYPE_PREFIX = { weather: 'w', laterforecasts: 'f', data: 'd', logs: 'l' }; - -function generateId(type) { - const prefix = TYPE_PREFIX[type] || 'x'; - const num = Math.floor(1000 + Math.random() * 9000); - // Pad to 8 chars with trailing zeros - return (prefix + num).padEnd(8, '0'); -} - -// --- ITEM FIELD DEFINITIONS per tipo --- -const ITEM_FIELDS = { - weather: ['group_name', 'ref', 'name', 'unit', 'enabled'], - laterforecasts: ['group_name', 'ref', 'name', 'unit', 'enabled'], - data: ['category', 'path', 'unit', 'enabled'], - logs: ['path', 'ref', 'unit', 'measurement', 'enabled'] -}; - -// Campi rule aggiornabili -const RULE_UPDATE_FIELDS = { - weather: ['version', 'tags', 'description'], - laterforecasts: ['version', 'tags', 'description'], - data: ['version', 'tags'], - logs: ['version', 'tags', 'description', 'browser_rule_id'] -}; - -/** - * POST /rules/:type — Crea una nuova rule - */ -router.post('/:type', async (req, res) => { - const { type } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - const id = generateId(type); - const { version, tags, description, browser_rule_id } = req.body; - - if (!version) { - return res.status(400).json({ error: 'version is required' }); - } - - try { - let sql, params; - if (type === 'weather') { - sql = `INSERT INTO ${tables.rules} (id, version, tags, description) VALUES ($1, $2, $3, $4) RETURNING *`; - params = [id, version, tags || [], description || null]; - } else if (type === 'logs') { - sql = `INSERT INTO ${tables.rules} (id, version, tags, description, browser_rule_id) VALUES ($1, $2, $3, $4, $5) RETURNING *`; - params = [id, version, tags || [], description || null, browser_rule_id || null]; - } else { - sql = `INSERT INTO ${tables.rules} (id, version, tags) VALUES ($1, $2, $3) RETURNING *`; - params = [id, version, tags || []]; - } - - const { rows } = await db.query(sql, params, 'rules'); - res.status(201).json(rows[0]); - } catch (err) { - console.error('Error creating rule', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PUT /rules/:type/:id — Aggiorna una rule - */ -router.put('/:type/:id', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - const allowed = RULE_UPDATE_FIELDS[type]; - const sets = []; - const params = []; - let idx = 1; - - for (const field of allowed) { - if (req.body[field] !== undefined) { - sets.push(`${field} = $${idx}`); - params.push(req.body[field]); - idx++; - } - } - - if (sets.length === 0) { - return res.status(400).json({ error: 'no valid fields to update' }); - } - - sets.push(`updated_at = NOW()`); - params.push(id); - - try { - const sql = `UPDATE ${tables.rules} SET ${sets.join(', ')} WHERE id = $${idx} RETURNING *`; - const { rows } = await db.query(sql, params, 'rules'); - if (rows.length === 0) return res.status(404).json({ error: 'rule not found' }); - res.json(rows[0]); - } catch (err) { - console.error('Error updating rule', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * DELETE /rules/:type/:id — Elimina una rule (cascade items) - */ -router.delete('/:type/:id', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rowCount } = await db.query( - `DELETE FROM ${tables.rules} WHERE id = $1`, [id], 'rules' - ); - if (rowCount === 0) return res.status(404).json({ error: 'rule not found' }); - res.json({ status: 'ok' }); - } catch (err) { - console.error('Error deleting rule', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PATCH /rules/:type/:id/active — Toggle active - * Disattiva tutte le altre, poi attiva questa (o disattiva se già attiva) - */ -router.patch('/:type/:id/active', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - // Check current state - const { rows: current } = await db.query( - `SELECT active FROM ${tables.rules} WHERE id = $1`, [id], 'rules' - ); - if (current.length === 0) return res.status(404).json({ error: 'rule not found' }); - - const isActive = current[0].active; - - if (isActive) { - // Disattiva - await db.query(`UPDATE ${tables.rules} SET active = false, updated_at = NOW() WHERE id = $1`, [id], 'rules'); - } else { - // Disattiva tutte, poi attiva questa - await db.query(`UPDATE ${tables.rules} SET active = false, updated_at = NOW() WHERE active = true`, [], 'rules'); - await db.query(`UPDATE ${tables.rules} SET active = true, updated_at = NOW() WHERE id = $1`, [id], 'rules'); - } - - res.json({ status: 'ok', active: !isActive }); - } catch (err) { - console.error('Error toggling active', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PATCH /rules/:type/:id/archive — Toggle archived - */ -router.patch('/:type/:id/archive', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rows } = await db.query( - `UPDATE ${tables.rules} SET archived = NOT archived, updated_at = NOW() WHERE id = $1 RETURNING archived`, - [id], 'rules' - ); - if (rows.length === 0) return res.status(404).json({ error: 'rule not found' }); - res.json({ status: 'ok', archived: rows[0].archived }); - } catch (err) { - console.error('Error toggling archive', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * POST /rules/:type/:id/items — Aggiungi item - */ -router.post('/:type/:id/items', async (req, res) => { - const { type, id } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - const fields = ITEM_FIELDS[type].filter(f => f !== 'enabled'); - const values = fields.map(f => req.body[f]); - - // Validate required fields present - for (let i = 0; i < fields.length; i++) { - if (values[i] === undefined || values[i] === null || values[i] === '') { - return res.status(400).json({ error: `${fields[i]} is required` }); - } - } - - const allFields = ['rule_id', ...fields]; - const allValues = [id, ...values]; - const placeholders = allValues.map((_, i) => `$${i + 1}`).join(', '); - - try { - const sql = `INSERT INTO ${tables.items} (${allFields.join(', ')}) VALUES (${placeholders}) RETURNING *`; - const { rows } = await db.query(sql, allValues, 'rules'); - res.status(201).json(rows[0]); - } catch (err) { - console.error('Error creating item', err); - if (err.code === '23505') { - return res.status(409).json({ error: 'duplicate item (unique constraint)' }); - } - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PUT /rules/:type/:id/items/:itemId — Aggiorna item - */ -router.put('/:type/:id/items/:itemId', async (req, res) => { - const { type, id, itemId } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - const allowed = ITEM_FIELDS[type]; - const sets = []; - const params = []; - let idx = 1; - - for (const field of allowed) { - if (req.body[field] !== undefined) { - sets.push(`${field} = $${idx}`); - params.push(req.body[field]); - idx++; - } - } - - if (sets.length === 0) { - return res.status(400).json({ error: 'no valid fields to update' }); - } - - params.push(itemId, id); - - try { - const sql = `UPDATE ${tables.items} SET ${sets.join(', ')} WHERE id = $${idx} AND rule_id = $${idx + 1} RETURNING *`; - const { rows } = await db.query(sql, params, 'rules'); - if (rows.length === 0) return res.status(404).json({ error: 'item not found' }); - res.json(rows[0]); - } catch (err) { - console.error('Error updating item', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * DELETE /rules/:type/:id/items/:itemId — Elimina item - */ -router.delete('/:type/:id/items/:itemId', async (req, res) => { - const { type, id, itemId } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rowCount } = await db.query( - `DELETE FROM ${tables.items} WHERE id = $1 AND rule_id = $2`, [itemId, id], 'rules' - ); - if (rowCount === 0) return res.status(404).json({ error: 'item not found' }); - res.json({ status: 'ok' }); - } catch (err) { - console.error('Error deleting item', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * PATCH /rules/:type/:id/items/:itemId/toggle — Toggle enabled - */ -router.patch('/:type/:id/items/:itemId/toggle', async (req, res) => { - const { type, id, itemId } = req.params; - if (!VALID_TYPES.includes(type)) { - return res.status(400).json({ error: 'invalid rule type' }); - } - - const tables = TYPE_MAP[type]; - - try { - const { rows } = await db.query( - `UPDATE ${tables.items} SET enabled = NOT enabled WHERE id = $1 AND rule_id = $2 RETURNING enabled`, - [itemId, id], 'rules' - ); - if (rows.length === 0) return res.status(404).json({ error: 'item not found' }); - res.json({ status: 'ok', enabled: rows[0].enabled }); - } catch (err) { - console.error('Error toggling item', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -/** - * POST /rules/force-update — Forza l'invio delle rules attive a tutti i sensori connessi. - * Legge le 3 rules attive dal DB, poi chiama il realtime server che le pushta via WebSocket. - */ -router.post('/force-update', async (req, res) => { - try { - const payload = {}; - - for (const [type, tables] of Object.entries(TYPE_MAP)) { - const { rows: ruleRows } = await db.query( - `SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`, - [], 'rules' - ); - - if (ruleRows.length === 0) continue; - - const rule = ruleRows[0]; - const { rows: items } = await db.query( - `SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`, - [rule.id], 'rules' - ); - - payload[type] = { - id: rule.id, - version: rule.version, - description: rule.description, - tags: rule.tags, - items - }; - } - - if (Object.keys(payload).length === 0) { - return res.status(404).json({ error: 'no active rules found' }); - } - - // Invia al realtime server per il push ai sensori connessi - const REALTIME_URL = process.env.REALTIME_INTERNAL_URL || 'http://realtime:3000'; - const API_KEY = process.env.INTERNAL_API_KEY; - - const rtRes = await fetch(`${REALTIME_URL}/push-rules`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': API_KEY - }, - body: JSON.stringify(payload) - }); - - if (!rtRes.ok) { - const err = await rtRes.json().catch(() => ({})); - console.error('[RULES] Force-update: realtime error', err); - return res.status(502).json({ error: 'realtime server error', detail: err.error }); - } - - const result = await rtRes.json(); - res.json({ status: 'ok', pushed: Object.keys(payload), sensors: result.sensors || 0 }); - } catch (err) { - console.error('Error force-updating rules', err); - res.status(500).json({ error: 'internal server error' }); - } -}); - -module.exports = router; diff --git a/console/src/pages/dashboard.html b/console/src/pages/dashboard.html index eb04d05..4b88301 100644 --- a/console/src/pages/dashboard.html +++ b/console/src/pages/dashboard.html @@ -31,12 +31,12 @@ - +
diff --git a/console/src/pages/rulesets.html b/console/src/pages/rulesets.html index 150df62..6abe57a 100644 --- a/console/src/pages/rulesets.html +++ b/console/src/pages/rulesets.html @@ -40,7 +40,6 @@
-
@@ -591,30 +590,6 @@ function flash(text, elId = 'savingIndicator') { setTimeout(() => el.classList.remove('visible'), 1500); } -// ========== Force Update ========== - -document.getElementById('forceUpdateBtn').onclick = async () => { - const btn = document.getElementById('forceUpdateBtn'); - btn.disabled = true; - btn.textContent = 'Invio in corso...'; - try { - const res = await api('POST', '/rules/force-update'); - btn.textContent = `Inviato a ${res.sensors || 0} sensori`; - flash('Update inviato'); - setTimeout(() => { - btn.textContent = 'Forza Update Sensori'; - btn.disabled = false; - }, 3000); - } catch (err) { - console.error('Error force-updating:', err); - btn.textContent = 'Errore!'; - setTimeout(() => { - btn.textContent = 'Forza Update Sensori'; - btn.disabled = false; - }, 3000); - } -}; - // ========== Init ========== document.addEventListener('DOMContentLoaded', () => loadRules()); diff --git a/console/src/static/styles/rulesets.css b/console/src/static/styles/rulesets.css index 8642521..0b3b55a 100644 --- a/console/src/static/styles/rulesets.css +++ b/console/src/static/styles/rulesets.css @@ -135,33 +135,6 @@ background-position: right 10px center; } -/* Force Update button */ -.rs-force-btn { - padding: 8px 20px; - border: 1px solid #f59e0b; - border-radius: 10px; - background: #fffbeb; - color: #b45309; - font-size: 0.85rem; - font-weight: 600; - cursor: pointer; - transition: all 0.2s ease; - font-family: inherit; -} - -.rs-force-btn:hover:not(:disabled) { - background: #fef3c7; - border-color: #d97706; - transform: translateY(-1px); - box-shadow: 0 4px 12px rgba(245, 158, 11, 0.2); -} - -.rs-force-btn:disabled { - opacity: 0.7; - cursor: not-allowed; - transform: none; -} - .rs-new-btn { padding: 8px 20px; border: none; diff --git a/realtime/client-example.js b/realtime/client-example.js deleted file mode 100644 index 455f884..0000000 --- a/realtime/client-example.js +++ /dev/null @@ -1,78 +0,0 @@ -const WebSocket = require('ws'); -const { encode } = require('@msgpack/msgpack'); - -const SERVER_URL = process.env.SERVER_URL || 'http://localhost:3000'; -const WS_URL = process.env.WS_URL || 'ws://localhost:3000'; -const SENSOR_NAME = process.env.SENSOR_NAME || 'sensor-01'; -const SENSOR_CODE = process.env.SENSOR_CODE || 'password123'; - -async function authenticate() { - const res = await fetch(`${SERVER_URL}/connect/`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ name: SENSOR_NAME, code: SENSOR_CODE }), - }); - - if (!res.ok) { - const err = await res.json(); - throw new Error(`Auth failed: ${err.error}`); - } - - const data = await res.json(); - console.log('Authenticated, token received'); - return data.t; -} - -function connectWebSocket(token) { - const ws = new WebSocket(`${WS_URL}?token=${token}`); - - ws.on('open', () => { - console.log('WebSocket connected'); - startSendingData(ws); - }); - - ws.on('pong', () => { - // Keepalive pong received - }); - - ws.on('close', (code, reason) => { - console.log(`WebSocket closed: ${code} ${reason}`); - process.exit(1); - }); - - ws.on('error', (err) => { - console.error('WebSocket error:', err.message); - }); -} - -function startSendingData(ws) { - setInterval(() => { - if (ws.readyState !== WebSocket.OPEN) return; - - const packet = { - ts: Date.now(), - t: 20 + Math.random() * 10, // temperature °C - h: 50 + Math.random() * 30, // humidity % - spd: Math.random() * 30, // speed - cog: Math.random() * 360, // course over ground - sog: 5 + Math.random() * 10, // speed over ground kn - hdg: Math.random() * 360, // heading true - lat: 43.7230 + Math.random() * 0.01, // latitude - lon: 10.3966 + Math.random() * 0.01, // longitude - }; - - ws.send(Buffer.from(encode(packet))); - }, 1000); -} - -async function main() { - try { - const token = await authenticate(); - connectWebSocket(token); - } catch (err) { - console.error(err.message); - process.exit(1); - } -} - -main(); diff --git a/realtime/src/index.js b/realtime/src/index.js index 0d38ee0..78bb823 100644 --- a/realtime/src/index.js +++ b/realtime/src/index.js @@ -44,29 +44,6 @@ app.use('/connect', require('./routes/connect')); app.use('/sensors', require('./routes/sensors')); app.use('/sessions', require('./routes/sessions')); -/** - * POST /push-rules — Riceve rules attive dall'API e le pusha a tutti i sensori connessi. - * Autenticato con x-api-key (service-to-service). - */ -app.post('/push-rules', (req, res) => { - const apiKey = req.headers['x-api-key']; - if (!apiKey || apiKey !== process.env.INTERNAL_API_KEY) { - return res.status(401).json({ error: 'unauthorized' }); - } - - const payload = req.body; - if (!payload || Object.keys(payload).length === 0) { - return res.status(400).json({ error: 'empty payload' }); - } - - // Wrappa con _t per identificare il tipo di messaggio nel plugin - const message = { _t: 'rules_update', ...payload }; - const sensors = wsHandler.pushToAllSensors(message); - - console.log(`[PUSH-RULES] Inviato a ${sensors} sensori:`, Object.keys(payload)); - res.json({ status: 'ok', sensors }); -}); - const server = app.listen(3000, '0.0.0.0', () => { console.log(`Realtime started`); }); diff --git a/realtime/src/store/influx.js b/realtime/src/store/influx.js index 98f26f4..8253b9d 100644 --- a/realtime/src/store/influx.js +++ b/realtime/src/store/influx.js @@ -13,42 +13,11 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', { batchSize: 50, }); -// Mapping legacy per sensor_data (logs telemetry) -const fieldMap = { - t: 'temperature', - h: 'humidity', - spd: 'speed', - cog: 'cog', - sog: 'sog', - hdg: 'headingTrue', - lat: 'latitude', - lon: 'longitude', -}; - /** - * Scrive dati telemetria sensore (logs) con mapping campi abbreviati. - * Measurement: sensor_data - */ -function writeSensorData(fields, sensor, session, timestamp) { - const point = new Point('sensor_data') - .tag('sensor', sensor) - .tag('session', session) - .timestamp(timestamp); - - for (const [short, long] of Object.entries(fieldMap)) { - if (fields[short] !== undefined) { - point.floatField(long, fields[short]); - } - } - - writeApi.writePoint(point); -} - -/** - * Scrive dati generici (weather, forecast, ecc.) senza mapping. - * I campi vengono scritti con il nome originale (ref da Open-Meteo). - * @param {string} measurement - nome della measurement InfluxDB (es. 'weather_current', 'weather_forecast') - * @param {Object} fields - campi { ref: value } + * Scrive dati generici su InfluxDB senza mapping. + * I campi vengono scritti con il nome originale. + * @param {string} measurement - nome della measurement (es. 'logs', 'weather') + * @param {Object} fields - campi { key: value } * @param {string} sensor - nome del sensore * @param {string} session - id sessione * @param {number} timestamp - timestamp unix ms @@ -73,8 +42,7 @@ function writeGenericData(measurement, fields, sensor, session, timestamp) { /** * Scrive un batch di punti forecast (previsioni orarie). - * Ogni punto ha il proprio timestamp. - * @param {Array} points - array di [timestamp_ms, { ref: value, ... }] + * @param {Array} points - array di [timestamp_ms, { key: value, ... }] * @param {string} sensor - nome del sensore * @param {string} session - id sessione */ @@ -86,10 +54,10 @@ function writeForecastBatch(points, sensor, session) { async function queryHistory(sensor, session, since) { const queryApi = client.getQueryApi(org); - const query = ` + const fluxQuery = ` from(bucket: "${bucket}") |> range(start: ${since}) - |> filter(fn: (r) => r._measurement == "sensor_data") + |> filter(fn: (r) => r._measurement == "logs") |> filter(fn: (r) => r.sensor == "${sensor}") |> filter(fn: (r) => r.session == "${session}") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") @@ -97,7 +65,7 @@ async function queryHistory(sensor, session, since) { const rows = []; return new Promise((resolve, reject) => { - queryApi.queryRows(query, { + queryApi.queryRows(fluxQuery, { next(row, tableMeta) { rows.push(tableMeta.toObject(row)); }, @@ -107,4 +75,4 @@ async function queryHistory(sensor, session, since) { }); } -module.exports = { writeSensorData, writeGenericData, writeForecastBatch, queryHistory }; +module.exports = { writeGenericData, writeForecastBatch, queryHistory }; diff --git a/realtime/src/ws/handler.js b/realtime/src/ws/handler.js index c2a3d0a..1a9d6e1 100644 --- a/realtime/src/ws/handler.js +++ b/realtime/src/ws/handler.js @@ -1,7 +1,7 @@ const { WebSocketServer } = require('ws'); const { decode } = require('@msgpack/msgpack'); const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis'); -const { writeSensorData, writeGenericData, writeForecastBatch, queryHistory } = require('../store/influx'); +const { writeGenericData, writeForecastBatch } = require('../store/influx'); // In-memory registries const sensorWatchers = new Map(); // sensorName → Set (watchers) @@ -12,41 +12,6 @@ function generateSessionId() { return `s${num}`; } -// Map sensor short keys → console field keys + measurement category -const fieldMapping = { - t: { key: 'temp', measurement: 'weather' }, - h: { key: 'hum', measurement: 'weather' }, - spd: { key: 'wSpd', measurement: 'weather' }, - cog: { key: 'cog', measurement: 'navigation' }, - sog: { key: 'sog', measurement: 'navigation' }, - hdg: { key: 'hdg', measurement: 'navigation' }, - lat: { key: 'lat', measurement: 'navigation' }, - lon: { key: 'lon', measurement: 'navigation' }, -}; - -/** - * Transforms a sensor packet (short keys) into grouped messages - * for the console: { timestamp, measurement, fields } - */ -function transformPacket(packet) { - const { ts, ...rawFields } = packet; - const groups = {}; - - for (const [short, val] of Object.entries(rawFields)) { - const mapping = fieldMapping[short]; - if (!mapping) continue; - const { key, measurement } = mapping; - if (!groups[measurement]) groups[measurement] = {}; - groups[measurement][key] = val; - } - - const messages = []; - for (const [measurement, fields] of Object.entries(groups)) { - messages.push({ timestamp: ts, measurement, fields }); - } - return messages; -} - function setup(server) { const wss = new WebSocketServer({ noServer: true }); @@ -72,9 +37,8 @@ function setup(server) { wss.handleUpgrade(req, socket, head, (ws) => { ws.sensorName = sensor; ws.sessionId = generateSessionId(); - ws.sessionLabel = ws.sessionId; // default label = sessionId + ws.sessionLabel = ws.sessionId; ws.connectedAt = new Date().toISOString(); - ws.rulesVersions = null; // populated by _t:init message handleSensorConnection(ws); }); @@ -108,51 +72,41 @@ function handleSensorConnection(ws) { try { const packet = decode(data); - // Messaggio di inizializzazione con versioni rulesets e uptime + // Messaggio di inizializzazione if (packet._t === 'init') { - ws.rulesVersions = packet.rules || {}; ws.sensorUptime = packet.uptime || null; - console.log(`[${sensorName}] Init — rules:`, ws.rulesVersions, '| uptime:', ws.sensorUptime); - // Salva in Redis - const metaFields = []; - for (const [type, ver] of Object.entries(ws.rulesVersions)) { - metaFields.push(`rules_${type}`, ver); - } + console.log(`[${sensorName}] Init — uptime:`, ws.sensorUptime); if (ws.sensorUptime != null) { - metaFields.push('uptime', String(ws.sensorUptime)); + hset(`sensors:${sensorName}`, 'uptime', String(ws.sensorUptime)); } - if (metaFields.length > 0) { - hset(`sensors:${sensorName}`, ...metaFields); - } - return; // non scrivere su InfluxDB + return; } const { ts, _m, ...fields } = packet; // Route per tipo di measurement - if (_m === 'weather') { - // Dati meteo current — salva con measurement generico - writeGenericData('weather_current', fields, sensorName, ws.sessionLabel, ts); - } else if (_m === 'forecast_batch') { - // Batch previsioni orarie — fields è un array [[ts, {fields}], ...] + if (_m === 'forecast_batch') { + // Batch previsioni orarie if (Array.isArray(fields.points)) { writeForecastBatch(fields.points, sensorName, ws.sessionLabel); } } else { - // Dati telemetria sensore (logs) — mapping abbreviato - writeSensorData(fields, sensorName, ws.sessionLabel, ts); + // weather, logs, o altro — scrivi tutti i campi + const measurement = _m || 'sensor_data'; + writeGenericData(measurement, fields, sensorName, ws.sessionLabel, ts); } - // Broadcast to watchers + // Broadcast ai watchers: invia dati grezzi con measurement e fields const watchers = sensorWatchers.get(sensorName); if (watchers && watchers.size > 0) { - const messages = transformPacket(packet); - for (const msg of messages) { - const json = JSON.stringify(msg); - for (const watcher of watchers) { - if (watcher.readyState === watcher.OPEN) { - watcher.send(json); - } + const msg = JSON.stringify({ + timestamp: ts, + measurement: _m || 'sensor_data', + fields: fields + }); + for (const watcher of watchers) { + if (watcher.readyState === watcher.OPEN) { + watcher.send(msg); } } } @@ -198,29 +152,6 @@ function handleWatcherConnection(ws) { console.log(`Watcher now watching sensor: ${msg.sensorId}`); - try { - const sensorInfo = await query(msg.sensorId, 'sensors'); - if (sensorInfo && sensorInfo.timestamp && sensorInfo.session) { - const history = await queryHistory(msg.sensorId, sensorInfo.session, sensorInfo.timestamp); - for (const row of history) { - const ts = new Date(row._time).getTime(); - const rebuilt = { ts }; - for (const [short, { key }] of Object.entries(fieldMapping)) { - const influxField = { t: 'temperature', h: 'humidity', spd: 'speed', cog: 'cog', sog: 'sog', hdg: 'headingTrue', lat: 'latitude', lon: 'longitude' }[short]; - if (row[influxField] !== undefined) { - rebuilt[short] = row[influxField]; - } - } - const messages = transformPacket(rebuilt); - for (const m of messages) { - ws.send(JSON.stringify(m)); - } - } - } - } catch (err) { - console.error(`Error fetching history for watcher:`, err.message); - } - } else if (msg.action === 'unwatch') { if (ws.sensorName) { sensorWatchers.get(ws.sensorName)?.delete(ws); @@ -250,27 +181,4 @@ function handleWatcherConnection(ws) { }); } -/** - * Invia un messaggio a tutti i sensori connessi. - * Usato dal push-rules endpoint per forzare l'aggiornamento delle rules. - * @param {Object} payload - Il payload da inviare (verrà wrappato con _t) - * @returns {number} Numero di sensori a cui il messaggio è stato inviato - */ -function pushToAllSensors(payload) { - const { encode } = require('@msgpack/msgpack'); - let count = 0; - for (const [sensorName, ws] of connectedSensors.entries()) { - if (ws.readyState === ws.OPEN) { - try { - ws.send(encode(payload)); - console.log(`[PUSH] Rules update inviato a ${sensorName}`); - count++; - } catch (err) { - console.error(`[PUSH] Errore invio a ${sensorName}:`, err.message); - } - } - } - return count; -} - -module.exports = { setup, connectedSensors, pushToAllSensors }; +module.exports = { setup, connectedSensors };