feat: add support for later forecasts and implement force update functionality for rules

This commit is contained in:
Giuseppe Raffa
2026-04-16 08:14:10 +02:00
parent c0be21a718
commit edd7226966
8 changed files with 481 additions and 20 deletions

171
api/sql/rules_schema.sql Normal file
View File

@@ -0,0 +1,171 @@
-- ============================================================
-- Schema completo per il database "rules"
-- ============================================================
-- ============ DATA / BROWSER ============
CREATE TABLE IF NOT EXISTS dataread (
id CHAR(8) PRIMARY KEY,
version VARCHAR(20) NOT NULL,
tags TEXT[] DEFAULT '{}',
active BOOLEAN NOT NULL DEFAULT false,
archived BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS datareaditems (
id BIGSERIAL PRIMARY KEY,
rule_id CHAR(8) NOT NULL REFERENCES dataread(id) ON DELETE CASCADE,
category VARCHAR(50) NOT NULL,
path VARCHAR(200) NOT NULL,
unit VARCHAR(20),
enabled BOOLEAN NOT NULL DEFAULT true,
UNIQUE(rule_id, path)
);
CREATE INDEX IF NOT EXISTS idx_datareaditems_rule ON datareaditems(rule_id);
-- ============ WEATHER (current, ogni 5 min) ============
CREATE TABLE IF NOT EXISTS weather (
id CHAR(8) PRIMARY KEY,
version VARCHAR(20) NOT NULL,
description TEXT,
tags TEXT[] DEFAULT '{}',
active BOOLEAN NOT NULL DEFAULT false,
archived BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS weatheritems (
id BIGSERIAL PRIMARY KEY,
rule_id CHAR(8) NOT NULL REFERENCES weather(id) ON DELETE CASCADE,
group_name VARCHAR(50) NOT NULL,
ref VARCHAR(50) NOT NULL,
name VARCHAR(50) NOT NULL,
unit VARCHAR(20) NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT true,
UNIQUE(rule_id, ref)
);
CREATE INDEX IF NOT EXISTS idx_weatheritems_rule ON weatheritems(rule_id);
-- ============ LATERFORECASTS (hourly 7gg, ogni 1 ora) ============
CREATE TABLE IF NOT EXISTS laterforecasts (
id CHAR(8) PRIMARY KEY,
version VARCHAR(20) NOT NULL,
description TEXT,
tags TEXT[] DEFAULT '{}',
active BOOLEAN NOT NULL DEFAULT false,
archived BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS laterforecastitems (
id BIGSERIAL PRIMARY KEY,
rule_id CHAR(8) NOT NULL REFERENCES laterforecasts(id) ON DELETE CASCADE,
group_name VARCHAR(50) NOT NULL,
ref VARCHAR(50) NOT NULL,
name VARCHAR(50) NOT NULL,
unit VARCHAR(20) NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT true,
UNIQUE(rule_id, ref)
);
CREATE INDEX IF NOT EXISTS idx_laterforecastitems_rule ON laterforecastitems(rule_id);
-- ============ LOGS ============
CREATE TABLE IF NOT EXISTS logs (
id CHAR(8) PRIMARY KEY,
version VARCHAR(20) NOT NULL,
description TEXT,
tags TEXT[] DEFAULT '{}',
active BOOLEAN NOT NULL DEFAULT false,
archived BOOLEAN NOT NULL DEFAULT false,
browser_rule_id CHAR(8) REFERENCES dataread(id),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS logitems (
id BIGSERIAL PRIMARY KEY,
rule_id CHAR(8) NOT NULL REFERENCES logs(id) ON DELETE CASCADE,
path VARCHAR(200) NOT NULL,
ref VARCHAR(4) NOT NULL,
unit VARCHAR(20) NOT NULL,
measurement VARCHAR(50) NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT true,
UNIQUE(rule_id, ref),
UNIQUE(rule_id, path),
CHECK (LENGTH(ref) <= 4)
);
CREATE INDEX IF NOT EXISTS idx_logitems_rule ON logitems(rule_id);
-- ============ KIOSK ============
CREATE TABLE IF NOT EXISTS kiosktemplates (
id CHAR(8) PRIMARY KEY NOT NULL,
name VARCHAR(50) NOT NULL,
tags TEXT[] DEFAULT '{}',
active BOOLEAN NOT NULL DEFAULT false,
archived BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS kioskelements (
id BIGSERIAL PRIMARY KEY,
template_id CHAR(8) NOT NULL REFERENCES kiosktemplates(id) ON DELETE CASCADE,
font INT NOT NULL,
label VARCHAR(100) NOT NULL,
x INT NOT NULL,
y INT NOT NULL,
width INT NOT NULL,
height INT NOT NULL,
color VARCHAR(20) NOT NULL,
UNIQUE(template_id)
);
CREATE INDEX IF NOT EXISTS idx_kioskelements_template ON kioskelements(template_id);
-- ============ VINCOLI: una sola rule attiva per tipo ============
CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_dataread
ON dataread(active) WHERE active = true AND archived = false;
CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_weather
ON weather(active) WHERE active = true AND archived = false;
CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_laterforecasts
ON laterforecasts(active) WHERE active = true AND archived = false;
CREATE UNIQUE INDEX IF NOT EXISTS idx_one_active_logs
ON logs(active) WHERE active = true AND archived = false;
-- ============ FIX per schema esistente ============
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'datareaditems' AND column_name = 'enables'
) THEN
ALTER TABLE datareaditems RENAME COLUMN enables TO enabled;
END IF;
END $$;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'weather' AND column_name = 'description'
) THEN
ALTER TABLE weather ADD COLUMN description TEXT;
END IF;
END $$;

View File

@@ -2,23 +2,18 @@ const router = require('express').Router();
const crypto = require('crypto');
const { query } = require('../storage/postgres');
const sets = ['forecasts', 'sensors'];
const sets = ['forecasts', 'sensors', 'marine'];
function hashSensorCode(code) {
return crypto.createHash('sha256').update(code).digest('hex');
}
/**
* GET /params/sensor/:sensorCode/active?set=sensors
* Autenticazione tramite SENSOR_CODE (stesso meccanismo di realtime)
* Middleware: valida sensor code e verifica che il sensore sia attivo.
* Salva sensor.id in req.sensorId.
*/
router.get('/:sensorCode/active', async (req, res) => {
async function authenticateSensor(req, res, next) {
const { sensorCode } = req.params;
const { set } = req.query;
if (!set || !sets.includes(set))
return res.status(400).json({ error: 'SET parameter invalid' });
try {
const hashed = hashSensorCode(sensorCode);
const sensor = await query(
@@ -30,11 +25,29 @@ router.get('/:sensorCode/active', async (req, res) => {
if (!sensor.rows[0]) {
return res.status(401).json({ error: 'Sensor code not valid' });
}
if (!sensor.rows[0].active) {
return res.status(403).json({ error: 'Sensor is not active' });
}
req.sensorId = sensor.rows[0].id;
next();
} catch (err) {
console.error('[PARAMS/SENSOR] Auth error:', err.message);
res.status(500).json({ error: 'Internal server error' });
}
}
/**
* GET /params/sensor/:sensorCode/active?set=sensors
* Ritorna il set di parametri attivo (forecasts, sensors, marine)
*/
router.get('/:sensorCode/active', authenticateSensor, async (req, res) => {
const { set } = req.query;
if (!set || !sets.includes(set))
return res.status(400).json({ error: 'SET parameter invalid' });
try {
const result = await query(
`SELECT * FROM ${set} WHERE active = true LIMIT 1`,
[],
@@ -48,4 +61,55 @@ router.get('/:sensorCode/active', async (req, res) => {
}
});
// --- Mapping tipo rules → tabelle ---
const RULES_TYPE_MAP = {
weather: { rules: 'weather', items: 'weatheritems' },
laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' },
data: { rules: 'dataread', items: 'datareaditems' },
logs: { rules: 'logs', items: 'logitems' }
};
/**
* GET /params/sensor/:sensorCode/rules?type=weather
* Ritorna la rule attiva con items per il tipo specificato.
* Autenticazione tramite sensor code (non richiede JWT/API key).
*/
router.get('/:sensorCode/rules', authenticateSensor, async (req, res) => {
const { type } = req.query;
if (!type || !RULES_TYPE_MAP[type]) {
return res.status(400).json({ error: `invalid type, must be one of: ${Object.keys(RULES_TYPE_MAP).join(', ')}` });
}
const tables = RULES_TYPE_MAP[type];
try {
const { rows: ruleRows } = await query(
`SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`,
[], 'rules'
);
if (ruleRows.length === 0) {
return res.status(404).json({ error: `no active ${type} rule found` });
}
const rule = ruleRows[0];
const { rows: items } = await query(
`SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`,
[rule.id], 'rules'
);
res.json({
id: rule.id,
version: rule.version,
description: rule.description,
tags: rule.tags,
items
});
} catch (err) {
console.error('[PARAMS/SENSOR] Rules error:', err.message);
res.status(500).json({ error: 'Internal server error' });
}
});
module.exports = router;

View File

@@ -4,9 +4,10 @@ const { encode } = require('@msgpack/msgpack');
// Mapping tipo → tabelle
const TYPE_MAP = {
weather: { rules: 'weather', items: 'weatheritems' },
data: { rules: 'dataread', items: 'datareaditems' },
logs: { rules: 'logs', items: 'logitems' }
weather: { rules: 'weather', items: 'weatheritems' },
laterforecasts: { rules: 'laterforecasts', items: 'laterforecastitems' },
data: { rules: 'dataread', items: 'datareaditems' },
logs: { rules: 'logs', items: 'logitems' }
};
const VALID_TYPES = Object.keys(TYPE_MAP);
@@ -184,7 +185,7 @@ router.patch('/update', async (req, res) => {
});
// --- ID Generation ---
const TYPE_PREFIX = { weather: 'w', data: 'd', logs: 'l' };
const TYPE_PREFIX = { weather: 'w', laterforecasts: 'f', data: 'd', logs: 'l' };
function generateId(type) {
const prefix = TYPE_PREFIX[type] || 'x';
@@ -196,6 +197,7 @@ function generateId(type) {
// --- ITEM FIELD DEFINITIONS per tipo ---
const ITEM_FIELDS = {
weather: ['group_name', 'ref', 'name', 'unit', 'enabled'],
laterforecasts: ['group_name', 'ref', 'name', 'unit', 'enabled'],
data: ['category', 'path', 'unit', 'enabled'],
logs: ['path', 'ref', 'unit', 'measurement', 'enabled']
};
@@ -203,6 +205,7 @@ const ITEM_FIELDS = {
// Campi rule aggiornabili
const RULE_UPDATE_FIELDS = {
weather: ['version', 'tags', 'description'],
laterforecasts: ['version', 'tags', 'description'],
data: ['version', 'tags'],
logs: ['version', 'tags', 'description', 'browser_rule_id']
};
@@ -494,4 +497,66 @@ router.patch('/:type/:id/items/:itemId/toggle', async (req, res) => {
}
});
/**
* POST /rules/force-update — Forza l'invio delle rules attive a tutti i sensori connessi.
* Legge le 3 rules attive dal DB, poi chiama il realtime server che le pushta via WebSocket.
*/
router.post('/force-update', async (req, res) => {
try {
const payload = {};
for (const [type, tables] of Object.entries(TYPE_MAP)) {
const { rows: ruleRows } = await db.query(
`SELECT * FROM ${tables.rules} WHERE active = true AND archived = false LIMIT 1`,
[], 'rules'
);
if (ruleRows.length === 0) continue;
const rule = ruleRows[0];
const { rows: items } = await db.query(
`SELECT * FROM ${tables.items} WHERE rule_id = $1 AND enabled = true`,
[rule.id], 'rules'
);
payload[type] = {
id: rule.id,
version: rule.version,
description: rule.description,
tags: rule.tags,
items
};
}
if (Object.keys(payload).length === 0) {
return res.status(404).json({ error: 'no active rules found' });
}
// Invia al realtime server per il push ai sensori connessi
const REALTIME_URL = process.env.REALTIME_INTERNAL_URL || 'http://realtime:3000';
const API_KEY = process.env.INTERNAL_API_KEY;
const rtRes = await fetch(`${REALTIME_URL}/push-rules`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': API_KEY
},
body: JSON.stringify(payload)
});
if (!rtRes.ok) {
const err = await rtRes.json().catch(() => ({}));
console.error('[RULES] Force-update: realtime error', err);
return res.status(502).json({ error: 'realtime server error', detail: err.error });
}
const result = await rtRes.json();
res.json({ status: 'ok', pushed: Object.keys(payload), sensors: result.sensors || 0 });
} catch (err) {
console.error('Error force-updating rules', err);
res.status(500).json({ error: 'internal server error' });
}
});
module.exports = router;

View File

@@ -18,6 +18,7 @@
<h1>Rulesets</h1>
<div class="rs-type-picker" id="typePicker">
<button class="active" data-type="weather">Weather</button>
<button data-type="laterforecasts">Forecasts</button>
<button data-type="data">Data</button>
<button data-type="logs">Logs</button>
</div>
@@ -39,6 +40,7 @@
</select>
</div>
<div class="rs-toolbar-right">
<button class="rs-force-btn" id="forceUpdateBtn">Forza Update Sensori</button>
<button class="rs-new-btn" id="newRuleBtn">+ Nuova Rule</button>
</div>
</div>
@@ -136,6 +138,12 @@ const ITEM_SCHEMA = {
{ key: 'name', label: 'Nome', cls: 'wide' },
{ key: 'unit', label: 'Unita', cls: 'narrow' },
],
laterforecasts: [
{ key: 'group_name', label: 'Gruppo', cls: 'medium' },
{ key: 'ref', label: 'Ref', cls: 'medium' },
{ key: 'name', label: 'Nome', cls: 'wide' },
{ key: 'unit', label: 'Unita', cls: 'narrow' },
],
data: [
{ key: 'category', label: 'Categoria', cls: 'medium' },
{ key: 'path', label: 'Path', cls: 'wide' },
@@ -149,7 +157,7 @@ const ITEM_SCHEMA = {
],
};
const HAS_DESC = { weather: true, data: false, logs: true };
const HAS_DESC = { weather: true, laterforecasts: true, data: false, logs: true };
// ========== API helpers ==========
@@ -583,6 +591,30 @@ function flash(text, elId = 'savingIndicator') {
setTimeout(() => el.classList.remove('visible'), 1500);
}
// ========== Force Update ==========
document.getElementById('forceUpdateBtn').onclick = async () => {
const btn = document.getElementById('forceUpdateBtn');
btn.disabled = true;
btn.textContent = 'Invio in corso...';
try {
const res = await api('POST', '/rules/force-update');
btn.textContent = `Inviato a ${res.sensors || 0} sensori`;
flash('Update inviato');
setTimeout(() => {
btn.textContent = 'Forza Update Sensori';
btn.disabled = false;
}, 3000);
} catch (err) {
console.error('Error force-updating:', err);
btn.textContent = 'Errore!';
setTimeout(() => {
btn.textContent = 'Forza Update Sensori';
btn.disabled = false;
}, 3000);
}
};
// ========== Init ==========
document.addEventListener('DOMContentLoaded', () => loadRules());

View File

@@ -135,6 +135,33 @@
background-position: right 10px center;
}
/* Force Update button */
.rs-force-btn {
padding: 8px 20px;
border: 1px solid #f59e0b;
border-radius: 10px;
background: #fffbeb;
color: #b45309;
font-size: 0.85rem;
font-weight: 600;
cursor: pointer;
transition: all 0.2s ease;
font-family: inherit;
}
.rs-force-btn:hover:not(:disabled) {
background: #fef3c7;
border-color: #d97706;
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(245, 158, 11, 0.2);
}
.rs-force-btn:disabled {
opacity: 0.7;
cursor: not-allowed;
transform: none;
}
.rs-new-btn {
padding: 8px 20px;
border: none;

View File

@@ -44,6 +44,29 @@ app.use('/connect', require('./routes/connect'));
app.use('/sensors', require('./routes/sensors'));
app.use('/sessions', require('./routes/sessions'));
/**
* POST /push-rules — Riceve rules attive dall'API e le pusha a tutti i sensori connessi.
* Autenticato con x-api-key (service-to-service).
*/
app.post('/push-rules', (req, res) => {
const apiKey = req.headers['x-api-key'];
if (!apiKey || apiKey !== process.env.INTERNAL_API_KEY) {
return res.status(401).json({ error: 'unauthorized' });
}
const payload = req.body;
if (!payload || Object.keys(payload).length === 0) {
return res.status(400).json({ error: 'empty payload' });
}
// Wrappa con _t per identificare il tipo di messaggio nel plugin
const message = { _t: 'rules_update', ...payload };
const sensors = wsHandler.pushToAllSensors(message);
console.log(`[PUSH-RULES] Inviato a ${sensors} sensori:`, Object.keys(payload));
res.json({ status: 'ok', sensors });
});
const server = app.listen(3000, '0.0.0.0', () => {
console.log(`Realtime started`);
});

View File

@@ -13,6 +13,7 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', {
batchSize: 50,
});
// Mapping legacy per sensor_data (logs telemetry)
const fieldMap = {
t: 'temperature',
h: 'humidity',
@@ -24,6 +25,10 @@ const fieldMap = {
lon: 'longitude',
};
/**
* Scrive dati telemetria sensore (logs) con mapping campi abbreviati.
* Measurement: sensor_data
*/
function writeSensorData(fields, sensor, session, timestamp) {
const point = new Point('sensor_data')
.tag('sensor', sensor)
@@ -39,6 +44,46 @@ function writeSensorData(fields, sensor, session, timestamp) {
writeApi.writePoint(point);
}
/**
* Scrive dati generici (weather, forecast, ecc.) senza mapping.
* I campi vengono scritti con il nome originale (ref da Open-Meteo).
* @param {string} measurement - nome della measurement InfluxDB (es. 'weather_current', 'weather_forecast')
* @param {Object} fields - campi { ref: value }
* @param {string} sensor - nome del sensore
* @param {string} session - id sessione
* @param {number} timestamp - timestamp unix ms
*/
function writeGenericData(measurement, fields, sensor, session, timestamp) {
const point = new Point(measurement)
.tag('sensor', sensor)
.tag('session', session)
.timestamp(timestamp);
for (const [key, value] of Object.entries(fields)) {
if (value === null || value === undefined) continue;
if (typeof value === 'number') {
point.floatField(key, value);
} else if (typeof value === 'string') {
point.stringField(key, value);
}
}
writeApi.writePoint(point);
}
/**
* Scrive un batch di punti forecast (previsioni orarie).
* Ogni punto ha il proprio timestamp.
* @param {Array} points - array di [timestamp_ms, { ref: value, ... }]
* @param {string} sensor - nome del sensore
* @param {string} session - id sessione
*/
function writeForecastBatch(points, sensor, session) {
for (const [ts, fields] of points) {
writeGenericData('weather_forecast', fields, sensor, session, ts);
}
}
async function queryHistory(sensor, session, since) {
const queryApi = client.getQueryApi(org);
const query = `
@@ -62,4 +107,4 @@ async function queryHistory(sensor, session, since) {
});
}
module.exports = { writeSensorData, queryHistory };
module.exports = { writeSensorData, writeGenericData, writeForecastBatch, queryHistory };

View File

@@ -1,7 +1,7 @@
const { WebSocketServer } = require('ws');
const { decode } = require('@msgpack/msgpack');
const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis');
const { writeSensorData, queryHistory } = require('../store/influx');
const { writeSensorData, writeGenericData, writeForecastBatch, queryHistory } = require('../store/influx');
// In-memory registries
const sensorWatchers = new Map(); // sensorName → Set<WebSocket> (watchers)
@@ -129,8 +129,19 @@ function handleSensorConnection(ws) {
const { ts, _m, ...fields } = packet;
// Usa sessionLabel (puo' cambiare a runtime dalla console)
writeSensorData(fields, sensorName, ws.sessionLabel, ts);
// Route per tipo di measurement
if (_m === 'weather') {
// Dati meteo current — salva con measurement generico
writeGenericData('weather_current', fields, sensorName, ws.sessionLabel, ts);
} else if (_m === 'forecast_batch') {
// Batch previsioni orarie — fields è un array [[ts, {fields}], ...]
if (Array.isArray(fields.points)) {
writeForecastBatch(fields.points, sensorName, ws.sessionLabel);
}
} else {
// Dati telemetria sensore (logs) — mapping abbreviato
writeSensorData(fields, sensorName, ws.sessionLabel, ts);
}
// Broadcast to watchers
const watchers = sensorWatchers.get(sensorName);
@@ -239,4 +250,27 @@ function handleWatcherConnection(ws) {
});
}
module.exports = { setup, connectedSensors };
/**
* Invia un messaggio a tutti i sensori connessi.
* Usato dal push-rules endpoint per forzare l'aggiornamento delle rules.
* @param {Object} payload - Il payload da inviare (verrà wrappato con _t)
* @returns {number} Numero di sensori a cui il messaggio è stato inviato
*/
function pushToAllSensors(payload) {
const { encode } = require('@msgpack/msgpack');
let count = 0;
for (const [sensorName, ws] of connectedSensors.entries()) {
if (ws.readyState === ws.OPEN) {
try {
ws.send(encode(payload));
console.log(`[PUSH] Rules update inviato a ${sensorName}`);
count++;
} catch (err) {
console.error(`[PUSH] Errore invio a ${sensorName}:`, err.message);
}
}
}
return count;
}
module.exports = { setup, connectedSensors, pushToAllSensors };