refactor: remove rules endpoint and related logic
- Deleted the rules routes and associated logic from the API. - Removed rules-related functionality from params.sensor.js. - Updated dashboard and rulesets HTML to remove references to rulesets. - Removed force update button and related functionality from rulesets page. - Cleaned up styles related to the force update button. - Removed unused WebSocket client example. - Updated realtime server to eliminate rules pushing logic. - Refactored WebSocket handler to streamline data processing.
This commit is contained in:
@@ -1,78 +0,0 @@
|
||||
const WebSocket = require('ws');
|
||||
const { encode } = require('@msgpack/msgpack');
|
||||
|
||||
const SERVER_URL = process.env.SERVER_URL || 'http://localhost:3000';
|
||||
const WS_URL = process.env.WS_URL || 'ws://localhost:3000';
|
||||
const SENSOR_NAME = process.env.SENSOR_NAME || 'sensor-01';
|
||||
const SENSOR_CODE = process.env.SENSOR_CODE || 'password123';
|
||||
|
||||
async function authenticate() {
|
||||
const res = await fetch(`${SERVER_URL}/connect/`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ name: SENSOR_NAME, code: SENSOR_CODE }),
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
const err = await res.json();
|
||||
throw new Error(`Auth failed: ${err.error}`);
|
||||
}
|
||||
|
||||
const data = await res.json();
|
||||
console.log('Authenticated, token received');
|
||||
return data.t;
|
||||
}
|
||||
|
||||
function connectWebSocket(token) {
|
||||
const ws = new WebSocket(`${WS_URL}?token=${token}`);
|
||||
|
||||
ws.on('open', () => {
|
||||
console.log('WebSocket connected');
|
||||
startSendingData(ws);
|
||||
});
|
||||
|
||||
ws.on('pong', () => {
|
||||
// Keepalive pong received
|
||||
});
|
||||
|
||||
ws.on('close', (code, reason) => {
|
||||
console.log(`WebSocket closed: ${code} ${reason}`);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
console.error('WebSocket error:', err.message);
|
||||
});
|
||||
}
|
||||
|
||||
function startSendingData(ws) {
|
||||
setInterval(() => {
|
||||
if (ws.readyState !== WebSocket.OPEN) return;
|
||||
|
||||
const packet = {
|
||||
ts: Date.now(),
|
||||
t: 20 + Math.random() * 10, // temperature °C
|
||||
h: 50 + Math.random() * 30, // humidity %
|
||||
spd: Math.random() * 30, // speed
|
||||
cog: Math.random() * 360, // course over ground
|
||||
sog: 5 + Math.random() * 10, // speed over ground kn
|
||||
hdg: Math.random() * 360, // heading true
|
||||
lat: 43.7230 + Math.random() * 0.01, // latitude
|
||||
lon: 10.3966 + Math.random() * 0.01, // longitude
|
||||
};
|
||||
|
||||
ws.send(Buffer.from(encode(packet)));
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
try {
|
||||
const token = await authenticate();
|
||||
connectWebSocket(token);
|
||||
} catch (err) {
|
||||
console.error(err.message);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -44,29 +44,6 @@ app.use('/connect', require('./routes/connect'));
|
||||
app.use('/sensors', require('./routes/sensors'));
|
||||
app.use('/sessions', require('./routes/sessions'));
|
||||
|
||||
/**
|
||||
* POST /push-rules — Riceve rules attive dall'API e le pusha a tutti i sensori connessi.
|
||||
* Autenticato con x-api-key (service-to-service).
|
||||
*/
|
||||
app.post('/push-rules', (req, res) => {
|
||||
const apiKey = req.headers['x-api-key'];
|
||||
if (!apiKey || apiKey !== process.env.INTERNAL_API_KEY) {
|
||||
return res.status(401).json({ error: 'unauthorized' });
|
||||
}
|
||||
|
||||
const payload = req.body;
|
||||
if (!payload || Object.keys(payload).length === 0) {
|
||||
return res.status(400).json({ error: 'empty payload' });
|
||||
}
|
||||
|
||||
// Wrappa con _t per identificare il tipo di messaggio nel plugin
|
||||
const message = { _t: 'rules_update', ...payload };
|
||||
const sensors = wsHandler.pushToAllSensors(message);
|
||||
|
||||
console.log(`[PUSH-RULES] Inviato a ${sensors} sensori:`, Object.keys(payload));
|
||||
res.json({ status: 'ok', sensors });
|
||||
});
|
||||
|
||||
const server = app.listen(3000, '0.0.0.0', () => {
|
||||
console.log(`Realtime started`);
|
||||
});
|
||||
|
||||
@@ -13,42 +13,11 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', {
|
||||
batchSize: 50,
|
||||
});
|
||||
|
||||
// Mapping legacy per sensor_data (logs telemetry)
|
||||
const fieldMap = {
|
||||
t: 'temperature',
|
||||
h: 'humidity',
|
||||
spd: 'speed',
|
||||
cog: 'cog',
|
||||
sog: 'sog',
|
||||
hdg: 'headingTrue',
|
||||
lat: 'latitude',
|
||||
lon: 'longitude',
|
||||
};
|
||||
|
||||
/**
|
||||
* Scrive dati telemetria sensore (logs) con mapping campi abbreviati.
|
||||
* Measurement: sensor_data
|
||||
*/
|
||||
function writeSensorData(fields, sensor, session, timestamp) {
|
||||
const point = new Point('sensor_data')
|
||||
.tag('sensor', sensor)
|
||||
.tag('session', session)
|
||||
.timestamp(timestamp);
|
||||
|
||||
for (const [short, long] of Object.entries(fieldMap)) {
|
||||
if (fields[short] !== undefined) {
|
||||
point.floatField(long, fields[short]);
|
||||
}
|
||||
}
|
||||
|
||||
writeApi.writePoint(point);
|
||||
}
|
||||
|
||||
/**
|
||||
* Scrive dati generici (weather, forecast, ecc.) senza mapping.
|
||||
* I campi vengono scritti con il nome originale (ref da Open-Meteo).
|
||||
* @param {string} measurement - nome della measurement InfluxDB (es. 'weather_current', 'weather_forecast')
|
||||
* @param {Object} fields - campi { ref: value }
|
||||
* Scrive dati generici su InfluxDB senza mapping.
|
||||
* I campi vengono scritti con il nome originale.
|
||||
* @param {string} measurement - nome della measurement (es. 'logs', 'weather')
|
||||
* @param {Object} fields - campi { key: value }
|
||||
* @param {string} sensor - nome del sensore
|
||||
* @param {string} session - id sessione
|
||||
* @param {number} timestamp - timestamp unix ms
|
||||
@@ -73,8 +42,7 @@ function writeGenericData(measurement, fields, sensor, session, timestamp) {
|
||||
|
||||
/**
|
||||
* Scrive un batch di punti forecast (previsioni orarie).
|
||||
* Ogni punto ha il proprio timestamp.
|
||||
* @param {Array} points - array di [timestamp_ms, { ref: value, ... }]
|
||||
* @param {Array} points - array di [timestamp_ms, { key: value, ... }]
|
||||
* @param {string} sensor - nome del sensore
|
||||
* @param {string} session - id sessione
|
||||
*/
|
||||
@@ -86,10 +54,10 @@ function writeForecastBatch(points, sensor, session) {
|
||||
|
||||
async function queryHistory(sensor, session, since) {
|
||||
const queryApi = client.getQueryApi(org);
|
||||
const query = `
|
||||
const fluxQuery = `
|
||||
from(bucket: "${bucket}")
|
||||
|> range(start: ${since})
|
||||
|> filter(fn: (r) => r._measurement == "sensor_data")
|
||||
|> filter(fn: (r) => r._measurement == "logs")
|
||||
|> filter(fn: (r) => r.sensor == "${sensor}")
|
||||
|> filter(fn: (r) => r.session == "${session}")
|
||||
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
@@ -97,7 +65,7 @@ async function queryHistory(sensor, session, since) {
|
||||
|
||||
const rows = [];
|
||||
return new Promise((resolve, reject) => {
|
||||
queryApi.queryRows(query, {
|
||||
queryApi.queryRows(fluxQuery, {
|
||||
next(row, tableMeta) {
|
||||
rows.push(tableMeta.toObject(row));
|
||||
},
|
||||
@@ -107,4 +75,4 @@ async function queryHistory(sensor, session, since) {
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = { writeSensorData, writeGenericData, writeForecastBatch, queryHistory };
|
||||
module.exports = { writeGenericData, writeForecastBatch, queryHistory };
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
const { WebSocketServer } = require('ws');
|
||||
const { decode } = require('@msgpack/msgpack');
|
||||
const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis');
|
||||
const { writeSensorData, writeGenericData, writeForecastBatch, queryHistory } = require('../store/influx');
|
||||
const { writeGenericData, writeForecastBatch } = require('../store/influx');
|
||||
|
||||
// In-memory registries
|
||||
const sensorWatchers = new Map(); // sensorName → Set<WebSocket> (watchers)
|
||||
@@ -12,41 +12,6 @@ function generateSessionId() {
|
||||
return `s${num}`;
|
||||
}
|
||||
|
||||
// Map sensor short keys → console field keys + measurement category
|
||||
const fieldMapping = {
|
||||
t: { key: 'temp', measurement: 'weather' },
|
||||
h: { key: 'hum', measurement: 'weather' },
|
||||
spd: { key: 'wSpd', measurement: 'weather' },
|
||||
cog: { key: 'cog', measurement: 'navigation' },
|
||||
sog: { key: 'sog', measurement: 'navigation' },
|
||||
hdg: { key: 'hdg', measurement: 'navigation' },
|
||||
lat: { key: 'lat', measurement: 'navigation' },
|
||||
lon: { key: 'lon', measurement: 'navigation' },
|
||||
};
|
||||
|
||||
/**
|
||||
* Transforms a sensor packet (short keys) into grouped messages
|
||||
* for the console: { timestamp, measurement, fields }
|
||||
*/
|
||||
function transformPacket(packet) {
|
||||
const { ts, ...rawFields } = packet;
|
||||
const groups = {};
|
||||
|
||||
for (const [short, val] of Object.entries(rawFields)) {
|
||||
const mapping = fieldMapping[short];
|
||||
if (!mapping) continue;
|
||||
const { key, measurement } = mapping;
|
||||
if (!groups[measurement]) groups[measurement] = {};
|
||||
groups[measurement][key] = val;
|
||||
}
|
||||
|
||||
const messages = [];
|
||||
for (const [measurement, fields] of Object.entries(groups)) {
|
||||
messages.push({ timestamp: ts, measurement, fields });
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
function setup(server) {
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
@@ -72,9 +37,8 @@ function setup(server) {
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
ws.sensorName = sensor;
|
||||
ws.sessionId = generateSessionId();
|
||||
ws.sessionLabel = ws.sessionId; // default label = sessionId
|
||||
ws.sessionLabel = ws.sessionId;
|
||||
ws.connectedAt = new Date().toISOString();
|
||||
ws.rulesVersions = null; // populated by _t:init message
|
||||
handleSensorConnection(ws);
|
||||
});
|
||||
|
||||
@@ -108,51 +72,41 @@ function handleSensorConnection(ws) {
|
||||
try {
|
||||
const packet = decode(data);
|
||||
|
||||
// Messaggio di inizializzazione con versioni rulesets e uptime
|
||||
// Messaggio di inizializzazione
|
||||
if (packet._t === 'init') {
|
||||
ws.rulesVersions = packet.rules || {};
|
||||
ws.sensorUptime = packet.uptime || null;
|
||||
console.log(`[${sensorName}] Init — rules:`, ws.rulesVersions, '| uptime:', ws.sensorUptime);
|
||||
// Salva in Redis
|
||||
const metaFields = [];
|
||||
for (const [type, ver] of Object.entries(ws.rulesVersions)) {
|
||||
metaFields.push(`rules_${type}`, ver);
|
||||
}
|
||||
console.log(`[${sensorName}] Init — uptime:`, ws.sensorUptime);
|
||||
if (ws.sensorUptime != null) {
|
||||
metaFields.push('uptime', String(ws.sensorUptime));
|
||||
hset(`sensors:${sensorName}`, 'uptime', String(ws.sensorUptime));
|
||||
}
|
||||
if (metaFields.length > 0) {
|
||||
hset(`sensors:${sensorName}`, ...metaFields);
|
||||
}
|
||||
return; // non scrivere su InfluxDB
|
||||
return;
|
||||
}
|
||||
|
||||
const { ts, _m, ...fields } = packet;
|
||||
|
||||
// Route per tipo di measurement
|
||||
if (_m === 'weather') {
|
||||
// Dati meteo current — salva con measurement generico
|
||||
writeGenericData('weather_current', fields, sensorName, ws.sessionLabel, ts);
|
||||
} else if (_m === 'forecast_batch') {
|
||||
// Batch previsioni orarie — fields è un array [[ts, {fields}], ...]
|
||||
if (_m === 'forecast_batch') {
|
||||
// Batch previsioni orarie
|
||||
if (Array.isArray(fields.points)) {
|
||||
writeForecastBatch(fields.points, sensorName, ws.sessionLabel);
|
||||
}
|
||||
} else {
|
||||
// Dati telemetria sensore (logs) — mapping abbreviato
|
||||
writeSensorData(fields, sensorName, ws.sessionLabel, ts);
|
||||
// weather, logs, o altro — scrivi tutti i campi
|
||||
const measurement = _m || 'sensor_data';
|
||||
writeGenericData(measurement, fields, sensorName, ws.sessionLabel, ts);
|
||||
}
|
||||
|
||||
// Broadcast to watchers
|
||||
// Broadcast ai watchers: invia dati grezzi con measurement e fields
|
||||
const watchers = sensorWatchers.get(sensorName);
|
||||
if (watchers && watchers.size > 0) {
|
||||
const messages = transformPacket(packet);
|
||||
for (const msg of messages) {
|
||||
const json = JSON.stringify(msg);
|
||||
for (const watcher of watchers) {
|
||||
if (watcher.readyState === watcher.OPEN) {
|
||||
watcher.send(json);
|
||||
}
|
||||
const msg = JSON.stringify({
|
||||
timestamp: ts,
|
||||
measurement: _m || 'sensor_data',
|
||||
fields: fields
|
||||
});
|
||||
for (const watcher of watchers) {
|
||||
if (watcher.readyState === watcher.OPEN) {
|
||||
watcher.send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -198,29 +152,6 @@ function handleWatcherConnection(ws) {
|
||||
|
||||
console.log(`Watcher now watching sensor: ${msg.sensorId}`);
|
||||
|
||||
try {
|
||||
const sensorInfo = await query(msg.sensorId, 'sensors');
|
||||
if (sensorInfo && sensorInfo.timestamp && sensorInfo.session) {
|
||||
const history = await queryHistory(msg.sensorId, sensorInfo.session, sensorInfo.timestamp);
|
||||
for (const row of history) {
|
||||
const ts = new Date(row._time).getTime();
|
||||
const rebuilt = { ts };
|
||||
for (const [short, { key }] of Object.entries(fieldMapping)) {
|
||||
const influxField = { t: 'temperature', h: 'humidity', spd: 'speed', cog: 'cog', sog: 'sog', hdg: 'headingTrue', lat: 'latitude', lon: 'longitude' }[short];
|
||||
if (row[influxField] !== undefined) {
|
||||
rebuilt[short] = row[influxField];
|
||||
}
|
||||
}
|
||||
const messages = transformPacket(rebuilt);
|
||||
for (const m of messages) {
|
||||
ws.send(JSON.stringify(m));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Error fetching history for watcher:`, err.message);
|
||||
}
|
||||
|
||||
} else if (msg.action === 'unwatch') {
|
||||
if (ws.sensorName) {
|
||||
sensorWatchers.get(ws.sensorName)?.delete(ws);
|
||||
@@ -250,27 +181,4 @@ function handleWatcherConnection(ws) {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Invia un messaggio a tutti i sensori connessi.
|
||||
* Usato dal push-rules endpoint per forzare l'aggiornamento delle rules.
|
||||
* @param {Object} payload - Il payload da inviare (verrà wrappato con _t)
|
||||
* @returns {number} Numero di sensori a cui il messaggio è stato inviato
|
||||
*/
|
||||
function pushToAllSensors(payload) {
|
||||
const { encode } = require('@msgpack/msgpack');
|
||||
let count = 0;
|
||||
for (const [sensorName, ws] of connectedSensors.entries()) {
|
||||
if (ws.readyState === ws.OPEN) {
|
||||
try {
|
||||
ws.send(encode(payload));
|
||||
console.log(`[PUSH] Rules update inviato a ${sensorName}`);
|
||||
count++;
|
||||
} catch (err) {
|
||||
console.error(`[PUSH] Errore invio a ${sensorName}:`, err.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
module.exports = { setup, connectedSensors, pushToAllSensors };
|
||||
module.exports = { setup, connectedSensors };
|
||||
|
||||
Reference in New Issue
Block a user