feat: update session handling and add session history endpoint

This commit is contained in:
Giuseppe Raffa
2026-04-16 15:37:10 +02:00
parent 5912c00a82
commit 981f498eb7
5 changed files with 354 additions and 92 deletions

View File

@@ -1,9 +1,11 @@
const router = require('express').Router();
const { queryAll, query, hset } = require('../store/redis');
const { connectedSensors } = require('../ws/handler');
const { flush, exportSessionCSV } = require('../store/influx');
const db = require('../store/db');
/**
* GET /sessions — Lista tutte le sessioni dei sensori con metadata e rules versions
* GET /sessions — Lista tutte le sessioni attive dei sensori
*/
router.get('/', async (req, res) => {
try {
@@ -14,15 +16,9 @@ router.get('/', async (req, res) => {
const info = await query(name, 'sensors');
sessions[name] = {
name,
connectedAt: info.timestamp || null,
connectedAt: info.connectedAt || info.timestamp || null,
session: info.session || null,
sessionLabel: info.sessionLabel || info.session || null,
status: info.status || 'unknown',
rules: {
weather: info.rules_weather || null,
data: info.rules_data || null,
logs: info.rules_logs || null,
}
status: info.status || 'unknown'
};
}
res.json(sessions);
@@ -32,6 +28,21 @@ router.get('/', async (req, res) => {
}
});
/**
* GET /sessions/history — Lista tutte le sessioni passate (da sessiondataref)
*/
router.get('/history', async (req, res) => {
try {
const result = await db.query('sensors',
`SELECT * FROM sessiondataref ORDER BY created_at DESC LIMIT 100`
);
res.json(result.rows);
} catch (err) {
console.error('Error fetching session history:', err.message);
res.status(500).json({ error: 'internal server error' });
}
});
/**
* GET /sessions/pending — Lista token di connessione pendenti
*/
@@ -67,7 +78,7 @@ router.get('/connected', async (req, res) => {
});
/**
* GET /sessions/connected/:id — Verifica se un sensore specifico è connesso
* GET /sessions/connected/:id — Verifica se un sensore specifico e connesso
*/
router.get('/connected/:id', async (req, res) => {
const { id } = req.params;
@@ -84,37 +95,161 @@ router.get('/connected/:id', async (req, res) => {
});
/**
* POST /sessions/:id/label — Cambia il label della sessione per un sensore connesso.
* Non interrompe il flusso dati. I nuovi punti InfluxDB avranno il nuovo tag.
* POST /sessions/:id/flush — Forza il flush del buffer InfluxDB
*/
router.post('/:id/label', async (req, res) => {
const { id } = req.params;
const { label } = req.body;
if (!label || typeof label !== 'string' || label.trim().length === 0) {
return res.status(400).json({ error: 'label is required' });
}
const trimmedLabel = label.trim();
// Trova il WS client connesso
const ws = connectedSensors.get(id);
if (!ws) {
return res.status(404).json({ error: 'sensor not connected' });
}
// Aggiorna in memoria (effetto immediato sui prossimi punti InfluxDB)
ws.sessionLabel = trimmedLabel;
// Aggiorna in Redis per persistenza
router.post('/:id/flush', async (req, res) => {
try {
await hset(`sensors:${id}`, 'sessionLabel', trimmedLabel);
await flush();
res.json({ status: 'ok' });
} catch (err) {
console.error('Error updating session label in Redis', err);
console.error('Error flushing:', err.message);
res.status(500).json({ error: 'flush failed' });
}
});
/**
* GET /sessions/:id/csv — Esporta tutti i dati della sessione come CSV.
* Usa il session_id da Redis (sensore connesso) oppure il query param ?session=sXXXX.
* Il CSV contiene tutti i dati logs per quel sensor + session da InfluxDB.
*/
router.get('/:id/csv', async (req, res) => {
const sensorName = req.params.id;
try {
// Determina il session_id: da query param, da Redis, o dall'ultimo in DB
let sessionId = req.query.session || null;
if (!sessionId) {
// Prova da Redis (sensore connesso)
const info = await query(sensorName, 'sensors');
sessionId = info?.session || null;
}
if (!sessionId) {
// Ultima sessione in sessiondataref
const result = await db.query('sensors',
`SELECT session_id FROM sessiondataref WHERE sensor_name = $1 ORDER BY created_at DESC LIMIT 1`,
[sensorName]
);
sessionId = result.rows[0]?.session_id || null;
}
if (!sessionId) {
return res.status(404).json({ error: 'No session found for this sensor' });
}
// Determina il range temporale: da connectedAt della sessione
let since = req.query.from ? new Date(parseInt(req.query.from)).toISOString() : null;
if (!since) {
// Cerca il created_at nella sessiondataref
const result = await db.query('sensors',
`SELECT created_at FROM sessiondataref WHERE session_id = $1`,
[sessionId]
);
since = result.rows[0]?.created_at?.toISOString() || '-30d';
}
const csv = await exportSessionCSV(sensorName, sessionId, since);
if (!csv) {
return res.status(404).json({ error: 'No data found for this session' });
}
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', `attachment; filename="session_${sessionId}_${sensorName}.csv"`);
res.send(csv);
} catch (err) {
console.error('Error exporting CSV:', err.message);
res.status(500).json({ error: 'CSV export failed' });
}
});
/**
* GET /sessions/:id/details — Ottieni i dettagli della sessione corrente
*/
router.get('/:id/details', async (req, res) => {
const sensorName = req.params.id;
const sessionId = req.query.session || null;
try {
let result;
if (sessionId) {
result = await db.query('sensors',
`SELECT * FROM sessiondataref WHERE session_id = $1`,
[sessionId]
);
} else {
// Ultima sessione per questo sensore
result = await db.query('sensors',
`SELECT * FROM sessiondataref WHERE sensor_name = $1 ORDER BY created_at DESC LIMIT 1`,
[sensorName]
);
}
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Session not found' });
}
res.json(result.rows[0]);
} catch (err) {
console.error('Error fetching session details:', err.message);
res.status(500).json({ error: 'internal server error' });
}
});
/**
* PUT /sessions/:id/details — Aggiorna nome, descrizione o tags della sessione.
* Body: { name?, description?, tags? }
*/
router.put('/:id/details', async (req, res) => {
const sensorName = req.params.id;
const { session: sessionId, name, description, tags } = req.body;
if (!sessionId) {
return res.status(400).json({ error: 'session id is required in body' });
}
console.log(`[${id}] Session label changed to: ${trimmedLabel}`);
res.json({ status: 'ok', label: trimmedLabel });
try {
const updates = [];
const values = [];
let idx = 1;
if (name !== undefined) {
updates.push(`name = $${idx++}`);
values.push(name);
}
if (description !== undefined) {
updates.push(`description = $${idx++}`);
values.push(description);
}
if (tags !== undefined) {
updates.push(`tags = $${idx++}`);
values.push(tags);
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No fields to update' });
}
updates.push(`updated_at = NOW()`);
values.push(sessionId);
const result = await db.query('sensors',
`UPDATE sessiondataref SET ${updates.join(', ')} WHERE session_id = $${idx} RETURNING *`,
values
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Session not found' });
}
res.json(result.rows[0]);
} catch (err) {
console.error('Error updating session details:', err.message);
res.status(500).json({ error: 'internal server error' });
}
});
module.exports = router;

View File

@@ -2,7 +2,7 @@ const { Pool } = require('pg');
const baseConfig = {
user: process.env.DB_USER,
password: process.env.DB_PSW,
password: process.env.DB_PASSWORD,
host: process.env.DB_HOST,
port: process.env.DB_PORT,
max: 10,
@@ -11,14 +11,19 @@ const baseConfig = {
};
const dbs = {
data: { name: process.env.DATA_DB },
sensors: { name: process.env.SENSORS_DB }
data: { name: process.env.DATA_DB || 'data' },
sensors: { name: process.env.SENSORS_DB || 'sensors' }
}
const pools = {};
function getPool(db) {
const dbConfig = dbs[db];
if (!dbConfig) throw new Error(`Database ${db} not configured`);
return new Pool({ ...baseConfig, database: dbConfig.name });
if (!pools[db]) {
pools[db] = new Pool({ ...baseConfig, database: dbConfig.name });
}
return pools[db];
}
async function checkConnection(db) {
@@ -26,8 +31,8 @@ async function checkConnection(db) {
await getPool(db).query('SELECT NOW()');
return true;
} catch (err) {
console.error(`Error connecting to ${db} database`, err);
return false;
console.error(`Error connecting to ${db} database`, err.message);
return false;
}
}
@@ -38,6 +43,7 @@ async function query(db, text, params) {
async function init() {
try {
// Tabella sensori
await query('sensors', `
CREATE TABLE IF NOT EXISTS sensors (
id SERIAL PRIMARY KEY,
@@ -46,11 +52,28 @@ async function init() {
created_at TIMESTAMPTZ DEFAULT NOW()
);
`);
// Tabella sessioni: mappa session_id (tag InfluxDB) a metadati custom
await query('sensors', `
CREATE TABLE IF NOT EXISTS sessiondataref (
id SERIAL PRIMARY KEY,
session_id VARCHAR(32) UNIQUE NOT NULL,
sensor_name VARCHAR(255) NOT NULL,
name VARCHAR(255),
description TEXT,
tags TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
disconnected_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
`);
console.log('[DB] Tabelle verificate (sensors, sessiondataref)');
} catch (err) {
console.error('Error creating sensors table', err);
console.error('[DB] Error creating tables:', err.message);
}
}
init();
module.exports = { checkConnection, query };
module.exports = { checkConnection, query };

View File

@@ -15,11 +15,10 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', {
/**
* Scrive dati generici su InfluxDB senza mapping.
* I campi vengono scritti con il nome originale.
* @param {string} measurement - nome della measurement (es. 'logs', 'weather')
* @param {Object} fields - campi { key: value }
* @param {string} sensor - nome del sensore
* @param {string} session - id sessione
* @param {string} session - id sessione (tag immutabile)
* @param {number} timestamp - timestamp unix ms
*/
function writeGenericData(measurement, fields, sensor, session, timestamp) {
@@ -52,6 +51,24 @@ function writeForecastBatch(points, sensor, session) {
}
}
/**
* Forza il flush del buffer di scrittura.
*/
async function flush() {
try {
await writeApi.flush();
} catch (err) {
console.error('[INFLUX] Flush error:', err.message);
}
}
/**
* Query storica per una sessione: ritorna righe pivotate con tutti i campi.
* @param {string} sensor - nome sensore
* @param {string} session - session_id (tag InfluxDB)
* @param {string} since - ISO timestamp o duration (es. "-30d")
* @returns {Array<Object>}
*/
async function queryHistory(sensor, session, since) {
const queryApi = client.getQueryApi(org);
const fluxQuery = `
@@ -61,6 +78,7 @@ async function queryHistory(sensor, session, since) {
|> filter(fn: (r) => r.sensor == "${sensor}")
|> filter(fn: (r) => r.session == "${session}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> sort(columns: ["_time"])
`;
const rows = [];
@@ -75,4 +93,44 @@ async function queryHistory(sensor, session, since) {
});
}
module.exports = { writeGenericData, writeForecastBatch, queryHistory };
/**
* Esporta tutti i dati di una sessione come CSV.
* @param {string} sensor - nome sensore
* @param {string} session - session_id
* @param {string} since - ISO timestamp inizio (opzionale, default -30d)
* @returns {string} CSV content
*/
async function exportSessionCSV(sensor, session, since) {
const start = since || '-30d';
const rows = await queryHistory(sensor, session, start);
if (rows.length === 0) return '';
// Raccogli tutti i field names (esclusi meta InfluxDB)
const metaKeys = new Set(['result', 'table', '_start', '_stop', '_measurement', 'sensor', 'session', '']);
const fieldNames = new Set();
for (const row of rows) {
for (const key of Object.keys(row)) {
if (!metaKeys.has(key) && key !== '_time') {
fieldNames.add(key);
}
}
}
const fields = Array.from(fieldNames).sort();
const header = ['timestamp', ...fields].join(',');
const csvRows = rows.map(row => {
const ts = row._time || '';
const values = fields.map(f => {
const v = row[f];
if (v === null || v === undefined) return '';
return v;
});
return [ts, ...values].join(',');
});
return header + '\n' + csvRows.join('\n') + '\n';
}
module.exports = { writeGenericData, writeForecastBatch, flush, queryHistory, exportSessionCSV };

View File

@@ -2,6 +2,7 @@ const { WebSocketServer } = require('ws');
const { decode } = require('@msgpack/msgpack');
const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis');
const { writeGenericData, writeForecastBatch } = require('../store/influx');
const db = require('../store/db');
// In-memory registries
const sensorWatchers = new Map(); // sensorName → Set<WebSocket> (watchers)
@@ -37,7 +38,6 @@ function setup(server) {
wss.handleUpgrade(req, socket, head, (ws) => {
ws.sensorName = sensor;
ws.sessionId = generateSessionId();
ws.sessionLabel = ws.sessionId;
ws.connectedAt = new Date().toISOString();
handleSensorConnection(ws);
});
@@ -54,15 +54,28 @@ function setup(server) {
});
}
function handleSensorConnection(ws) {
const { sensorName, sessionId, sessionLabel, connectedAt } = ws;
async function handleSensorConnection(ws) {
const { sensorName, sessionId, connectedAt } = ws;
console.log(`Sensor connected: ${sensorName} (session: ${sessionId})`);
// Register in global registry
connectedSensors.set(sensorName, ws);
appendAsConnection(sensorName, 'connected', connectedAt);
hset(`sensors:${sensorName}`, 'session', sessionId, 'sessionLabel', sessionLabel);
hset(`sensors:${sensorName}`, 'session', sessionId, 'connectedAt', connectedAt);
// Crea riga in sessiondataref su PostgreSQL (nome di default = sessionId)
try {
await db.query('sensors',
`INSERT INTO sessiondataref (session_id, sensor_name, name, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (session_id) DO NOTHING`,
[sessionId, sensorName, sessionId]
);
console.log(`[${sensorName}] Session ${sessionId} registrata in sessiondataref`);
} catch (err) {
console.error(`[${sensorName}] Errore creazione sessiondataref:`, err.message);
}
const pingInterval = setInterval(() => {
if (ws.readyState === ws.OPEN) ws.ping();
@@ -84,19 +97,17 @@ function handleSensorConnection(ws) {
const { ts, _m, ...fields } = packet;
// Route per tipo di measurement
// InfluxDB: usa SEMPRE sessionId come tag (non cambia mai)
if (_m === 'forecast_batch') {
// Batch previsioni orarie
if (Array.isArray(fields.points)) {
writeForecastBatch(fields.points, sensorName, ws.sessionLabel);
writeForecastBatch(fields.points, sensorName, sessionId);
}
} else {
// weather, logs, o altro — scrivi tutti i campi
const measurement = _m || 'sensor_data';
writeGenericData(measurement, fields, sensorName, ws.sessionLabel, ts);
writeGenericData(measurement, fields, sensorName, sessionId, ts);
}
// Broadcast ai watchers: invia dati grezzi con measurement e fields
// Broadcast ai watchers
const watchers = sensorWatchers.get(sensorName);
if (watchers && watchers.size > 0) {
const msg = JSON.stringify({
@@ -115,11 +126,22 @@ function handleSensorConnection(ws) {
}
});
ws.on('close', () => {
ws.on('close', async () => {
console.log(`Sensor disconnected: ${sensorName}`);
clearInterval(pingInterval);
connectedSensors.delete(sensorName);
appendAsConnection(sensorName, 'disconnected', new Date().toISOString());
// Aggiorna disconnected_at in sessiondataref
try {
await db.query('sensors',
`UPDATE sessiondataref SET disconnected_at = NOW() WHERE session_id = $1`,
[sessionId]
);
} catch (err) {
console.error(`[${sensorName}] Errore update disconnected_at:`, err.message);
}
del(`sensors:${sensorName}`);
});