diff --git a/realtime/src/helper/authdb.js b/realtime/src/helper/authdb.js deleted file mode 100644 index 0c6839c..0000000 --- a/realtime/src/helper/authdb.js +++ /dev/null @@ -1,106 +0,0 @@ -const { Pool } = require('pg'); -const { hash, generateShortId } = require('./cryptoUtils'); - -const pool = new Pool({ - user: process.env.DB_USER, - host: process.env.DB_HOST, - database: process.env.SENSORS_DB, - password: process.env.DB_PASSWORD, - port: process.env.DB_PORT, -}); - -async function checkDB() { - try { - await pool.query('SELECT NOW()'); - return true; - } catch (error) { - console.error('Database connection failed:', error); - return false; - } -} - -async function initDB() { - try { - await pool.query(` - CREATE TABLE IF NOT EXISTS sensors ( - id VARCHAR(10) PRIMARY KEY, - name VARCHAR(100) NOT NULL, - code_hash TEXT NOT NULL UNIQUE, - is_active BOOLEAN DEFAULT TRUE, - last_seen TIMESTAMP DEFAULT NOW(), - created_at TIMESTAMP DEFAULT NOW() - ); - CREATE INDEX IF NOT EXISTS idx_sensors_code_hash ON sensors(code_hash); - `); - console.log('[DB] Database schema initialized (sensors table ensured)'); - } catch (error) { - console.error('[DB] Schema initialization failed:', error); - } -} - -/** - * Restituisce i dati del sensore in base al token ricevuto. - * Il token viene hashato prima della comparazione con il database. - * @param {string} token - il codice segreto del sensore (raw) - */ -async function getSensor(token) { - const hashed = hash(token); - const result = await pool.query('SELECT id, is_active, name, last_seen, created_at FROM sensors WHERE code_hash = $1', [hashed]); - return result.rows[0]; -} - -async function createSensor(name, code) { - const hashedCode = hash(code); - - // Verifica se l'hash esiste già - const result = await pool.query('SELECT id FROM sensors WHERE code_hash = $1', [hashedCode]); - if (result.rows.length > 0) { - throw new Error('Sensor with this code already exists'); - } - - // Genera un ID casuale di 8 caratteri (ottimizzato per spazio, non solo alfanumerico) - const sensorId = generateShortId(8); - - await pool.query('INSERT INTO sensors (id, name, code_hash, is_active, last_seen, created_at) VALUES ($1, $2, $3, $4, $5, $6)', - [sensorId, name, hashedCode, true, new Date(), new Date()]); -} - -/** - * Aggiorna l'ultima attività del sensore. - * @param {*} id - l'id del sensore - * @returns {Promise} - */ -async function updateLastSeen(id) { - await pool.query('UPDATE sensors SET last_seen = NOW() WHERE id = $1', [id]); -} - -/** - * Modifica la disponibilità del sensore. - * @param {*} id - l'id del sensore - * @param {*} is_active - la disponibilità del sensore - * @returns {Promise} - */ -async function setSensorActivity(id, is_active) { - await pool.query('UPDATE sensors SET is_active = $1 WHERE id = $2', [is_active, id]); -} - -async function sensorsExists(id) { - const result = await pool.query('SELECT id FROM sensors WHERE id = $1', [id]); - return result.rows.length > 0; -} - -async function getSensors() { - const resutls = await pool.query('SELECT id, is_active, name, last_seen, created_at FROM sensors'); - return resutls.rows; -} - -module.exports = { - checkDB, - initDB, - getSensor, - updateLastSeen, - setSensorActivity, - getSensors, - sensorsExists, - createSensor -} \ No newline at end of file diff --git a/realtime/src/helper/cryptoUtils.js b/realtime/src/helper/cryptoUtils.js deleted file mode 100644 index ec2bdab..0000000 --- a/realtime/src/helper/cryptoUtils.js +++ /dev/null @@ -1,35 +0,0 @@ -const crypto = require('crypto'); - -/** - * Genera un hash SHA256 in formato esadecimale da una stringa. - * Utilizzato per rendere compatibili authdb.js e tokenStore.js. - */ -function hash(text) { - if (!text) return null; - return crypto.createHash('sha256').update(text).digest('hex'); -} - -/** - * Genera una stringa casuale di lunghezza 'length'. - * Ottimizzata per risparmiare spazio (8 caratteri). - * Include lettere, numeri e simboli per massimizzare l'entropia (non solo alfanumerico). - */ -function generateShortId(length = 8) { - const charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*'; - let result = ''; - while (result.length < length) { - const bytes = crypto.randomBytes(length); - for (let i = 0; i < bytes.length && result.length < length; i++) { - // Selezioniamo solo i byte che rientrano nel range del charset per evitare bias - if (bytes[i] < 256 - (256 % charset.length)) { - result += charset[bytes[i] % charset.length]; - } - } - } - return result; -} - -module.exports = { - hash, - generateShortId -}; diff --git a/realtime/src/helper/influxReader.js b/realtime/src/helper/influxReader.js deleted file mode 100644 index 7e8b3e1..0000000 --- a/realtime/src/helper/influxReader.js +++ /dev/null @@ -1,75 +0,0 @@ -const { InfluxDB } = require('@influxdata/influxdb-client'); - -const url = process.env.INFLX_URL; -const token = process.env.INFLX_TOKEN; -const org = process.env.INFLX_ORG; -const bucket = 'boat'; - -const client = new InfluxDB({ url, token }); -const queryApi = client.getQueryApi(org); - -/** - * Query tutti i dati di una sessione sensore da un timestamp di inizio. - * Ritorna array di righe { _time, _measurement, _field, _value, sensor }. - */ -async function querySessionData(sensorId, fromTimestamp) { - const from = new Date(fromTimestamp).toISOString(); - - const query = ` - from(bucket: "${bucket}") - |> range(start: ${from}) - |> filter(fn: (r) => r["sensor"] == "${sensorId}") - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") - `; - - const rows = []; - return new Promise((resolve, reject) => { - queryApi.queryRows(query, { - next(row, tableMeta) { - const obj = tableMeta.toObject(row); - rows.push(obj); - }, - error(err) { - console.error(`[INFLUX] Query error:`, err.message); - reject(err); - }, - complete() { - resolve(rows); - } - }); - }); -} - -/** - * Formatta i risultati della query in CSV. - */ -function formatCSV(rows) { - if (rows.length === 0) return ''; - - // Raccogli tutte le colonne uniche escludendo meta-campi InfluxDB - const excludeKeys = new Set(['result', 'table', '_start', '_stop', '']); - const allKeys = new Set(); - for (const row of rows) { - for (const key of Object.keys(row)) { - if (!excludeKeys.has(key)) allKeys.add(key); - } - } - - const columns = ['_time', '_measurement', 'sensor', - ...Array.from(allKeys).filter(k => !['_time', '_measurement', 'sensor'].includes(k)).sort() - ]; - - const header = columns.join(','); - const lines = rows.map(row => - columns.map(col => { - const val = row[col]; - if (val == null) return ''; - if (typeof val === 'string' && val.includes(',')) return `"${val}"`; - return val; - }).join(',') - ); - - return header + '\n' + lines.join('\n'); -} - -module.exports = { querySessionData, formatCSV }; diff --git a/realtime/src/helper/influxWriter.js b/realtime/src/helper/influxWriter.js deleted file mode 100644 index a666ebd..0000000 --- a/realtime/src/helper/influxWriter.js +++ /dev/null @@ -1,156 +0,0 @@ -const { InfluxDB, Point } = require('@influxdata/influxdb-client'); - -const url = process.env.INFLX_URL; -const token = process.env.INFLX_TOKEN; -const org = process.env.INFLX_ORG; -const boatTelemetry = 'boat'; - -const client = new InfluxDB({ url, token }); -const writeApi = client.getWriteApi(org, boatTelemetry); - -const FIELD_MAP = { - logs: { - lat: 'latitude', lon: 'longitude', hdg: 'heading', - sog: 'speed_over_ground', cog: 'course_over_ground', - depth: 'depth', engTemp: 'engine_temperature', - fTemp: 'forecast_temperature', fHum: 'forecast_humidity', fPres: 'forecast_pressure', - fWSpd: 'forecast_wind_speed', fWDir: 'forecast_wind_direction', - wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction', - curD: 'current_direction', curV: 'current_velocity' - }, - weather: { - temp: 'temperature', hum: 'humidity', pres: 'pressure', - wSpd: 'wind_speed', wDir: 'wind_direction', gust: 'wind_gusts', - rain: 'rain', prec: 'precipitation', - wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction', - wvPkP: 'wave_peak_period', curD: 'current_direction', curV: 'current_velocity' - }, - forecast: { - temp: 'temperature', hum: 'humidity', pres: 'pressure', - wSpd: 'wind_speed', wDir: 'wind_direction', - precProb: 'precipitation_probability', prec: 'precipitation', - rain: 'rain', cloud: 'cloud_cover', - wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction', - curD: 'current_direction', curV: 'current_velocity' - } -}; - -async function writePoint(sensorId, timestamp, measurement, fields) { - try { - const map = FIELD_MAP[measurement] || {}; - const point = new Point(measurement) - .tag('sensor', sensorId) - .timestamp(new Date(timestamp)); - - for (const [key, value] of Object.entries(fields)) { - if (value == null) continue; - const fieldName = map[key] || key; - - if (typeof value === 'number') { - point.floatField(fieldName, value); - } else if (typeof value === 'string') { - point.stringField(fieldName, value); - } else if (typeof value === 'boolean') { - point.booleanField(fieldName, value); - } - } - - writeApi.writePoint(point); - await writeApi.flush(); - } catch (error) { - console.error(`[INFLUX] Errore writePoint (${measurement}):`, error.message); - } -} - -async function writeForecastBatch(sensorId, points) { - try { - const map = FIELD_MAP.forecast || {}; - - for (const [ts, fields] of points) { - const point = new Point('forecast') - .tag('sensor', sensorId) - .timestamp(new Date(ts)); - - for (const [key, value] of Object.entries(fields)) { - if (value == null) continue; - const fieldName = map[key] || key; - - if (typeof value === 'number') { - point.floatField(fieldName, value); - } else if (typeof value === 'string') { - point.stringField(fieldName, value); - } - } - writeApi.writePoint(point); - } - - await writeApi.flush(); - console.log(`[INFLUX] Scritti ${points.length} punti forecast per sensore ${sensorId}`); - } catch (error) { - console.error(`[INFLUX] Errore writeForecastBatch:`, error.message); - } -} - -// --- Batch buffer per watchers --- -const BATCH_SIZE = 10; -const batchBuffers = new Map(); // sensorId → [{timestamp, measurement, fields}, ...] - -function bufferPoint(sensorId, timestamp, measurement, fields) { - if (!batchBuffers.has(sensorId)) { - batchBuffers.set(sensorId, []); - } - const buffer = batchBuffers.get(sensorId); - buffer.push({ timestamp, measurement, fields }); - - if (buffer.length >= BATCH_SIZE) { - const batch = buffer.splice(0, BATCH_SIZE); - writeBatch(sensorId, batch); - } -} - -async function writeBatch(sensorId, batch) { - try { - for (const { timestamp, measurement, fields } of batch) { - const map = FIELD_MAP[measurement] || {}; - const point = new Point(measurement) - .tag('sensor', sensorId) - .timestamp(new Date(timestamp)); - - for (const [key, value] of Object.entries(fields)) { - if (value == null) continue; - const fieldName = map[key] || key; - if (typeof value === 'number') { - point.floatField(fieldName, value); - } else if (typeof value === 'string') { - point.stringField(fieldName, value); - } else if (typeof value === 'boolean') { - point.booleanField(fieldName, value); - } - } - writeApi.writePoint(point); - } - await writeApi.flush(); - console.log(`[INFLUX] Batch scritto: ${batch.length} punti per sensore ${sensorId}`); - } catch (error) { - console.error(`[INFLUX] Errore writeBatch:`, error.message); - } -} - -async function flushBuffer(sensorId) { - const buffer = batchBuffers.get(sensorId); - if (!buffer || buffer.length === 0) return []; - - const remaining = buffer.splice(0); - await writeBatch(sensorId, remaining); - return remaining; -} - -function getBufferedPoints(sensorId) { - return batchBuffers.get(sensorId) || []; -} - -function clearBuffer(sensorId) { - batchBuffers.delete(sensorId); -} - -module.exports = { writePoint, writeForecastBatch, bufferPoint, flushBuffer, getBufferedPoints, clearBuffer, FIELD_MAP }; diff --git a/realtime/src/helper/redis.js b/realtime/src/helper/redis.js deleted file mode 100644 index 7888e5a..0000000 --- a/realtime/src/helper/redis.js +++ /dev/null @@ -1,91 +0,0 @@ -const Redis = require('ioredis'); - -const redis = new Redis({ - host: process.env.REDIS_HOST, - port: process.env.REDIS_PORT, - password: process.env.REDIS_PASSWORD, -}); - -// Client dedicato per subscribe (ioredis richiede client separato) -const redisSub = new Redis({ - host: process.env.REDIS_HOST, - port: process.env.REDIS_PORT, - password: process.env.REDIS_PASSWORD, -}); - -redis.on('error', (error) => { - console.error('Redis error:', error); -}); - -redis.on('connect', () => { - console.log('Server connected to Redis DB'); -}); - -redisSub.on('error', (error) => { - console.error('Redis sub error:', error); -}); - -const sensors_hash_map = 'sensors:sessions'; - -async function setSession(sensorID, metadata) { - await redis.hset(sensors_hash_map, sensorID, JSON.stringify(metadata)); -} - -async function getSession(sensorID) { - return await redis.hget(sensors_hash_map, sensorID); -} - -async function deleteSession(sensorID) { - await redis.hdel(sensors_hash_map, sensorID); -} - -async function getSessions() { - return await redis.hgetall(sensors_hash_map); -} - -// --- Pub/Sub per live watchers --- - -async function publishSensorData(sensorId, data) { - await redis.publish(`sensor:data:${sensorId}`, JSON.stringify(data)); -} - -async function addWatcher(sensorId) { - return await redis.incr(`sensor:watchers:${sensorId}`); -} - -async function removeWatcher(sensorId) { - const count = await redis.decr(`sensor:watchers:${sensorId}`); - if (count <= 0) { - await redis.del(`sensor:watchers:${sensorId}`); - return 0; - } - return count; -} - -async function getWatcherCount(sensorId) { - const count = await redis.get(`sensor:watchers:${sensorId}`); - return parseInt(count) || 0; -} - -async function checkRedis() { - try { - await redis.ping(); - return true; - } catch (error) { - return false; - } -} - -module.exports = { - setSession, - getSession, - deleteSession, - getSessions, - publishSensorData, - addWatcher, - removeWatcher, - getWatcherCount, - checkRedis, - redis, - redisSub -}; \ No newline at end of file diff --git a/realtime/src/helper/tokenStore.js b/realtime/src/helper/tokenStore.js deleted file mode 100644 index c987cc6..0000000 --- a/realtime/src/helper/tokenStore.js +++ /dev/null @@ -1,52 +0,0 @@ -const { redis } = require('./redis'); -const { generateShortId } = require('./cryptoUtils'); - -const TOKEN_PREFIX = 'token:pending:'; - -/** - * Genera un nuovo token effimero valido per i prossimi 5 secondi. - */ -async function setToken(sensorId, metadata = {}, duration = 5) { - const token = generateShortId(8); - const key = `${TOKEN_PREFIX}${token}`; - - const payload = JSON.stringify({ - sensorId, - metadata, - createdAt: Date.now() - }); - - await redis.set(key, payload, 'EX', duration); - - return token; -} - -/** - * Consuma (valida e rimuove) un token. - * @returns {Object|null} - I dati della sessione se valida, altrimenti null - */ -async function consumeToken(token) { - const key = `${TOKEN_PREFIX}${token}`; - - // Recupera il token - const rawData = await redis.get(key); - if (!rawData) { - return null; - } - - // Il token è monouso: lo cancelliamo subito dopo la lettura - await redis.del(key); - - try { - const data = JSON.parse(rawData); - return data; // Ritorna l'intero oggetto (sensorId, metadata, ecc.) - } catch (e) { - console.error('Error parsing token data:', e); - return null; - } -} - -module.exports = { - setToken, - consumeToken -}; diff --git a/realtime/src/index.js b/realtime/src/index.js index cf69a19..64b9a9c 100644 --- a/realtime/src/index.js +++ b/realtime/src/index.js @@ -1,155 +1,10 @@ const express = require('express'); -const WebSocket = require('ws'); -const Redis = require('ioredis'); -const redisHelper = require('./helper/redis'); -const influxWriter = require('./helper/influxWriter'); -const influxReader = require('./helper/influxReader'); - const app = express(); + app.use(express.json()); -app.use((req, res, next) => { - res.header('Access-Control-Allow-Origin', req.headers.origin || '*'); - res.header('Access-Control-Allow-Credentials', 'true'); - res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization'); - res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); - if (req.method === 'OPTIONS') return res.sendStatus(204); - next(); -}); -app.get('/', (req, res) => { - res.redirect('/health'); -}); +app.get('/', (req, res) => {}); -app.get('/health', async (req, res) => { - const dbConnected = await require('./helper/authdb').checkDB(); - const redisConnected = await redisHelper.checkRedis(); - console.log('DATABASE LOGS', process.env.DB_USER, process.env.DB_HOST, process.env.DB_NAME, process.env.DB_PASSWORD, process.env.DB_PORT); - console.log('REDIS LOGS', process.env.REDIS_HOST, process.env.REDIS_PORT); - - res.status(200).send({ - status: dbConnected && redisConnected ? 'OK' : 'DEGRADED', - database: dbConnected ? 'connected' : 'disconnected', - redis: redisConnected ? 'connected' : 'disconnected', - service: 'realtime', - version: process.env.VERSION, - build: process.env.VERSION_BUILD, - state: process.env.VERSION_STATE, - }); -}); - -app.use('/sensors', require('./routes/sensors')); -app.use('/connect', require('./routes/connect')); -app.use('/sessions', require('./routes/sessions')); - -// --- Flush buffer e CSV export --- - -app.post('/sessions/:sensorId/flush', async (req, res) => { - try { - const { sensorId } = req.params; - const flushed = await influxWriter.flushBuffer(sensorId); - res.status(200).json({ flushed: flushed.length }); - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); - -app.get('/sessions/:sensorId/csv', async (req, res) => { - try { - const { sensorId } = req.params; - const { from } = req.query; - if (!from) return res.status(400).json({ error: 'from timestamp required' }); - - // Flusha prima il buffer residuo - await influxWriter.flushBuffer(sensorId); - - const rows = await influxReader.querySessionData(sensorId, parseInt(from)); - const csv = influxReader.formatCSV(rows); - - res.setHeader('Content-Type', 'text/csv'); - res.setHeader('Content-Disposition', `attachment; filename="session_${sensorId}.csv"`); - res.send(csv); - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); - - -const server = app.listen(3000, '0.0.0.0', async () => { +app.listen(3000, '0.0.0.0', () => { console.log(`Realtime started`); - await require('./helper/authdb').initDB(); }); - -require('./socket')(server); - -const wss = new WebSocket.Server({ server, path: '/live' }); - -wss.on('connection', (client) => { - let watchedSensor = null; - let subscriber = null; - - client.on('message', async (raw) => { - try { - const msg = JSON.parse(raw); - - if (msg.action === 'watch' && msg.sensorId) { - // Rimuovi watch precedente se esiste - if (watchedSensor) { - await redisHelper.removeWatcher(watchedSensor); - if (subscriber) { - subscriber.unsubscribe(); - subscriber.quit(); - subscriber = null; - } - } - - watchedSensor = msg.sensorId; - await redisHelper.addWatcher(watchedSensor); - - // Subscriber Redis dedicato per questo client - subscriber = new Redis({ - host: process.env.REDIS_HOST, - port: process.env.REDIS_PORT - }); - - subscriber.subscribe(`sensor:data:${watchedSensor}`); - subscriber.on('message', (channel, message) => { - if (client.readyState === WebSocket.OPEN) { - client.send(message); // Dati gia' JSON - } - }); - - client.send(JSON.stringify({ type: 'watching', sensorId: watchedSensor })); - } - - if (msg.action === 'unwatch') { - if (watchedSensor) { - await redisHelper.removeWatcher(watchedSensor); - watchedSensor = null; - } - if (subscriber) { - subscriber.unsubscribe(); - subscriber.quit(); - subscriber = null; - } - client.send(JSON.stringify({ type: 'unwatched' })); - } - - } catch (err) { - console.error('[WS-LIVE] Message error:', err); - } - }); - - client.on('close', async () => { - if (watchedSensor) { - await redisHelper.removeWatcher(watchedSensor); - } - if (subscriber) { - subscriber.unsubscribe(); - subscriber.quit(); - } - }); - - client.on('error', (err) => { - console.error('[WS-LIVE] Client error:', err); - }); -}); \ No newline at end of file diff --git a/realtime/src/routes/connect.js b/realtime/src/routes/connect.js deleted file mode 100644 index 497e8b5..0000000 --- a/realtime/src/routes/connect.js +++ /dev/null @@ -1,74 +0,0 @@ -const express = require('express'); -const db = require('../helper/authdb'); -const tokenStore = require('../helper/tokenStore'); -const redis = require('../helper/redis'); -const router = express.Router(); - -/** - * POST /connect - * Il sensore invia il suo codice segreto (token) e metadati opzionali. - * Se autentica, riceve un token effimero per la connessione WebSocket. - */ -router.post('/', async (req, res) => { - try { - const { token, metadata } = req.body; - - if (!token) { - return res.status(400).send({ error: 'Token is required' }); - } - - const sensor = await db.getSensor(token); - - if (!sensor) { - return res.status(401).send({ error: 'token not valid' }); - } - if (!sensor.is_active) { - return res.status(403).send({ error: 'token not valid' }); - } - - // Genera il token effimero valido per max 5 secondi - const socketToken = await tokenStore.setToken(sensor.id, metadata, 5); - - return res.status(200).send({ - socketToken, - sensorId: sensor.id, - expiresIn: 5 - }); - } catch (error) { - return res.status(500).send({ error: `${error}` }); - } -}); - -/** - * DELETE /connect/:sensorId - * Disconnette forzatamente un sensore rimuovendo la sua sessione da Redis. - */ -router.delete('/:sensorId', async (req, res) => { - const { sensorId } = req.params; - - try { - await redis.deleteSession(sensorId); - return res.status(200).send({ result: 'disconnected' }); - } catch (error) { - return res.status(500).send({ error: `${error}` }); - } -}); - -/** - * POST /connect/new - * Crea un nuovo sensore nel database. - */ -router.post('/new', async (req, res) => { - const { name, code } = req.body; - if (!name || !code) { - return res.status(400).send({ error: 'Name and code are required' }); - } - try { - await db.createSensor(name, code); - return res.status(200).send({ result: 'created' }); - } catch (error) { - return res.status(500).send({ error: `${error}` }); - } -}); - -module.exports = router; \ No newline at end of file diff --git a/realtime/src/routes/sensors.js b/realtime/src/routes/sensors.js deleted file mode 100644 index 730cb3d..0000000 --- a/realtime/src/routes/sensors.js +++ /dev/null @@ -1,36 +0,0 @@ -const express = require('express'); -const db = require('../helper/authdb'); - -router = express.Router(); - -router.get('/', async (req, res) => { - const sensors = await db.getSensors(); - res.status(200).json(sensors); -}); - -router.post('/:id/:activity', async (req, res) => { - const { id, activity } = req.params; - - let isActive; - if (activity === 'active') { - isActive = true; - } else if (activity === 'inactive') { - isActive = false; - } else { - return res.status(400).json({ error: 'Invalid activity' }); - } - - try { - const exists = await db.sensorsExists(id); - if (!exists) { - return res.status(404).json({ error: `Sensor with id ${id} not found` }); - } - await db.setSensorActivity(id, isActive); - res.status(200).json({ status: `Sensor ${activity}` }); - } catch (error) { - console.error('Error updating sensor ID:', id, error); - res.status(500).json({ error: 'Database error' }); - } -}) - -module.exports = router \ No newline at end of file diff --git a/realtime/src/routes/sessions.js b/realtime/src/routes/sessions.js deleted file mode 100644 index 9a53626..0000000 --- a/realtime/src/routes/sessions.js +++ /dev/null @@ -1,36 +0,0 @@ -const express = require('express'); -const redis = require('../helper/redis'); - -const router = express.Router(); - -/** - * GET /sessions - * Ritorna tutti i sensori attualmente connessi con i loro metadati. - * Se viene passato un parametro ?sensor=ID, restituisce solo quello. - */ -router.get('/', async (req, res) => { - const { sensor } = req.query; - - // Se viene passato un parametro ?sensor=ID, restituiamo solo quello - if (sensor) { - try { - const session = await redis.getSession(sensor); - if (!session) { - return res.status(404).json({ error: 'Session not found' }); - } - return res.status(200).json(JSON.parse(session)); - } catch (error) { - return res.status(500).json({ error: `${error}` }); - } - } - - // Altrimenti restituiamo tutta la lista - try { - const sessions = await redis.getSessions(); - res.status(200).json(sessions); - } catch (error) { - res.status(500).json({ error: `${error}` }); - } -}); - -module.exports = router; \ No newline at end of file diff --git a/realtime/src/socket.js b/realtime/src/socket.js deleted file mode 100644 index e09895d..0000000 --- a/realtime/src/socket.js +++ /dev/null @@ -1,95 +0,0 @@ -const WebSocket = require('ws'); -const { encode, decode } = require('@msgpack/msgpack'); -const url = require('url'); -const tokenStore = require('./helper/tokenStore'); -const redisHelper = require('./helper/redis'); -const influxWriter = require('./helper/influxWriter'); - -module.exports = function setupSensorWebSocket(server) { - const wsPath = process.env.SENSOR_WS_PATH || '/sensor'; - - const ws = new WebSocket.Server({ - server, - path: wsPath, - perMessageDeflate: false, - verifyClient: (info, callback) => { - console.log('[WS|verifyClient] URL:', info.req.url); - const { query } = url.parse(info.req.url, true); - const token = query.token; - console.log('[WS|verifyClient] Token ricevuto:', token); - - if (!token) { - return callback(false, 401, 'token not passed'); - } - - tokenStore.consumeToken(token).then((sessionData) => { - if (!sessionData) { - return callback(false, 401, 'token not valid or expired'); - } - - info.req.sensorSession = sessionData; - callback(true); - }).catch((error) => { - callback(false, 500, `internal server error: ${error}`); - }); - } - }); - - ws.on('connection', async (client, req) => { - const session = req.sensorSession; - const sensorId = session.sensorId; - - client.sensorId = sensorId; - - try { - await redisHelper.setSession(sensorId, { - ...session.metadata, - connectedAt: Math.floor(Date.now() / 1000) - }); - } catch (err) { - console.error(`[WS] Redis setSession error for ${sensorId}:`, err); - } - - client.on('message', async (raw) => { - try { - const msg = decode(raw); - - client.send(encode({ a: 1 })); - - const [timestamp, measurement, fields] = msg; - - redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields }); - - const watchers = await redisHelper.getWatcherCount(sensorId); - - if (measurement === 'forecast_batch') { - influxWriter.writeForecastBatch(sensorId, fields); - } else if (watchers > 0) { - influxWriter.bufferPoint(sensorId, timestamp, measurement, fields); - } else { - influxWriter.writePoint(sensorId, timestamp, measurement, fields); - } - - } catch (err) { - console.error(`[WS|${sensorId}] decode error:`, err); - client.send(encode({ e: 1 })); - } - }); - - client.on('error', (err) => { - console.error(`[WS|${sensorId}] error:`, err); - }); - - client.on('close', async () => { - try { - await redisHelper.deleteSession(sensorId); - } catch (err) { - console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err); - } - }); - }); - - console.log(`[WS] Sensor websocket server ready on ${wsPath}`); - - return ws; -}; \ No newline at end of file