diff --git a/docker-compose.yml b/docker-compose.yml index afd37b7..386cb51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -182,6 +182,6 @@ services: networks: meb-public: - external: true + internal: true meb-private: - external: true + internal: true diff --git a/realtime/src/index.js b/realtime/src/index.js index 693e337..cf69a19 100644 --- a/realtime/src/index.js +++ b/realtime/src/index.js @@ -16,8 +16,6 @@ app.use((req, res, next) => { next(); }); -require('./socket'); - app.get('/', (req, res) => { res.redirect('/health'); }); @@ -81,6 +79,8 @@ const server = app.listen(3000, '0.0.0.0', async () => { await require('./helper/authdb').initDB(); }); +require('./socket')(server); + const wss = new WebSocket.Server({ server, path: '/live' }); wss.on('connection', (client) => { diff --git a/realtime/src/socket.js b/realtime/src/socket.js index 3f64a61..a102d8c 100644 --- a/realtime/src/socket.js +++ b/realtime/src/socket.js @@ -5,106 +5,90 @@ const tokenStore = require('./helper/tokenStore'); const redisHelper = require('./helper/redis'); const influxWriter = require('./helper/influxWriter'); -const ws = new WebSocket.Server({ - port: 3000, - perMessageDeflate: false, - verifyClient: async (info, callback) => { - const { query } = url.parse(info.req.url, true); - const token = query.token; +module.exports = function setupSensorWebSocket(server) { + const wsPath = process.env.SENSOR_WS_PATH || '/sensor'; - if (!token) { - return callback(false, 401, 'token not passed'); - } + const ws = new WebSocket.Server({ + server, + path: wsPath, + perMessageDeflate: false, + verifyClient: async (info, callback) => { + const { query } = url.parse(info.req.url, true); + const token = query.token; - try { - const sessionData = await tokenStore.consumeToken(token); - if (!sessionData) { - return callback(false, 401, 'token not valid or expired'); + if (!token) { + return callback(false, 401, 'token not passed'); } - info.req.sensorSession = sessionData; - callback(true); - } catch (error) { - callback(false, 500, `internal server error: ${error}`); + try { + const sessionData = await tokenStore.consumeToken(token); + if (!sessionData) { + return callback(false, 401, 'token not valid or expired'); + } + + info.req.sensorSession = sessionData; + callback(true); + } catch (error) { + callback(false, 500, `internal server error: ${error}`); + } } - } -}); + }); -ws.on('connection', async (client, req) => { - const session = req.sensorSession; - const sensorId = session.sensorId; + ws.on('connection', async (client, req) => { + const session = req.sensorSession; + const sensorId = session.sensorId; - client.sensorId = sensorId; + client.sensorId = sensorId; - // Registra la sessione su Redis - try { - await redisHelper.setSession(sensorId, { - ...session.metadata, - connectedAt: Math.floor(Date.now() / 1000) + try { + await redisHelper.setSession(sensorId, { + ...session.metadata, + connectedAt: Math.floor(Date.now() / 1000) + }); + } catch (err) { + console.error(`[WS] Redis setSession error for ${sensorId}:`, err); + } + + client.on('message', async (raw) => { + try { + const msg = decode(raw); + + client.send(encode({ a: 1 })); + + const [timestamp, measurement, fields] = msg; + + redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields }); + + const watchers = await redisHelper.getWatcherCount(sensorId); + + if (measurement === 'forecast_batch') { + influxWriter.writeForecastBatch(sensorId, fields); + } else if (watchers > 0) { + influxWriter.bufferPoint(sensorId, timestamp, measurement, fields); + } else { + influxWriter.writePoint(sensorId, timestamp, measurement, fields); + } + + } catch (err) { + console.error(`[WS|${sensorId}] decode error:`, err); + client.send(encode({ e: 1 })); + } }); - } catch (err) { - console.error(`[WS] Redis setSession error for ${sensorId}:`, err); - } - /** - * Gestione messaggi in arrivo dal sensore. - * - * Il sensore invia dati codificati in MessagePack (binario). - * Il formato atteso รจ un array compatto (per risparmiare spazio): - * - * [timestamp, measurement, { field1: value1, field2: value2, ... }] - * - * Esempio pratico (dati meteo da barca): - * [1710681000, "weather", { t: 22.5, h: 65, p: 1013.2, w: 12.3 }] - * - * Dove le chiavi abbreviate sono: - * t = temperature, h = humidity, p = pressure, w = windSpeed ... - * - * Il server decodifica il messaggio e prepara il punto per InfluxDB. - */ - client.on('message', async (raw) => { - try { - const msg = decode(raw); + client.on('error', (err) => { + console.error(`[WS|${sensorId}] error:`, err); + }); - // Rispondi con ACK minimale in msgpack: { a: 1 } = acknowledged - client.send(encode({ a: 1 })); - - const [timestamp, measurement, fields] = msg; - - // Pubblica su Redis per i watchers live - redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields }); - - // Controlla se ci sono watchers attivi - const watchers = await redisHelper.getWatcherCount(sensorId); - - if (measurement === 'forecast_batch') { - influxWriter.writeForecastBatch(sensorId, fields); - } else if (watchers > 0) { - // Con watchers: accumula nel buffer, flusha ogni 10 punti - influxWriter.bufferPoint(sensorId, timestamp, measurement, fields); - } else { - // Senza watchers: scrivi immediatamente (comportamento originale) - influxWriter.writePoint(sensorId, timestamp, measurement, fields); + client.on('close', async () => { + try { + await redisHelper.deleteSession(sensorId); + } catch (err) { + console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err); } - - } catch (err) { - console.error(`[WS|${sensorId}] decode error:`, err); - // Rispondi con errore in msgpack: { e: 1 } = error - client.send(encode({ e: 1 })); - } + }); }); - client.on('error', (err) => { - console.error(`[WS|${sensorId}] error:`, err); - }); + console.log(`[WS] Sensor websocket server ready on ${wsPath}`); - client.on('close', async () => { - try { - await redisHelper.deleteSession(sensorId); - } catch (err) { - console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err); - } - }); -}); - -console.log(`[WS] Realtime websocket server`); \ No newline at end of file + return ws; +}; \ No newline at end of file