96 lines
3.1 KiB
JavaScript
96 lines
3.1 KiB
JavaScript
const WebSocket = require('ws');
|
|
const { encode, decode } = require('@msgpack/msgpack');
|
|
const url = require('url');
|
|
const tokenStore = require('./helper/tokenStore');
|
|
const redisHelper = require('./helper/redis');
|
|
const influxWriter = require('./helper/influxWriter');
|
|
|
|
module.exports = function setupSensorWebSocket(server) {
|
|
const wsPath = process.env.SENSOR_WS_PATH || '/sensor';
|
|
|
|
const ws = new WebSocket.Server({
|
|
server,
|
|
path: wsPath,
|
|
perMessageDeflate: false,
|
|
verifyClient: async (info, callback) => {
|
|
console.log('[WS|verifyClient] URL:', info.req.url);
|
|
const { query } = url.parse(info.req.url, true);
|
|
const token = query.token;
|
|
console.log('[WS|verifyClient] Token ricevuto:', token);
|
|
|
|
if (!token) {
|
|
return callback(false, 401, 'token not passed');
|
|
}
|
|
|
|
try {
|
|
const sessionData = await tokenStore.consumeToken(token);
|
|
if (!sessionData) {
|
|
return callback(false, 401, 'token not valid or expired');
|
|
}
|
|
|
|
info.req.sensorSession = sessionData;
|
|
callback(true);
|
|
} catch (error) {
|
|
callback(false, 500, `internal server error: ${error}`);
|
|
}
|
|
}
|
|
});
|
|
|
|
ws.on('connection', async (client, req) => {
|
|
const session = req.sensorSession;
|
|
const sensorId = session.sensorId;
|
|
|
|
client.sensorId = sensorId;
|
|
|
|
try {
|
|
await redisHelper.setSession(sensorId, {
|
|
...session.metadata,
|
|
connectedAt: Math.floor(Date.now() / 1000)
|
|
});
|
|
} catch (err) {
|
|
console.error(`[WS] Redis setSession error for ${sensorId}:`, err);
|
|
}
|
|
|
|
client.on('message', async (raw) => {
|
|
try {
|
|
const msg = decode(raw);
|
|
|
|
client.send(encode({ a: 1 }));
|
|
|
|
const [timestamp, measurement, fields] = msg;
|
|
|
|
redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields });
|
|
|
|
const watchers = await redisHelper.getWatcherCount(sensorId);
|
|
|
|
if (measurement === 'forecast_batch') {
|
|
influxWriter.writeForecastBatch(sensorId, fields);
|
|
} else if (watchers > 0) {
|
|
influxWriter.bufferPoint(sensorId, timestamp, measurement, fields);
|
|
} else {
|
|
influxWriter.writePoint(sensorId, timestamp, measurement, fields);
|
|
}
|
|
|
|
} catch (err) {
|
|
console.error(`[WS|${sensorId}] decode error:`, err);
|
|
client.send(encode({ e: 1 }));
|
|
}
|
|
});
|
|
|
|
client.on('error', (err) => {
|
|
console.error(`[WS|${sensorId}] error:`, err);
|
|
});
|
|
|
|
client.on('close', async () => {
|
|
try {
|
|
await redisHelper.deleteSession(sensorId);
|
|
} catch (err) {
|
|
console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err);
|
|
}
|
|
});
|
|
});
|
|
|
|
console.log(`[WS] Sensor websocket server ready on ${wsPath}`);
|
|
|
|
return ws;
|
|
}; |