const WebSocket = require('ws'); const { encode, decode } = require('@msgpack/msgpack'); const url = require('url'); const tokenStore = require('./helper/tokenStore'); const redisHelper = require('./helper/redis'); const influxWriter = require('./helper/influxWriter'); const ws = new WebSocket.Server({ port: process.env.SOCKET_PORT, perMessageDeflate: false, verifyClient: async (info, callback) => { const { query } = url.parse(info.req.url, true); const token = query.token; if (!token) { return callback(false, 401, 'token not passed'); } try { const sessionData = await tokenStore.consumeToken(token); if (!sessionData) { return callback(false, 401, 'token not valid or expired'); } info.req.sensorSession = sessionData; callback(true); } catch (error) { callback(false, 500, `internal server error: ${error}`); } } }); ws.on('connection', async (client, req) => { const session = req.sensorSession; const sensorId = session.sensorId; client.sensorId = sensorId; // Registra la sessione su Redis try { await redisHelper.setSession(sensorId, { ...session.metadata, connectedAt: Math.floor(Date.now() / 1000) }); } catch (err) { console.error(`[WS] Redis setSession error for ${sensorId}:`, err); } /** * Gestione messaggi in arrivo dal sensore. * * Il sensore invia dati codificati in MessagePack (binario). * Il formato atteso รจ un array compatto (per risparmiare spazio): * * [timestamp, measurement, { field1: value1, field2: value2, ... }] * * Esempio pratico (dati meteo da barca): * [1710681000, "weather", { t: 22.5, h: 65, p: 1013.2, w: 12.3 }] * * Dove le chiavi abbreviate sono: * t = temperature, h = humidity, p = pressure, w = windSpeed ... * * Il server decodifica il messaggio e prepara il punto per InfluxDB. */ client.on('message', async (raw) => { try { const msg = decode(raw); // Rispondi con ACK minimale in msgpack: { a: 1 } = acknowledged client.send(encode({ a: 1 })); const [timestamp, measurement, fields] = msg; // Pubblica su Redis per i watchers live redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields }); // Controlla se ci sono watchers attivi const watchers = await redisHelper.getWatcherCount(sensorId); if (measurement === 'forecast_batch') { influxWriter.writeForecastBatch(sensorId, fields); } else if (watchers > 0) { // Con watchers: accumula nel buffer, flusha ogni 10 punti influxWriter.bufferPoint(sensorId, timestamp, measurement, fields); } else { // Senza watchers: scrivi immediatamente (comportamento originale) influxWriter.writePoint(sensorId, timestamp, measurement, fields); } } catch (err) { console.error(`[WS|${sensorId}] decode error:`, err); // Rispondi con errore in msgpack: { e: 1 } = error client.send(encode({ e: 1 })); } }); client.on('error', (err) => { console.error(`[WS|${sensorId}] error:`, err); }); client.on('close', async () => { try { await redisHelper.deleteSession(sensorId); } catch (err) { console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err); } }); }); console.log(`[WS] Realtime websocket server on port: ${process.env.SOCKET_PORT}`);