From 981f498eb74ac4db65b6bf4a9b7a1508595df0a6 Mon Sep 17 00:00:00 2001 From: Giuseppe Raffa <77052701+sesee3@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:37:10 +0200 Subject: [PATCH] feat: update session handling and add session history endpoint --- console/src/pages/live.html | 92 ++++++++------ realtime/src/routes/sessions.js | 207 ++++++++++++++++++++++++++------ realtime/src/store/db.js | 39 ++++-- realtime/src/store/influx.js | 64 +++++++++- realtime/src/ws/handler.js | 44 +++++-- 5 files changed, 354 insertions(+), 92 deletions(-) diff --git a/console/src/pages/live.html b/console/src/pages/live.html index f072500..0dc610f 100644 --- a/console/src/pages/live.html +++ b/console/src/pages/live.html @@ -258,31 +258,46 @@ function getColorForField(key) { } const FIELD_DEFS = { - temp: { name: 'Temperatura', unit: '°C', category: 'weather' }, - hum: { name: 'Umidita', unit: '%', category: 'weather' }, - pres: { name: 'Pressione', unit: 'hPa', category: 'weather' }, - wSpd: { name: 'Velocita Vento', unit: 'km/h', category: 'weather' }, - wDir: { name: 'Direzione Vento', unit: '°', category: 'weather' }, - gust: { name: 'Raffiche', unit: 'km/h', category: 'weather' }, - rain: { name: 'Pioggia', unit: 'mm', category: 'weather' }, - prec: { name: 'Precipitazioni', unit: 'mm', category: 'weather' }, - lat: { name: 'Latitudine', unit: '°', category: 'navigation' }, - lon: { name: 'Longitudine', unit: '°', category: 'navigation' }, - hdg: { name: 'Heading', unit: '°', category: 'navigation' }, - sog: { name: 'Velocita SOG', unit: 'kn', category: 'navigation' }, - cog: { name: 'Rotta COG', unit: '°', category: 'navigation' }, - depth: { name: 'Profondita', unit: 'm', category: 'navigation' }, - engTemp: { name: 'Temp. Motore', unit: '°C', category: 'engine' }, - wvH: { name: 'Altezza Onde', unit: 'm', category: 'weather' }, - wvP: { name: 'Periodo Onde', unit: 's', category: 'weather' }, - wvD: { name: 'Direzione Onde', unit: '°', category: 'weather' }, - curD: { name: 'Dir. Corrente', unit: '°', category: 'weather' }, - curV: { name: 'Vel. Corrente', unit: 'm/s', category: 'weather' }, - fTemp: { name: 'Prev. Temperatura', unit: '°C', category: 'weather' }, - fWSpd: { name: 'Prev. Vento', unit: 'km/h', category: 'weather' } + // Meteo (da openmeteo → SignalK → logs) + 'meb.forecasts.temperature': { name: 'Temperatura', unit: '°C', category: 'weather' }, + 'meb.forecast.wind.speed': { name: 'Velocita Vento', unit: 'km/h', category: 'weather' }, + 'meb.forecast.wind.direction': { name: 'Direzione Vento', unit: '°', category: 'weather' }, + 'meb.forecast.wind.gusts': { name: 'Raffiche', unit: 'km/h', category: 'weather' }, + 'meb.forecast.humidity': { name: 'Umidita', unit: '%', category: 'weather' }, + 'meb.forecast.pressure': { name: 'Pressione', unit: 'hPa', category: 'weather' }, + 'meb.forecast.precipitation': { name: 'Precipitazioni', unit: 'mm', category: 'weather' }, + 'meb.forecast.rain': { name: 'Pioggia', unit: 'mm', category: 'weather' }, + 'meb.forecast.cloudCover': { name: 'Copertura Nuvole', unit: '%', category: 'weather' }, + 'meb.forecast.precipitationProbability': { name: 'Prob. Precipitazioni', unit: '%', category: 'weather' }, + // Marine + 'meb.waves.height': { name: 'Altezza Onde', unit: 'm', category: 'weather' }, + 'meb.waves.direction': { name: 'Direzione Onde', unit: '°', category: 'weather' }, + 'meb.waves.period': { name: 'Periodo Onde', unit: 's', category: 'weather' }, + 'meb.waves.peakPeriod': { name: 'Periodo Picco', unit: 's', category: 'weather' }, + 'meb.waves.currentVelocity': { name: 'Vel. Corrente', unit: 'm/s', category: 'weather' }, + 'meb.waves.currentDirection': { name: 'Dir. Corrente', unit: '°', category: 'weather' }, + // Navigazione + 'navigation.position.latitude': { name: 'Latitudine', unit: '°', category: 'navigation' }, + 'navigation.position.longitude': { name: 'Longitudine', unit: '°', category: 'navigation' }, + 'navigation.headingTrue': { name: 'Heading', unit: '°', category: 'navigation' }, + 'navigation.speedOverGround': { name: 'Velocita SOG', unit: 'kn', category: 'navigation' }, + 'navigation.courseOverGroundTrue': { name: 'Rotta COG', unit: '°', category: 'navigation' }, + // Elettrica + 'electrical.batteries.service.Voltage': { name: 'Batteria Serv. V', unit: 'V', category: 'engine' }, + 'electrical.batteries.service.current': { name: 'Batteria Serv. A', unit: 'A', category: 'engine' }, + 'electrical.batteries.service.stateOfCharge': { name: 'Batteria Serv. SoC', unit: '%', category: 'engine' }, + 'electrical.batteries.traction.Voltage': { name: 'Batteria Traz. V', unit: 'V', category: 'engine' }, + 'electrical.batteries.traction.current': { name: 'Batteria Traz. A', unit: 'A', category: 'engine' }, + 'electrical.batteries.traction.stateOfCharge': { name: 'Batteria Traz. SoC', unit: '%', category: 'engine' }, + 'electrical.batteries.traction.temperature': { name: 'Batteria Traz. Temp', unit: '°C', category: 'engine' }, + 'electrical.batteries.traction.power': { name: 'Batteria Traz. W', unit: 'W', category: 'engine' }, + // Motore + 'propulsion.0.revolutions': { name: 'Giri Motore', unit: 'RPM', category: 'engine' }, + // Sistema + 'system.uptime': { name: 'Uptime', unit: 's', category: 'engine' } }; -const MEASUREMENT_CATEGORY = { weather: 'weather', navigation: 'navigation', engine: 'engine' }; -const ALWAYS_FILL_BOTTOM_FIELDS = ['lat', 'lon']; +const MEASUREMENT_CATEGORY = { weather: 'weather', navigation: 'navigation', logs: 'navigation', engine: 'engine' }; +const ALWAYS_FILL_BOTTOM_FIELDS = ['navigation.position.latitude', 'navigation.position.longitude']; async function loadSessions() { document.getElementById('sessionList').innerHTML = '
Caricamento...
'; @@ -301,23 +316,27 @@ async function loadSessions() { const meta = typeof rawMeta === 'string' ? JSON.parse(rawMeta) : rawMeta; const item = document.createElement('div'); item.className = 'session-item'; - const connTime = meta.connectedAt ? new Date(meta.connectedAt * 1000).toLocaleTimeString('it-IT') : '—'; - item.innerHTML = `
${meta.name || sId}${sId}
Connesso: ${connTime}
`; + const connTime = meta.connectedAt ? new Date(meta.connectedAt).toLocaleTimeString('it-IT') : '—'; + const sessId = meta.session || '—'; + item.innerHTML = `
${meta.name || sId}${sessId}
Connesso: ${connTime}
`; item.onclick = () => selectSession(sId, meta); document.getElementById('sessionList').appendChild(item); } } catch (err) { } } +let currentSessionId = null; // InfluxDB session tag (es. s1234) + function selectSession(sId, meta) { currentSensorId = sId; - sessionStartTime = meta.connectedAt ? meta.connectedAt * 1000 : Date.now(); + currentSessionId = meta.session || null; + sessionStartTime = meta.connectedAt ? new Date(meta.connectedAt).getTime() : Date.now(); document.getElementById('sessionOverlay').style.display = 'none'; document.getElementById('mainContent').style.display = ''; document.getElementById('bottomBar').style.display = ''; document.getElementById('sensorName').textContent = meta.name || sId; document.getElementById('sessionInfoTitle').textContent = `Sensore: ${meta.name || sId}`; - document.getElementById('currentSessionLabel').textContent = meta.sessionLabel || meta.session || sId; + document.getElementById('currentSessionLabel').textContent = currentSessionId || sId; liveData = {}; Object.values(miniCharts).forEach(c => c.destroy()); miniCharts = {}; @@ -389,7 +408,11 @@ function handleSensorData(msg) { if (redrawExpChart) updateExpandedChart(); if (redrawCompChart) updateCompChart(); - if (measurement === 'logs' && fields.lat && fields.lon) updateMap(fields.lat, fields.lon, fields.hdg, fields.wDir, fields.wvD); + const lat = fields['navigation.position.latitude']; + const lon = fields['navigation.position.longitude']; + if (lat != null && lon != null) { + updateMap(lat, lon, fields['navigation.headingTrue'], fields['meb.forecast.wind.direction'], fields['meb.waves.direction']); + } } function createHybCard(key, def, val) { @@ -681,7 +704,8 @@ document.getElementById('downloadBtn').onclick = async () => { btn.textContent = '...'; await fetch(`${REALTIME_URL}/sessions/${currentSensorId}/flush`, { method: 'POST' }); - const csvUrl = `${REALTIME_URL}/sessions/${currentSensorId}/csv?from=${sessionStartTime}`; + const sessionParam = currentSessionId ? `&session=${currentSessionId}` : ''; + const csvUrl = `${REALTIME_URL}/sessions/${currentSensorId}/csv?from=${sessionStartTime}${sessionParam}`; const res = await fetch(csvUrl); const blob = await res.blob(); @@ -733,12 +757,12 @@ document.getElementById('cancelSessionLabelBtn').onclick = () => { }; document.getElementById('saveSessionLabelBtn').onclick = async () => { const label = document.getElementById('sessionLabelInput').value.trim(); - if (!label || !currentSensorId) return; + if (!label || !currentSensorId || !currentSessionId) return; try { - const res = await fetch(`${REALTIME_URL}/sessions/${currentSensorId}/label`, { - method: 'POST', + const res = await fetch(`${REALTIME_URL}/sessions/${currentSensorId}/details`, { + method: 'PUT', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ label }) + body: JSON.stringify({ session: currentSessionId, name: label }) }); if (res.ok) { document.getElementById('currentSessionLabel').textContent = label; diff --git a/realtime/src/routes/sessions.js b/realtime/src/routes/sessions.js index 0eb27b7..d0049d5 100644 --- a/realtime/src/routes/sessions.js +++ b/realtime/src/routes/sessions.js @@ -1,9 +1,11 @@ const router = require('express').Router(); const { queryAll, query, hset } = require('../store/redis'); const { connectedSensors } = require('../ws/handler'); +const { flush, exportSessionCSV } = require('../store/influx'); +const db = require('../store/db'); /** - * GET /sessions — Lista tutte le sessioni dei sensori con metadata e rules versions + * GET /sessions — Lista tutte le sessioni attive dei sensori */ router.get('/', async (req, res) => { try { @@ -14,15 +16,9 @@ router.get('/', async (req, res) => { const info = await query(name, 'sensors'); sessions[name] = { name, - connectedAt: info.timestamp || null, + connectedAt: info.connectedAt || info.timestamp || null, session: info.session || null, - sessionLabel: info.sessionLabel || info.session || null, - status: info.status || 'unknown', - rules: { - weather: info.rules_weather || null, - data: info.rules_data || null, - logs: info.rules_logs || null, - } + status: info.status || 'unknown' }; } res.json(sessions); @@ -32,6 +28,21 @@ router.get('/', async (req, res) => { } }); +/** + * GET /sessions/history — Lista tutte le sessioni passate (da sessiondataref) + */ +router.get('/history', async (req, res) => { + try { + const result = await db.query('sensors', + `SELECT * FROM sessiondataref ORDER BY created_at DESC LIMIT 100` + ); + res.json(result.rows); + } catch (err) { + console.error('Error fetching session history:', err.message); + res.status(500).json({ error: 'internal server error' }); + } +}); + /** * GET /sessions/pending — Lista token di connessione pendenti */ @@ -67,7 +78,7 @@ router.get('/connected', async (req, res) => { }); /** - * GET /sessions/connected/:id — Verifica se un sensore specifico è connesso + * GET /sessions/connected/:id — Verifica se un sensore specifico e connesso */ router.get('/connected/:id', async (req, res) => { const { id } = req.params; @@ -84,37 +95,161 @@ router.get('/connected/:id', async (req, res) => { }); /** - * POST /sessions/:id/label — Cambia il label della sessione per un sensore connesso. - * Non interrompe il flusso dati. I nuovi punti InfluxDB avranno il nuovo tag. + * POST /sessions/:id/flush — Forza il flush del buffer InfluxDB */ -router.post('/:id/label', async (req, res) => { - const { id } = req.params; - const { label } = req.body; - - if (!label || typeof label !== 'string' || label.trim().length === 0) { - return res.status(400).json({ error: 'label is required' }); - } - - const trimmedLabel = label.trim(); - - // Trova il WS client connesso - const ws = connectedSensors.get(id); - if (!ws) { - return res.status(404).json({ error: 'sensor not connected' }); - } - - // Aggiorna in memoria (effetto immediato sui prossimi punti InfluxDB) - ws.sessionLabel = trimmedLabel; - - // Aggiorna in Redis per persistenza +router.post('/:id/flush', async (req, res) => { try { - await hset(`sensors:${id}`, 'sessionLabel', trimmedLabel); + await flush(); + res.json({ status: 'ok' }); } catch (err) { - console.error('Error updating session label in Redis', err); + console.error('Error flushing:', err.message); + res.status(500).json({ error: 'flush failed' }); + } +}); + +/** + * GET /sessions/:id/csv — Esporta tutti i dati della sessione come CSV. + * Usa il session_id da Redis (sensore connesso) oppure il query param ?session=sXXXX. + * Il CSV contiene tutti i dati logs per quel sensor + session da InfluxDB. + */ +router.get('/:id/csv', async (req, res) => { + const sensorName = req.params.id; + + try { + // Determina il session_id: da query param, da Redis, o dall'ultimo in DB + let sessionId = req.query.session || null; + + if (!sessionId) { + // Prova da Redis (sensore connesso) + const info = await query(sensorName, 'sensors'); + sessionId = info?.session || null; + } + + if (!sessionId) { + // Ultima sessione in sessiondataref + const result = await db.query('sensors', + `SELECT session_id FROM sessiondataref WHERE sensor_name = $1 ORDER BY created_at DESC LIMIT 1`, + [sensorName] + ); + sessionId = result.rows[0]?.session_id || null; + } + + if (!sessionId) { + return res.status(404).json({ error: 'No session found for this sensor' }); + } + + // Determina il range temporale: da connectedAt della sessione + let since = req.query.from ? new Date(parseInt(req.query.from)).toISOString() : null; + + if (!since) { + // Cerca il created_at nella sessiondataref + const result = await db.query('sensors', + `SELECT created_at FROM sessiondataref WHERE session_id = $1`, + [sessionId] + ); + since = result.rows[0]?.created_at?.toISOString() || '-30d'; + } + + const csv = await exportSessionCSV(sensorName, sessionId, since); + + if (!csv) { + return res.status(404).json({ error: 'No data found for this session' }); + } + + res.setHeader('Content-Type', 'text/csv'); + res.setHeader('Content-Disposition', `attachment; filename="session_${sessionId}_${sensorName}.csv"`); + res.send(csv); + + } catch (err) { + console.error('Error exporting CSV:', err.message); + res.status(500).json({ error: 'CSV export failed' }); + } +}); + +/** + * GET /sessions/:id/details — Ottieni i dettagli della sessione corrente + */ +router.get('/:id/details', async (req, res) => { + const sensorName = req.params.id; + const sessionId = req.query.session || null; + + try { + let result; + if (sessionId) { + result = await db.query('sensors', + `SELECT * FROM sessiondataref WHERE session_id = $1`, + [sessionId] + ); + } else { + // Ultima sessione per questo sensore + result = await db.query('sensors', + `SELECT * FROM sessiondataref WHERE sensor_name = $1 ORDER BY created_at DESC LIMIT 1`, + [sensorName] + ); + } + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Session not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + console.error('Error fetching session details:', err.message); + res.status(500).json({ error: 'internal server error' }); + } +}); + +/** + * PUT /sessions/:id/details — Aggiorna nome, descrizione o tags della sessione. + * Body: { name?, description?, tags? } + */ +router.put('/:id/details', async (req, res) => { + const sensorName = req.params.id; + const { session: sessionId, name, description, tags } = req.body; + + if (!sessionId) { + return res.status(400).json({ error: 'session id is required in body' }); } - console.log(`[${id}] Session label changed to: ${trimmedLabel}`); - res.json({ status: 'ok', label: trimmedLabel }); + try { + const updates = []; + const values = []; + let idx = 1; + + if (name !== undefined) { + updates.push(`name = $${idx++}`); + values.push(name); + } + if (description !== undefined) { + updates.push(`description = $${idx++}`); + values.push(description); + } + if (tags !== undefined) { + updates.push(`tags = $${idx++}`); + values.push(tags); + } + + if (updates.length === 0) { + return res.status(400).json({ error: 'No fields to update' }); + } + + updates.push(`updated_at = NOW()`); + values.push(sessionId); + + const result = await db.query('sensors', + `UPDATE sessiondataref SET ${updates.join(', ')} WHERE session_id = $${idx} RETURNING *`, + values + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Session not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + console.error('Error updating session details:', err.message); + res.status(500).json({ error: 'internal server error' }); + } }); module.exports = router; diff --git a/realtime/src/store/db.js b/realtime/src/store/db.js index 8ff2139..72535af 100644 --- a/realtime/src/store/db.js +++ b/realtime/src/store/db.js @@ -2,7 +2,7 @@ const { Pool } = require('pg'); const baseConfig = { user: process.env.DB_USER, - password: process.env.DB_PSW, + password: process.env.DB_PASSWORD, host: process.env.DB_HOST, port: process.env.DB_PORT, max: 10, @@ -11,14 +11,19 @@ const baseConfig = { }; const dbs = { - data: { name: process.env.DATA_DB }, - sensors: { name: process.env.SENSORS_DB } + data: { name: process.env.DATA_DB || 'data' }, + sensors: { name: process.env.SENSORS_DB || 'sensors' } } +const pools = {}; + function getPool(db) { const dbConfig = dbs[db]; if (!dbConfig) throw new Error(`Database ${db} not configured`); - return new Pool({ ...baseConfig, database: dbConfig.name }); + if (!pools[db]) { + pools[db] = new Pool({ ...baseConfig, database: dbConfig.name }); + } + return pools[db]; } async function checkConnection(db) { @@ -26,8 +31,8 @@ async function checkConnection(db) { await getPool(db).query('SELECT NOW()'); return true; } catch (err) { - console.error(`Error connecting to ${db} database`, err); - return false; + console.error(`Error connecting to ${db} database`, err.message); + return false; } } @@ -38,6 +43,7 @@ async function query(db, text, params) { async function init() { try { + // Tabella sensori await query('sensors', ` CREATE TABLE IF NOT EXISTS sensors ( id SERIAL PRIMARY KEY, @@ -46,11 +52,28 @@ async function init() { created_at TIMESTAMPTZ DEFAULT NOW() ); `); + + // Tabella sessioni: mappa session_id (tag InfluxDB) a metadati custom + await query('sensors', ` + CREATE TABLE IF NOT EXISTS sessiondataref ( + id SERIAL PRIMARY KEY, + session_id VARCHAR(32) UNIQUE NOT NULL, + sensor_name VARCHAR(255) NOT NULL, + name VARCHAR(255), + description TEXT, + tags TEXT[] DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT NOW(), + disconnected_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ DEFAULT NOW() + ); + `); + + console.log('[DB] Tabelle verificate (sensors, sessiondataref)'); } catch (err) { - console.error('Error creating sensors table', err); + console.error('[DB] Error creating tables:', err.message); } } init(); -module.exports = { checkConnection, query }; \ No newline at end of file +module.exports = { checkConnection, query }; diff --git a/realtime/src/store/influx.js b/realtime/src/store/influx.js index 8253b9d..63bbfc2 100644 --- a/realtime/src/store/influx.js +++ b/realtime/src/store/influx.js @@ -15,11 +15,10 @@ const writeApi = client.getWriteApi(org, bucket, 'ms', { /** * Scrive dati generici su InfluxDB senza mapping. - * I campi vengono scritti con il nome originale. * @param {string} measurement - nome della measurement (es. 'logs', 'weather') * @param {Object} fields - campi { key: value } * @param {string} sensor - nome del sensore - * @param {string} session - id sessione + * @param {string} session - id sessione (tag immutabile) * @param {number} timestamp - timestamp unix ms */ function writeGenericData(measurement, fields, sensor, session, timestamp) { @@ -52,6 +51,24 @@ function writeForecastBatch(points, sensor, session) { } } +/** + * Forza il flush del buffer di scrittura. + */ +async function flush() { + try { + await writeApi.flush(); + } catch (err) { + console.error('[INFLUX] Flush error:', err.message); + } +} + +/** + * Query storica per una sessione: ritorna righe pivotate con tutti i campi. + * @param {string} sensor - nome sensore + * @param {string} session - session_id (tag InfluxDB) + * @param {string} since - ISO timestamp o duration (es. "-30d") + * @returns {Array} + */ async function queryHistory(sensor, session, since) { const queryApi = client.getQueryApi(org); const fluxQuery = ` @@ -61,6 +78,7 @@ async function queryHistory(sensor, session, since) { |> filter(fn: (r) => r.sensor == "${sensor}") |> filter(fn: (r) => r.session == "${session}") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> sort(columns: ["_time"]) `; const rows = []; @@ -75,4 +93,44 @@ async function queryHistory(sensor, session, since) { }); } -module.exports = { writeGenericData, writeForecastBatch, queryHistory }; +/** + * Esporta tutti i dati di una sessione come CSV. + * @param {string} sensor - nome sensore + * @param {string} session - session_id + * @param {string} since - ISO timestamp inizio (opzionale, default -30d) + * @returns {string} CSV content + */ +async function exportSessionCSV(sensor, session, since) { + const start = since || '-30d'; + const rows = await queryHistory(sensor, session, start); + + if (rows.length === 0) return ''; + + // Raccogli tutti i field names (esclusi meta InfluxDB) + const metaKeys = new Set(['result', 'table', '_start', '_stop', '_measurement', 'sensor', 'session', '']); + const fieldNames = new Set(); + for (const row of rows) { + for (const key of Object.keys(row)) { + if (!metaKeys.has(key) && key !== '_time') { + fieldNames.add(key); + } + } + } + + const fields = Array.from(fieldNames).sort(); + const header = ['timestamp', ...fields].join(','); + + const csvRows = rows.map(row => { + const ts = row._time || ''; + const values = fields.map(f => { + const v = row[f]; + if (v === null || v === undefined) return ''; + return v; + }); + return [ts, ...values].join(','); + }); + + return header + '\n' + csvRows.join('\n') + '\n'; +} + +module.exports = { writeGenericData, writeForecastBatch, flush, queryHistory, exportSessionCSV }; diff --git a/realtime/src/ws/handler.js b/realtime/src/ws/handler.js index 1a9d6e1..5a60b23 100644 --- a/realtime/src/ws/handler.js +++ b/realtime/src/ws/handler.js @@ -2,6 +2,7 @@ const { WebSocketServer } = require('ws'); const { decode } = require('@msgpack/msgpack'); const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis'); const { writeGenericData, writeForecastBatch } = require('../store/influx'); +const db = require('../store/db'); // In-memory registries const sensorWatchers = new Map(); // sensorName → Set (watchers) @@ -37,7 +38,6 @@ function setup(server) { wss.handleUpgrade(req, socket, head, (ws) => { ws.sensorName = sensor; ws.sessionId = generateSessionId(); - ws.sessionLabel = ws.sessionId; ws.connectedAt = new Date().toISOString(); handleSensorConnection(ws); }); @@ -54,15 +54,28 @@ function setup(server) { }); } -function handleSensorConnection(ws) { - const { sensorName, sessionId, sessionLabel, connectedAt } = ws; +async function handleSensorConnection(ws) { + const { sensorName, sessionId, connectedAt } = ws; console.log(`Sensor connected: ${sensorName} (session: ${sessionId})`); // Register in global registry connectedSensors.set(sensorName, ws); appendAsConnection(sensorName, 'connected', connectedAt); - hset(`sensors:${sensorName}`, 'session', sessionId, 'sessionLabel', sessionLabel); + hset(`sensors:${sensorName}`, 'session', sessionId, 'connectedAt', connectedAt); + + // Crea riga in sessiondataref su PostgreSQL (nome di default = sessionId) + try { + await db.query('sensors', + `INSERT INTO sessiondataref (session_id, sensor_name, name, created_at) + VALUES ($1, $2, $3, NOW()) + ON CONFLICT (session_id) DO NOTHING`, + [sessionId, sensorName, sessionId] + ); + console.log(`[${sensorName}] Session ${sessionId} registrata in sessiondataref`); + } catch (err) { + console.error(`[${sensorName}] Errore creazione sessiondataref:`, err.message); + } const pingInterval = setInterval(() => { if (ws.readyState === ws.OPEN) ws.ping(); @@ -84,19 +97,17 @@ function handleSensorConnection(ws) { const { ts, _m, ...fields } = packet; - // Route per tipo di measurement + // InfluxDB: usa SEMPRE sessionId come tag (non cambia mai) if (_m === 'forecast_batch') { - // Batch previsioni orarie if (Array.isArray(fields.points)) { - writeForecastBatch(fields.points, sensorName, ws.sessionLabel); + writeForecastBatch(fields.points, sensorName, sessionId); } } else { - // weather, logs, o altro — scrivi tutti i campi const measurement = _m || 'sensor_data'; - writeGenericData(measurement, fields, sensorName, ws.sessionLabel, ts); + writeGenericData(measurement, fields, sensorName, sessionId, ts); } - // Broadcast ai watchers: invia dati grezzi con measurement e fields + // Broadcast ai watchers const watchers = sensorWatchers.get(sensorName); if (watchers && watchers.size > 0) { const msg = JSON.stringify({ @@ -115,11 +126,22 @@ function handleSensorConnection(ws) { } }); - ws.on('close', () => { + ws.on('close', async () => { console.log(`Sensor disconnected: ${sensorName}`); clearInterval(pingInterval); connectedSensors.delete(sensorName); appendAsConnection(sensorName, 'disconnected', new Date().toISOString()); + + // Aggiorna disconnected_at in sessiondataref + try { + await db.query('sensors', + `UPDATE sessiondataref SET disconnected_at = NOW() WHERE session_id = $1`, + [sessionId] + ); + } catch (err) { + console.error(`[${sensorName}] Errore update disconnected_at:`, err.message); + } + del(`sensors:${sensorName}`); });