From 137c6131c30483ad1f8ff2ddc4609ffa9d68c44d Mon Sep 17 00:00:00 2001 From: Giuseppe Raffa <77052701+sesee3@users.noreply.github.com> Date: Tue, 14 Apr 2026 19:05:37 +0200 Subject: [PATCH] feat: implement WebSocket server for real-time sensor data handling and add sensor status update routes --- realtime/src/index.js | 5 +- realtime/src/routes/connect.js | 8 +- realtime/src/routes/sensors.js | 34 ++++- realtime/src/routes/sessions.js | 51 +++++++- realtime/src/store/influx.js | 65 ++++++++++ realtime/src/store/redis.js | 12 +- realtime/src/ws/handler.js | 220 ++++++++++++++++++++++++++++++++ 7 files changed, 382 insertions(+), 13 deletions(-) create mode 100644 realtime/src/store/influx.js create mode 100644 realtime/src/ws/handler.js diff --git a/realtime/src/index.js b/realtime/src/index.js index 59c254c..e28c824 100644 --- a/realtime/src/index.js +++ b/realtime/src/index.js @@ -4,6 +4,7 @@ const app = express(); const db = require('./store/db') const redis = require('./store/redis'); +const wsHandler = require('./ws/handler'); app.use(express.json()); @@ -34,6 +35,8 @@ app.use('/connect', require('./routes/connect')); app.use('/sensors', require('./routes/sensors')); app.use('/sessions', require('./routes/sessions')); -app.listen(3000, '0.0.0.0', () => { +const server = app.listen(3000, '0.0.0.0', () => { console.log(`Realtime started`); }); + +wsHandler.setup(server); diff --git a/realtime/src/routes/connect.js b/realtime/src/routes/connect.js index f76751a..b4af8f2 100644 --- a/realtime/src/routes/connect.js +++ b/realtime/src/routes/connect.js @@ -1,6 +1,6 @@ const router = require('express').Router(); const db = require('../store/db'); -const { appendAsConnection, createConnectionToken } = require('../store/redis'); +const { createConnectionToken } = require('../store/redis'); const crypto = require('crypto'); /** @@ -32,7 +32,7 @@ router.post('/new', async (req, res) => { return res.status(409).json({ error: 'name already exists' }); } console.error('Error creating sensor', err); - res.status(500).json({ error: 'internal server error' }); + res.status(500).json({ error: `internal server error, ${err}` }); } }); @@ -59,14 +59,12 @@ router.post('/', async (req, res) => { if (hash !== storedHash) { return res.status(401).json({ error: 'invalid name or code' }); } - - await appendAsConnection(name, 'pending', new Date().toISOString()); const token = await createConnectionToken(name); res.status(200).json({ s: 'ok', t: token }); } catch (err) { console.error('Error verifying connection', err); - res.status(500).json({ error: 'internal server error' }); + res.status(500).json({ error: `internal server error, ${err}` }); } }); diff --git a/realtime/src/routes/sensors.js b/realtime/src/routes/sensors.js index 4a8a764..8fd317e 100644 --- a/realtime/src/routes/sensors.js +++ b/realtime/src/routes/sensors.js @@ -7,7 +7,7 @@ router.get('/', async (req, res) => { res.json(result.rows); } catch (err) { console.error('Error fetching sensors', err); - res.status(500).json({ error: 'internal server error' }); + res.status(500).json({ error: `internal server error, ${err}` }); } }); @@ -21,10 +21,40 @@ router.get('/:id', async (req, res) => { res.json(result.rows[0]); } catch (err) { console.error('Error fetching sensor', err); - res.status(500).json({ error: 'internal server error' }); + res.status(500).json({ error: `internal server error, ${err}` }); } }); +//Toggle availability +router.post('/:id/inactive', async (req, res) => { + const { id } = req.params; + try { + const result = await db.query('sensors', 'SELECT id, name FROM sensors WHERE id = $1', [id]); + if (result.rows.length === 0) { + return res.status(404).json({ error: 'sensor not found' }); + } + await db.query('sensors', 'UPDATE sensors SET active = false WHERE id = $1', [id]); + res.json({ status: 'ok' }); + } catch (err) { + console.error('Error updating sensor status', err); + res.status(500).json({ error: `internal server error, ${err}` }); + } +}); +router.post('/:id/active', async (req, res) => { + const { id } = req.params; + try { + const result = await db.query('sensors', 'SELECT id, name FROM sensors WHERE id = $1', [id]); + if (result.rows.length === 0) { + return res.status(404).json({ error: 'sensor not found' }); + } + await db.query('sensors', 'UPDATE sensors SET active = true WHERE id = $1', [id]); + res.json({ status: 'ok' }); + } catch (err) { + console.error('Error updating sensor status', err); + res.status(500).json({ error: `internal server error, ${err}` }); + } + +}); module.exports = router; \ No newline at end of file diff --git a/realtime/src/routes/sessions.js b/realtime/src/routes/sessions.js index 8126b88..8b1182e 100644 --- a/realtime/src/routes/sessions.js +++ b/realtime/src/routes/sessions.js @@ -1,14 +1,59 @@ const router = require('express').Router(); const db = require('../store/db'); -const { query } = require('../store/redis'); +const { queryAll, query } = require('../store/redis'); -router.get('/pendingtokens', (req, res) => { +router.get('/', async (req, res) => { + try { + const keys = await queryAll('sensors'); + const sessions = {}; + for (const key of keys) { + const name = key.replace('sensors:', ''); + const info = await query(name, 'sensors'); + sessions[name] = { + name, + connectedAt: info.timestamp || null, + session: info.session || null, + status: info.status || 'unknown', + }; + } + res.json(sessions); + } catch (err) { + console.error('Error fetching sessions', err); + res.status(500).json({ error: 'internal server error' }); + } +}); + +router.get('/pending', (req, res) => { try { const pendingTokens = queryAll('snsr_pending_token'); res.json(pendingTokens); } catch (err) { console.error('Error fetching pending tokens', err); - res.status(500).json({ error: 'internal server error' }); + res.status(500).json({ error: `Error fetching pending tokens, ${err}` }); + } +}); + +router.get('/connected', (req, res) => { + try { + const connectedSensors = queryAll('snsr_connected'); + res.json(connectedSensors); + } catch (err) { + console.error('Error fetching connected sensors', err); + res.status(500).json({ error: `Error fetching connected sensors, ${err}` }); + } +}); + +router.get('/connected/:id', async (req, res) => { + const { id } = req.params; + try { + const sensor = await query(`snsr_connected:${id}`); + if (!sensor) { + return res.status(404).json({ error: 'sensor not connected' }); + } + res.json({ id, name: sensor }); + } catch (err) { + console.error('Error fetching sensor connection status', err); + res.status(500).json({ error: `Error fetching sensor connection status, ${err}` }); } }); diff --git a/realtime/src/store/influx.js b/realtime/src/store/influx.js new file mode 100644 index 0000000..16e903c --- /dev/null +++ b/realtime/src/store/influx.js @@ -0,0 +1,65 @@ +const { InfluxDB, Point } = require('@influxdata/influxdb-client'); + +const client = new InfluxDB({ + url: process.env.INFLX_URL, + token: process.env.INFLX_TOKEN, +}); + +const bucket = process.env.INFLX_BUCKET || 'sensors'; +const org = process.env.INFLX_ORG; + +const writeApi = client.getWriteApi(org, bucket, 'ms', { + flushInterval: 100, + batchSize: 50, +}); + +const fieldMap = { + t: 'temperature', + h: 'humidity', + spd: 'speed', + cog: 'cog', + sog: 'sog', + hdg: 'headingTrue', + lat: 'latitude', + lon: 'longitude', +}; + +function writeSensorData(fields, sensor, session, timestamp) { + const point = new Point('sensor_data') + .tag('sensor', sensor) + .tag('session', session) + .timestamp(timestamp); + + for (const [short, long] of Object.entries(fieldMap)) { + if (fields[short] !== undefined) { + point.floatField(long, fields[short]); + } + } + + writeApi.writePoint(point); +} + +async function queryHistory(sensor, session, since) { + const queryApi = client.getQueryApi(org); + const query = ` + from(bucket: "${bucket}") + |> range(start: ${since}) + |> filter(fn: (r) => r._measurement == "sensor_data") + |> filter(fn: (r) => r.sensor == "${sensor}") + |> filter(fn: (r) => r.session == "${session}") + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + `; + + const rows = []; + return new Promise((resolve, reject) => { + queryApi.queryRows(query, { + next(row, tableMeta) { + rows.push(tableMeta.toObject(row)); + }, + error: reject, + complete() { resolve(rows); }, + }); + }); +} + +module.exports = { writeSensorData, queryHistory }; diff --git a/realtime/src/store/redis.js b/realtime/src/store/redis.js index 8d7a5c0..53d59b8 100644 --- a/realtime/src/store/redis.js +++ b/realtime/src/store/redis.js @@ -1,6 +1,6 @@ const redis = require('ioredis'); -const connectionsToken = "snsr_pending_token"; +const connectionsToken = "sensors_pending"; const connectedSensorsKey = "sensors"; @@ -122,4 +122,12 @@ async function queryAll(from) { configure(); -module.exports = { checkRedis, appendAsConnection, createConnectionToken, consumeConnectionToken, query, queryAll }; \ No newline at end of file +async function hset(key, ...args) { + return client.hset(key, ...args); +} + +async function del(key) { + return client.del(key); +} + +module.exports = { checkRedis, appendAsConnection, createConnectionToken, consumeConnectionToken, query, queryAll, hset, del }; \ No newline at end of file diff --git a/realtime/src/ws/handler.js b/realtime/src/ws/handler.js new file mode 100644 index 0000000..6b497ca --- /dev/null +++ b/realtime/src/ws/handler.js @@ -0,0 +1,220 @@ +const { WebSocketServer } = require('ws'); +const { decode } = require('@msgpack/msgpack'); +const { consumeConnectionToken, appendAsConnection, query, hset, del } = require('../store/redis'); +const { writeSensorData, queryHistory } = require('../store/influx'); + +// In-memory map: sensorName → Set +const sensorWatchers = new Map(); + +function generateSessionId() { + const num = Math.floor(1000 + Math.random() * 9000); + return `s${num}`; +} + +// Map sensor short keys → console field keys + measurement category +const fieldMapping = { + t: { key: 'temp', measurement: 'weather' }, + h: { key: 'hum', measurement: 'weather' }, + spd: { key: 'wSpd', measurement: 'weather' }, + cog: { key: 'cog', measurement: 'navigation' }, + sog: { key: 'sog', measurement: 'navigation' }, + hdg: { key: 'hdg', measurement: 'navigation' }, + lat: { key: 'lat', measurement: 'navigation' }, + lon: { key: 'lon', measurement: 'navigation' }, +}; + +/** + * Transforms a sensor packet (short keys) into grouped messages + * for the console: { timestamp, measurement, fields } + */ +function transformPacket(packet) { + const { ts, ...rawFields } = packet; + const groups = {}; + + for (const [short, val] of Object.entries(rawFields)) { + const mapping = fieldMapping[short]; + if (!mapping) continue; + const { key, measurement } = mapping; + if (!groups[measurement]) groups[measurement] = {}; + groups[measurement][key] = val; + } + + const messages = []; + for (const [measurement, fields] of Object.entries(groups)) { + messages.push({ timestamp: ts, measurement, fields }); + } + return messages; +} + +function setup(server) { + const wss = new WebSocketServer({ noServer: true }); + + server.on('upgrade', async (req, socket, head) => { + const url = new URL(req.url, `http://${req.headers.host}`); + const path = url.pathname; + + if (path === '/' || path === '') { + const token = url.searchParams.get('token'); + if (!token) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + + const sensor = await consumeConnectionToken(token); + if (!sensor) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + + wss.handleUpgrade(req, socket, head, (ws) => { + ws.sensorName = sensor; + ws.sessionId = generateSessionId(); + ws.connectedAt = new Date().toISOString(); + handleSensorConnection(ws); + }); + + } else if (path === '/live') { + // Accept upgrade without requiring query params. + // The console sends { action: 'watch', sensorId } after connecting. + wss.handleUpgrade(req, socket, head, (ws) => { + handleWatcherConnection(ws); + }); + + } else { + socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); + socket.destroy(); + } + }); +} + +function handleSensorConnection(ws) { + const { sensorName, sessionId, connectedAt } = ws; + console.log(`Sensor connected: ${sensorName} (session: ${sessionId})`); + + appendAsConnection(sensorName, 'connected', connectedAt); + hset(`sensors:${sensorName}`, 'session', sessionId); + + const pingInterval = setInterval(() => { + if (ws.readyState === ws.OPEN) ws.ping(); + }, 30000); + + ws.on('message', (data) => { + try { + const packet = decode(data); + const { ts, ...fields } = packet; + + writeSensorData(fields, sensorName, sessionId, ts); + + // Broadcast to watchers as JSON messages grouped by measurement + const watchers = sensorWatchers.get(sensorName); + if (watchers && watchers.size > 0) { + const messages = transformPacket(packet); + for (const msg of messages) { + const json = JSON.stringify(msg); + for (const watcher of watchers) { + if (watcher.readyState === watcher.OPEN) { + watcher.send(json); + } + } + } + } + } catch (err) { + console.error(`Error processing sensor data from ${sensorName}:`, err.message); + } + }); + + ws.on('close', () => { + console.log(`Sensor disconnected: ${sensorName}`); + clearInterval(pingInterval); + appendAsConnection(sensorName, 'disconnected', new Date().toISOString()); + del(`sensors:${sensorName}`); + }); + + ws.on('error', (err) => { + console.error(`WebSocket error for sensor ${sensorName}:`, err.message); + }); +} + +function handleWatcherConnection(ws) { + console.log('Watcher connected, waiting for watch action...'); + + ws.on('message', async (data) => { + try { + const msg = JSON.parse(data.toString()); + + if (msg.action === 'watch' && msg.sensorId) { + // Unwatch previous sensor if any + if (ws.sensorName) { + sensorWatchers.get(ws.sensorName)?.delete(ws); + if (sensorWatchers.get(ws.sensorName)?.size === 0) { + sensorWatchers.delete(ws.sensorName); + } + } + + ws.sensorName = msg.sensorId; + + // Register as watcher + if (!sensorWatchers.has(msg.sensorId)) { + sensorWatchers.set(msg.sensorId, new Set()); + } + sensorWatchers.get(msg.sensorId).add(ws); + + console.log(`Watcher now watching sensor: ${msg.sensorId}`); + + // Send history since sensor connected + try { + const sensorInfo = await query(msg.sensorId, 'sensors'); + if (sensorInfo && sensorInfo.timestamp && sensorInfo.session) { + const history = await queryHistory(msg.sensorId, sensorInfo.session, sensorInfo.timestamp); + for (const row of history) { + const ts = new Date(row._time).getTime(); + // Send each historical row as individual messages grouped by measurement + const rebuilt = { ts }; + for (const [short, { key }] of Object.entries(fieldMapping)) { + const influxField = { t: 'temperature', h: 'humidity', spd: 'speed', cog: 'cog', sog: 'sog', hdg: 'headingTrue', lat: 'latitude', lon: 'longitude' }[short]; + if (row[influxField] !== undefined) { + rebuilt[short] = row[influxField]; + } + } + const messages = transformPacket(rebuilt); + for (const m of messages) { + ws.send(JSON.stringify(m)); + } + } + } + } catch (err) { + console.error(`Error fetching history for watcher:`, err.message); + } + + } else if (msg.action === 'unwatch') { + if (ws.sensorName) { + sensorWatchers.get(ws.sensorName)?.delete(ws); + if (sensorWatchers.get(ws.sensorName)?.size === 0) { + sensorWatchers.delete(ws.sensorName); + } + ws.sensorName = null; + } + } + } catch (err) { + // Ignore non-JSON messages + } + }); + + ws.on('close', () => { + if (ws.sensorName) { + sensorWatchers.get(ws.sensorName)?.delete(ws); + if (sensorWatchers.get(ws.sensorName)?.size === 0) { + sensorWatchers.delete(ws.sensorName); + } + } + console.log('Watcher disconnected'); + }); + + ws.on('error', (err) => { + console.error('WebSocket error for watcher:', err.message); + }); +} + +module.exports = { setup };