refactor: update WebSocket server setup and improve session handling

This commit is contained in:
Giuseppe Raffa
2026-04-14 15:04:10 +02:00
parent ccd6143253
commit 8b5937fa19
3 changed files with 76 additions and 92 deletions

View File

@@ -182,6 +182,6 @@ services:
networks:
meb-public:
external: true
internal: true
meb-private:
external: true
internal: true

View File

@@ -16,8 +16,6 @@ app.use((req, res, next) => {
next();
});
require('./socket');
app.get('/', (req, res) => {
res.redirect('/health');
});
@@ -81,6 +79,8 @@ const server = app.listen(3000, '0.0.0.0', async () => {
await require('./helper/authdb').initDB();
});
require('./socket')(server);
const wss = new WebSocket.Server({ server, path: '/live' });
wss.on('connection', (client) => {

View File

@@ -5,106 +5,90 @@ const tokenStore = require('./helper/tokenStore');
const redisHelper = require('./helper/redis');
const influxWriter = require('./helper/influxWriter');
const ws = new WebSocket.Server({
port: 3000,
perMessageDeflate: false,
verifyClient: async (info, callback) => {
const { query } = url.parse(info.req.url, true);
const token = query.token;
module.exports = function setupSensorWebSocket(server) {
const wsPath = process.env.SENSOR_WS_PATH || '/sensor';
if (!token) {
return callback(false, 401, 'token not passed');
}
const ws = new WebSocket.Server({
server,
path: wsPath,
perMessageDeflate: false,
verifyClient: async (info, callback) => {
const { query } = url.parse(info.req.url, true);
const token = query.token;
try {
const sessionData = await tokenStore.consumeToken(token);
if (!sessionData) {
return callback(false, 401, 'token not valid or expired');
if (!token) {
return callback(false, 401, 'token not passed');
}
info.req.sensorSession = sessionData;
callback(true);
} catch (error) {
callback(false, 500, `internal server error: ${error}`);
try {
const sessionData = await tokenStore.consumeToken(token);
if (!sessionData) {
return callback(false, 401, 'token not valid or expired');
}
info.req.sensorSession = sessionData;
callback(true);
} catch (error) {
callback(false, 500, `internal server error: ${error}`);
}
}
}
});
});
ws.on('connection', async (client, req) => {
const session = req.sensorSession;
const sensorId = session.sensorId;
ws.on('connection', async (client, req) => {
const session = req.sensorSession;
const sensorId = session.sensorId;
client.sensorId = sensorId;
client.sensorId = sensorId;
// Registra la sessione su Redis
try {
await redisHelper.setSession(sensorId, {
...session.metadata,
connectedAt: Math.floor(Date.now() / 1000)
try {
await redisHelper.setSession(sensorId, {
...session.metadata,
connectedAt: Math.floor(Date.now() / 1000)
});
} catch (err) {
console.error(`[WS] Redis setSession error for ${sensorId}:`, err);
}
client.on('message', async (raw) => {
try {
const msg = decode(raw);
client.send(encode({ a: 1 }));
const [timestamp, measurement, fields] = msg;
redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields });
const watchers = await redisHelper.getWatcherCount(sensorId);
if (measurement === 'forecast_batch') {
influxWriter.writeForecastBatch(sensorId, fields);
} else if (watchers > 0) {
influxWriter.bufferPoint(sensorId, timestamp, measurement, fields);
} else {
influxWriter.writePoint(sensorId, timestamp, measurement, fields);
}
} catch (err) {
console.error(`[WS|${sensorId}] decode error:`, err);
client.send(encode({ e: 1 }));
}
});
} catch (err) {
console.error(`[WS] Redis setSession error for ${sensorId}:`, err);
}
/**
* Gestione messaggi in arrivo dal sensore.
*
* Il sensore invia dati codificati in MessagePack (binario).
* Il formato atteso è un array compatto (per risparmiare spazio):
*
* [timestamp, measurement, { field1: value1, field2: value2, ... }]
*
* Esempio pratico (dati meteo da barca):
* [1710681000, "weather", { t: 22.5, h: 65, p: 1013.2, w: 12.3 }]
*
* Dove le chiavi abbreviate sono:
* t = temperature, h = humidity, p = pressure, w = windSpeed ...
*
* Il server decodifica il messaggio e prepara il punto per InfluxDB.
*/
client.on('message', async (raw) => {
try {
const msg = decode(raw);
client.on('error', (err) => {
console.error(`[WS|${sensorId}] error:`, err);
});
// Rispondi con ACK minimale in msgpack: { a: 1 } = acknowledged
client.send(encode({ a: 1 }));
const [timestamp, measurement, fields] = msg;
// Pubblica su Redis per i watchers live
redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields });
// Controlla se ci sono watchers attivi
const watchers = await redisHelper.getWatcherCount(sensorId);
if (measurement === 'forecast_batch') {
influxWriter.writeForecastBatch(sensorId, fields);
} else if (watchers > 0) {
// Con watchers: accumula nel buffer, flusha ogni 10 punti
influxWriter.bufferPoint(sensorId, timestamp, measurement, fields);
} else {
// Senza watchers: scrivi immediatamente (comportamento originale)
influxWriter.writePoint(sensorId, timestamp, measurement, fields);
client.on('close', async () => {
try {
await redisHelper.deleteSession(sensorId);
} catch (err) {
console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err);
}
} catch (err) {
console.error(`[WS|${sensorId}] decode error:`, err);
// Rispondi con errore in msgpack: { e: 1 } = error
client.send(encode({ e: 1 }));
}
});
});
client.on('error', (err) => {
console.error(`[WS|${sensorId}] error:`, err);
});
console.log(`[WS] Sensor websocket server ready on ${wsPath}`);
client.on('close', async () => {
try {
await redisHelper.deleteSession(sensorId);
} catch (err) {
console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err);
}
});
});
console.log(`[WS] Realtime websocket server`);
return ws;
};