diff --git a/console/src/pages/rulesets.html b/console/src/pages/rulesets.html
index 150df62..6abe57a 100644
--- a/console/src/pages/rulesets.html
+++ b/console/src/pages/rulesets.html
@@ -40,7 +40,6 @@
-
@@ -591,30 +590,6 @@ function flash(text, elId = 'savingIndicator') {
setTimeout(() => el.classList.remove('visible'), 1500);
}
-// ========== Force Update ==========
-
-document.getElementById('forceUpdateBtn').onclick = async () => {
- const btn = document.getElementById('forceUpdateBtn');
- btn.disabled = true;
- btn.textContent = 'Invio in corso...';
- try {
- const res = await api('POST', '/rules/force-update');
- btn.textContent = `Inviato a ${res.sensors || 0} sensori`;
- flash('Update inviato');
- setTimeout(() => {
- btn.textContent = 'Forza Update Sensori';
- btn.disabled = false;
- }, 3000);
- } catch (err) {
- console.error('Error force-updating:', err);
- btn.textContent = 'Errore!';
- setTimeout(() => {
- btn.textContent = 'Forza Update Sensori';
- btn.disabled = false;
- }, 3000);
- }
-};
-
// ========== Init ==========
document.addEventListener('DOMContentLoaded', () => loadRules());
diff --git a/console/src/static/styles/rulesets.css b/console/src/static/styles/rulesets.css
index 8642521..0b3b55a 100644
--- a/console/src/static/styles/rulesets.css
+++ b/console/src/static/styles/rulesets.css
@@ -135,33 +135,6 @@
background-position: right 10px center;
}
-/* Force Update button */
-.rs-force-btn {
- padding: 8px 20px;
- border: 1px solid #f59e0b;
- border-radius: 10px;
- background: #fffbeb;
- color: #b45309;
- font-size: 0.85rem;
- font-weight: 600;
- cursor: pointer;
- transition: all 0.2s ease;
- font-family: inherit;
-}
-
-.rs-force-btn:hover:not(:disabled) {
- background: #fef3c7;
- border-color: #d97706;
- transform: translateY(-1px);
- box-shadow: 0 4px 12px rgba(245, 158, 11, 0.2);
-}
-
-.rs-force-btn:disabled {
- opacity: 0.7;
- cursor: not-allowed;
- transform: none;
-}
-
.rs-new-btn {
padding: 8px 20px;
border: none;
diff --git a/realtime/client-example.js b/realtime/client-example.js
deleted file mode 100644
index 455f884..0000000
--- a/realtime/client-example.js
+++ /dev/null
@@ -1,78 +0,0 @@
-const WebSocket = require('ws');
-const { encode } = require('@msgpack/msgpack');
-
-const SERVER_URL = process.env.SERVER_URL || 'http://localhost:3000';
-const WS_URL = process.env.WS_URL || 'ws://localhost:3000';
-const SENSOR_NAME = process.env.SENSOR_NAME || 'sensor-01';
-const SENSOR_CODE = process.env.SENSOR_CODE || 'password123';
-
-async function authenticate() {
- const res = await fetch(`${SERVER_URL}/connect/`, {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ name: SENSOR_NAME, code: SENSOR_CODE }),
- });
-
- if (!res.ok) {
- const err = await res.json();
- throw new Error(`Auth failed: ${err.error}`);
- }
-
- const data = await res.json();
- console.log('Authenticated, token received');
- return data.t;
-}
-
-function connectWebSocket(token) {
- const ws = new WebSocket(`${WS_URL}?token=${token}`);
-
- ws.on('open', () => {
- console.log('WebSocket connected');
- startSendingData(ws);
- });
-
- ws.on('pong', () => {
- // Keepalive pong received
- });
-
- ws.on('close', (code, reason) => {
- console.log(`WebSocket closed: ${code} ${reason}`);
- process.exit(1);
- });
-
- ws.on('error', (err) => {
- console.error('WebSocket error:', err.message);
- });
-}
-
-function startSendingData(ws) {
- setInterval(() => {
- if (ws.readyState !== WebSocket.OPEN) return;
-
- const packet = {
- ts: Date.now(),
- t: 20 + Math.random() * 10, // temperature °C
- h: 50 + Math.random() * 30, // humidity %
- spd: Math.random() * 30, // speed
- cog: Math.random() * 360, // course over ground
- sog: 5 + Math.random() * 10, // speed over ground kn
- hdg: Math.random() * 360, // heading true
- lat: 43.7230 + Math.random() * 0.01, // latitude
- lon: 10.3966 + Math.random() * 0.01, // longitude
- };
-
- ws.send(Buffer.from(encode(packet)));
- }, 1000);
-}
-
-async function main() {
- try {
- const token = await authenticate();
- connectWebSocket(token);
- } catch (err) {
- console.error(err.message);
- process.exit(1);
- }
-}
-
-main();
diff --git a/realtime/src/index.js b/realtime/src/index.js
index 0d38ee0..78bb823 100644
--- a/realtime/src/index.js
+++ b/realtime/src/index.js
@@ -44,29 +44,6 @@ app.use('/connect', require('./routes/connect'));
app.use('/sensors', require('./routes/sensors'));
app.use('/sessions', require('./routes/sessions'));
-/**
- * POST /push-rules — Riceve rules attive dall'API e le pusha a tutti i sensori connessi.
- * Autenticato con x-api-key (service-to-service).
- */
-app.post('/push-rules', (req, res) => {
- const apiKey = req.headers['x-api-key'];
- if (!apiKey || apiKey !== process.env.INTERNAL_API_KEY) {
- return res.status(401).json({ error: 'unauthorized' });
- }
-
- const payload = req.body;
- if (!payload || Object.keys(payload).length === 0) {
- return res.status(400).json({ error: 'empty payload' });
- }
-
- // Wrappa con _t per identificare il tipo di messaggio nel plugin
- const message = { _t: 'rules_update', ...payload };
- const sensors = wsHandler.pushToAllSensors(message);
-
- console.log(`[PUSH-RULES] Inviato a ${sensors} sensori:`, Object.keys(payload));
- res.json({ status: 'ok', sensors });
-});
-
const server = app.listen(3000, '0.0.0.0', () => {
console.log(`Realtime started`);
});
diff --git a/realtime/src/store/influx.js b/realtime/src/store/influx.js
index 98f26f4..8253b9d 100644
--- a/realtime/src/store/influx.js
+++ b/realtime/src/store/influx.js
@@ -13,42 +13,11 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', {
batchSize: 50,
});
-// Mapping legacy per sensor_data (logs telemetry)
-const fieldMap = {
- t: 'temperature',
- h: 'humidity',
- spd: 'speed',
- cog: 'cog',
- sog: 'sog',
- hdg: 'headingTrue',
- lat: 'latitude',
- lon: 'longitude',
-};
-
/**
- * Scrive dati telemetria sensore (logs) con mapping campi abbreviati.
- * Measurement: sensor_data
- */
-function writeSensorData(fields, sensor, session, timestamp) {
- const point = new Point('sensor_data')
- .tag('sensor', sensor)
- .tag('session', session)
- .timestamp(timestamp);
-
- for (const [short, long] of Object.entries(fieldMap)) {
- if (fields[short] !== undefined) {
- point.floatField(long, fields[short]);
- }
- }
-
- writeApi.writePoint(point);
-}
-
-/**
- * Scrive dati generici (weather, forecast, ecc.) senza mapping.
- * I campi vengono scritti con il nome originale (ref da Open-Meteo).
- * @param {string} measurement - nome della measurement InfluxDB (es. 'weather_current', 'weather_forecast')
- * @param {Object} fields - campi { ref: value }
+ * Scrive dati generici su InfluxDB senza mapping.
+ * I campi vengono scritti con il nome originale.
+ * @param {string} measurement - nome della measurement (es. 'logs', 'weather')
+ * @param {Object} fields - campi { key: value }
* @param {string} sensor - nome del sensore
* @param {string} session - id sessione
* @param {number} timestamp - timestamp unix ms
@@ -73,8 +42,7 @@ function writeGenericData(measurement, fields, sensor, session, timestamp) {
/**
* Scrive un batch di punti forecast (previsioni orarie).
- * Ogni punto ha il proprio timestamp.
- * @param {Array} points - array di [timestamp_ms, { ref: value, ... }]
+ * @param {Array} points - array di [timestamp_ms, { key: value, ... }]
* @param {string} sensor - nome del sensore
* @param {string} session - id sessione
*/
@@ -86,10 +54,10 @@ function writeForecastBatch(points, sensor, session) {
async function queryHistory(sensor, session, since) {
const queryApi = client.getQueryApi(org);
- const query = `
+ const fluxQuery = `
from(bucket: "${bucket}")
|> range(start: ${since})
- |> filter(fn: (r) => r._measurement == "sensor_data")
+ |> filter(fn: (r) => r._measurement == "logs")
|> filter(fn: (r) => r.sensor == "${sensor}")
|> filter(fn: (r) => r.session == "${session}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
@@ -97,7 +65,7 @@ async function queryHistory(sensor, session, since) {
const rows = [];
return new Promise((resolve, reject) => {
- queryApi.queryRows(query, {
+ queryApi.queryRows(fluxQuery, {
next(row, tableMeta) {
rows.push(tableMeta.toObject(row));
},
@@ -107,4 +75,4 @@ async function queryHistory(sensor, session, since) {
});
}
-module.exports = { writeSensorData, writeGenericData, writeForecastBatch, queryHistory };
+module.exports = { writeGenericData, writeForecastBatch, queryHistory };
diff --git a/realtime/src/ws/handler.js b/realtime/src/ws/handler.js
index c2a3d0a..1a9d6e1 100644
--- a/realtime/src/ws/handler.js
+++ b/realtime/src/ws/handler.js
@@ -1,7 +1,7 @@
const { WebSocketServer } = require('ws');
const { decode } = require('@msgpack/msgpack');
const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis');
-const { writeSensorData, writeGenericData, writeForecastBatch, queryHistory } = require('../store/influx');
+const { writeGenericData, writeForecastBatch } = require('../store/influx');
// In-memory registries
const sensorWatchers = new Map(); // sensorName → Set (watchers)
@@ -12,41 +12,6 @@ function generateSessionId() {
return `s${num}`;
}
-// Map sensor short keys → console field keys + measurement category
-const fieldMapping = {
- t: { key: 'temp', measurement: 'weather' },
- h: { key: 'hum', measurement: 'weather' },
- spd: { key: 'wSpd', measurement: 'weather' },
- cog: { key: 'cog', measurement: 'navigation' },
- sog: { key: 'sog', measurement: 'navigation' },
- hdg: { key: 'hdg', measurement: 'navigation' },
- lat: { key: 'lat', measurement: 'navigation' },
- lon: { key: 'lon', measurement: 'navigation' },
-};
-
-/**
- * Transforms a sensor packet (short keys) into grouped messages
- * for the console: { timestamp, measurement, fields }
- */
-function transformPacket(packet) {
- const { ts, ...rawFields } = packet;
- const groups = {};
-
- for (const [short, val] of Object.entries(rawFields)) {
- const mapping = fieldMapping[short];
- if (!mapping) continue;
- const { key, measurement } = mapping;
- if (!groups[measurement]) groups[measurement] = {};
- groups[measurement][key] = val;
- }
-
- const messages = [];
- for (const [measurement, fields] of Object.entries(groups)) {
- messages.push({ timestamp: ts, measurement, fields });
- }
- return messages;
-}
-
function setup(server) {
const wss = new WebSocketServer({ noServer: true });
@@ -72,9 +37,8 @@ function setup(server) {
wss.handleUpgrade(req, socket, head, (ws) => {
ws.sensorName = sensor;
ws.sessionId = generateSessionId();
- ws.sessionLabel = ws.sessionId; // default label = sessionId
+ ws.sessionLabel = ws.sessionId;
ws.connectedAt = new Date().toISOString();
- ws.rulesVersions = null; // populated by _t:init message
handleSensorConnection(ws);
});
@@ -108,51 +72,41 @@ function handleSensorConnection(ws) {
try {
const packet = decode(data);
- // Messaggio di inizializzazione con versioni rulesets e uptime
+ // Messaggio di inizializzazione
if (packet._t === 'init') {
- ws.rulesVersions = packet.rules || {};
ws.sensorUptime = packet.uptime || null;
- console.log(`[${sensorName}] Init — rules:`, ws.rulesVersions, '| uptime:', ws.sensorUptime);
- // Salva in Redis
- const metaFields = [];
- for (const [type, ver] of Object.entries(ws.rulesVersions)) {
- metaFields.push(`rules_${type}`, ver);
- }
+ console.log(`[${sensorName}] Init — uptime:`, ws.sensorUptime);
if (ws.sensorUptime != null) {
- metaFields.push('uptime', String(ws.sensorUptime));
+ hset(`sensors:${sensorName}`, 'uptime', String(ws.sensorUptime));
}
- if (metaFields.length > 0) {
- hset(`sensors:${sensorName}`, ...metaFields);
- }
- return; // non scrivere su InfluxDB
+ return;
}
const { ts, _m, ...fields } = packet;
// Route per tipo di measurement
- if (_m === 'weather') {
- // Dati meteo current — salva con measurement generico
- writeGenericData('weather_current', fields, sensorName, ws.sessionLabel, ts);
- } else if (_m === 'forecast_batch') {
- // Batch previsioni orarie — fields è un array [[ts, {fields}], ...]
+ if (_m === 'forecast_batch') {
+ // Batch previsioni orarie
if (Array.isArray(fields.points)) {
writeForecastBatch(fields.points, sensorName, ws.sessionLabel);
}
} else {
- // Dati telemetria sensore (logs) — mapping abbreviato
- writeSensorData(fields, sensorName, ws.sessionLabel, ts);
+ // weather, logs, o altro — scrivi tutti i campi
+ const measurement = _m || 'sensor_data';
+ writeGenericData(measurement, fields, sensorName, ws.sessionLabel, ts);
}
- // Broadcast to watchers
+ // Broadcast ai watchers: invia dati grezzi con measurement e fields
const watchers = sensorWatchers.get(sensorName);
if (watchers && watchers.size > 0) {
- const messages = transformPacket(packet);
- for (const msg of messages) {
- const json = JSON.stringify(msg);
- for (const watcher of watchers) {
- if (watcher.readyState === watcher.OPEN) {
- watcher.send(json);
- }
+ const msg = JSON.stringify({
+ timestamp: ts,
+ measurement: _m || 'sensor_data',
+ fields: fields
+ });
+ for (const watcher of watchers) {
+ if (watcher.readyState === watcher.OPEN) {
+ watcher.send(msg);
}
}
}
@@ -198,29 +152,6 @@ function handleWatcherConnection(ws) {
console.log(`Watcher now watching sensor: ${msg.sensorId}`);
- try {
- const sensorInfo = await query(msg.sensorId, 'sensors');
- if (sensorInfo && sensorInfo.timestamp && sensorInfo.session) {
- const history = await queryHistory(msg.sensorId, sensorInfo.session, sensorInfo.timestamp);
- for (const row of history) {
- const ts = new Date(row._time).getTime();
- const rebuilt = { ts };
- for (const [short, { key }] of Object.entries(fieldMapping)) {
- const influxField = { t: 'temperature', h: 'humidity', spd: 'speed', cog: 'cog', sog: 'sog', hdg: 'headingTrue', lat: 'latitude', lon: 'longitude' }[short];
- if (row[influxField] !== undefined) {
- rebuilt[short] = row[influxField];
- }
- }
- const messages = transformPacket(rebuilt);
- for (const m of messages) {
- ws.send(JSON.stringify(m));
- }
- }
- }
- } catch (err) {
- console.error(`Error fetching history for watcher:`, err.message);
- }
-
} else if (msg.action === 'unwatch') {
if (ws.sensorName) {
sensorWatchers.get(ws.sensorName)?.delete(ws);
@@ -250,27 +181,4 @@ function handleWatcherConnection(ws) {
});
}
-/**
- * Invia un messaggio a tutti i sensori connessi.
- * Usato dal push-rules endpoint per forzare l'aggiornamento delle rules.
- * @param {Object} payload - Il payload da inviare (verrà wrappato con _t)
- * @returns {number} Numero di sensori a cui il messaggio è stato inviato
- */
-function pushToAllSensors(payload) {
- const { encode } = require('@msgpack/msgpack');
- let count = 0;
- for (const [sensorName, ws] of connectedSensors.entries()) {
- if (ws.readyState === ws.OPEN) {
- try {
- ws.send(encode(payload));
- console.log(`[PUSH] Rules update inviato a ${sensorName}`);
- count++;
- } catch (err) {
- console.error(`[PUSH] Errore invio a ${sensorName}:`, err.message);
- }
- }
- }
- return count;
-}
-
-module.exports = { setup, connectedSensors, pushToAllSensors };
+module.exports = { setup, connectedSensors };