feat: implement WebSocket server for real-time sensor data handling and add sensor status update routes

This commit is contained in:
Giuseppe Raffa
2026-04-14 19:05:37 +02:00
parent a34048ae6b
commit 137c6131c3
7 changed files with 382 additions and 13 deletions

220
realtime/src/ws/handler.js Normal file
View File

@@ -0,0 +1,220 @@
const { WebSocketServer } = require('ws');
const { decode } = require('@msgpack/msgpack');
const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis');
const { writeSensorData, queryHistory } = require('../store/influx');
// In-memory map: sensorName → Set<WebSocket>
const sensorWatchers = new Map();
function generateSessionId() {
const num = Math.floor(1000 + Math.random() * 9000);
return `s${num}`;
}
// Map sensor short keys → console field keys + measurement category
const fieldMapping = {
t: { key: 'temp', measurement: 'weather' },
h: { key: 'hum', measurement: 'weather' },
spd: { key: 'wSpd', measurement: 'weather' },
cog: { key: 'cog', measurement: 'navigation' },
sog: { key: 'sog', measurement: 'navigation' },
hdg: { key: 'hdg', measurement: 'navigation' },
lat: { key: 'lat', measurement: 'navigation' },
lon: { key: 'lon', measurement: 'navigation' },
};
/**
* Transforms a sensor packet (short keys) into grouped messages
* for the console: { timestamp, measurement, fields }
*/
function transformPacket(packet) {
const { ts, ...rawFields } = packet;
const groups = {};
for (const [short, val] of Object.entries(rawFields)) {
const mapping = fieldMapping[short];
if (!mapping) continue;
const { key, measurement } = mapping;
if (!groups[measurement]) groups[measurement] = {};
groups[measurement][key] = val;
}
const messages = [];
for (const [measurement, fields] of Object.entries(groups)) {
messages.push({ timestamp: ts, measurement, fields });
}
return messages;
}
function setup(server) {
const wss = new WebSocketServer({ noServer: true });
server.on('upgrade', async (req, socket, head) => {
const url = new URL(req.url, `http://${req.headers.host}`);
const path = url.pathname;
if (path === '/' || path === '') {
const token = url.searchParams.get('token');
if (!token) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
const sensor = await consumeConnectionToken(token);
if (!sensor) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
wss.handleUpgrade(req, socket, head, (ws) => {
ws.sensorName = sensor;
ws.sessionId = generateSessionId();
ws.connectedAt = new Date().toISOString();
handleSensorConnection(ws);
});
} else if (path === '/live') {
// Accept upgrade without requiring query params.
// The console sends { action: 'watch', sensorId } after connecting.
wss.handleUpgrade(req, socket, head, (ws) => {
handleWatcherConnection(ws);
});
} else {
socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
socket.destroy();
}
});
}
function handleSensorConnection(ws) {
const { sensorName, sessionId, connectedAt } = ws;
console.log(`Sensor connected: ${sensorName} (session: ${sessionId})`);
appendAsConnection(sensorName, 'connected', connectedAt);
hset(`sensors:${sensorName}`, 'session', sessionId);
const pingInterval = setInterval(() => {
if (ws.readyState === ws.OPEN) ws.ping();
}, 30000);
ws.on('message', (data) => {
try {
const packet = decode(data);
const { ts, ...fields } = packet;
writeSensorData(fields, sensorName, sessionId, ts);
// Broadcast to watchers as JSON messages grouped by measurement
const watchers = sensorWatchers.get(sensorName);
if (watchers && watchers.size > 0) {
const messages = transformPacket(packet);
for (const msg of messages) {
const json = JSON.stringify(msg);
for (const watcher of watchers) {
if (watcher.readyState === watcher.OPEN) {
watcher.send(json);
}
}
}
}
} catch (err) {
console.error(`Error processing sensor data from ${sensorName}:`, err.message);
}
});
ws.on('close', () => {
console.log(`Sensor disconnected: ${sensorName}`);
clearInterval(pingInterval);
appendAsConnection(sensorName, 'disconnected', new Date().toISOString());
del(`sensors:${sensorName}`);
});
ws.on('error', (err) => {
console.error(`WebSocket error for sensor ${sensorName}:`, err.message);
});
}
function handleWatcherConnection(ws) {
console.log('Watcher connected, waiting for watch action...');
ws.on('message', async (data) => {
try {
const msg = JSON.parse(data.toString());
if (msg.action === 'watch' && msg.sensorId) {
// Unwatch previous sensor if any
if (ws.sensorName) {
sensorWatchers.get(ws.sensorName)?.delete(ws);
if (sensorWatchers.get(ws.sensorName)?.size === 0) {
sensorWatchers.delete(ws.sensorName);
}
}
ws.sensorName = msg.sensorId;
// Register as watcher
if (!sensorWatchers.has(msg.sensorId)) {
sensorWatchers.set(msg.sensorId, new Set());
}
sensorWatchers.get(msg.sensorId).add(ws);
console.log(`Watcher now watching sensor: ${msg.sensorId}`);
// Send history since sensor connected
try {
const sensorInfo = await query(msg.sensorId, 'sensors');
if (sensorInfo && sensorInfo.timestamp && sensorInfo.session) {
const history = await queryHistory(msg.sensorId, sensorInfo.session, sensorInfo.timestamp);
for (const row of history) {
const ts = new Date(row._time).getTime();
// Send each historical row as individual messages grouped by measurement
const rebuilt = { ts };
for (const [short, { key }] of Object.entries(fieldMapping)) {
const influxField = { t: 'temperature', h: 'humidity', spd: 'speed', cog: 'cog', sog: 'sog', hdg: 'headingTrue', lat: 'latitude', lon: 'longitude' }[short];
if (row[influxField] !== undefined) {
rebuilt[short] = row[influxField];
}
}
const messages = transformPacket(rebuilt);
for (const m of messages) {
ws.send(JSON.stringify(m));
}
}
}
} catch (err) {
console.error(`Error fetching history for watcher:`, err.message);
}
} else if (msg.action === 'unwatch') {
if (ws.sensorName) {
sensorWatchers.get(ws.sensorName)?.delete(ws);
if (sensorWatchers.get(ws.sensorName)?.size === 0) {
sensorWatchers.delete(ws.sensorName);
}
ws.sensorName = null;
}
}
} catch (err) {
// Ignore non-JSON messages
}
});
ws.on('close', () => {
if (ws.sensorName) {
sensorWatchers.get(ws.sensorName)?.delete(ws);
if (sensorWatchers.get(ws.sensorName)?.size === 0) {
sensorWatchers.delete(ws.sensorName);
}
}
console.log('Watcher disconnected');
});
ws.on('error', (err) => {
console.error('WebSocket error for watcher:', err.message);
});
}
module.exports = { setup };