reset: removed the old code to start from scratch

This commit is contained in:
Giuseppe Raffa
2026-04-14 15:56:24 +02:00
parent c597d4a414
commit c478f5c13c
11 changed files with 3 additions and 904 deletions

View File

@@ -1,106 +0,0 @@
const { Pool } = require('pg');
const { hash, generateShortId } = require('./cryptoUtils');
const pool = new Pool({
user: process.env.DB_USER,
host: process.env.DB_HOST,
database: process.env.SENSORS_DB,
password: process.env.DB_PASSWORD,
port: process.env.DB_PORT,
});
async function checkDB() {
try {
await pool.query('SELECT NOW()');
return true;
} catch (error) {
console.error('Database connection failed:', error);
return false;
}
}
async function initDB() {
try {
await pool.query(`
CREATE TABLE IF NOT EXISTS sensors (
id VARCHAR(10) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
code_hash TEXT NOT NULL UNIQUE,
is_active BOOLEAN DEFAULT TRUE,
last_seen TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_sensors_code_hash ON sensors(code_hash);
`);
console.log('[DB] Database schema initialized (sensors table ensured)');
} catch (error) {
console.error('[DB] Schema initialization failed:', error);
}
}
/**
* Restituisce i dati del sensore in base al token ricevuto.
* Il token viene hashato prima della comparazione con il database.
* @param {string} token - il codice segreto del sensore (raw)
*/
async function getSensor(token) {
const hashed = hash(token);
const result = await pool.query('SELECT id, is_active, name, last_seen, created_at FROM sensors WHERE code_hash = $1', [hashed]);
return result.rows[0];
}
async function createSensor(name, code) {
const hashedCode = hash(code);
// Verifica se l'hash esiste già
const result = await pool.query('SELECT id FROM sensors WHERE code_hash = $1', [hashedCode]);
if (result.rows.length > 0) {
throw new Error('Sensor with this code already exists');
}
// Genera un ID casuale di 8 caratteri (ottimizzato per spazio, non solo alfanumerico)
const sensorId = generateShortId(8);
await pool.query('INSERT INTO sensors (id, name, code_hash, is_active, last_seen, created_at) VALUES ($1, $2, $3, $4, $5, $6)',
[sensorId, name, hashedCode, true, new Date(), new Date()]);
}
/**
* Aggiorna l'ultima attività del sensore.
* @param {*} id - l'id del sensore
* @returns {Promise<void>}
*/
async function updateLastSeen(id) {
await pool.query('UPDATE sensors SET last_seen = NOW() WHERE id = $1', [id]);
}
/**
* Modifica la disponibilità del sensore.
* @param {*} id - l'id del sensore
* @param {*} is_active - la disponibilità del sensore
* @returns {Promise<void>}
*/
async function setSensorActivity(id, is_active) {
await pool.query('UPDATE sensors SET is_active = $1 WHERE id = $2', [is_active, id]);
}
async function sensorsExists(id) {
const result = await pool.query('SELECT id FROM sensors WHERE id = $1', [id]);
return result.rows.length > 0;
}
async function getSensors() {
const resutls = await pool.query('SELECT id, is_active, name, last_seen, created_at FROM sensors');
return resutls.rows;
}
module.exports = {
checkDB,
initDB,
getSensor,
updateLastSeen,
setSensorActivity,
getSensors,
sensorsExists,
createSensor
}

View File

@@ -1,35 +0,0 @@
const crypto = require('crypto');
/**
* Genera un hash SHA256 in formato esadecimale da una stringa.
* Utilizzato per rendere compatibili authdb.js e tokenStore.js.
*/
function hash(text) {
if (!text) return null;
return crypto.createHash('sha256').update(text).digest('hex');
}
/**
* Genera una stringa casuale di lunghezza 'length'.
* Ottimizzata per risparmiare spazio (8 caratteri).
* Include lettere, numeri e simboli per massimizzare l'entropia (non solo alfanumerico).
*/
function generateShortId(length = 8) {
const charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
let result = '';
while (result.length < length) {
const bytes = crypto.randomBytes(length);
for (let i = 0; i < bytes.length && result.length < length; i++) {
// Selezioniamo solo i byte che rientrano nel range del charset per evitare bias
if (bytes[i] < 256 - (256 % charset.length)) {
result += charset[bytes[i] % charset.length];
}
}
}
return result;
}
module.exports = {
hash,
generateShortId
};

View File

@@ -1,75 +0,0 @@
const { InfluxDB } = require('@influxdata/influxdb-client');
const url = process.env.INFLX_URL;
const token = process.env.INFLX_TOKEN;
const org = process.env.INFLX_ORG;
const bucket = 'boat';
const client = new InfluxDB({ url, token });
const queryApi = client.getQueryApi(org);
/**
* Query tutti i dati di una sessione sensore da un timestamp di inizio.
* Ritorna array di righe { _time, _measurement, _field, _value, sensor }.
*/
async function querySessionData(sensorId, fromTimestamp) {
const from = new Date(fromTimestamp).toISOString();
const query = `
from(bucket: "${bucket}")
|> range(start: ${from})
|> filter(fn: (r) => r["sensor"] == "${sensorId}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`;
const rows = [];
return new Promise((resolve, reject) => {
queryApi.queryRows(query, {
next(row, tableMeta) {
const obj = tableMeta.toObject(row);
rows.push(obj);
},
error(err) {
console.error(`[INFLUX] Query error:`, err.message);
reject(err);
},
complete() {
resolve(rows);
}
});
});
}
/**
* Formatta i risultati della query in CSV.
*/
function formatCSV(rows) {
if (rows.length === 0) return '';
// Raccogli tutte le colonne uniche escludendo meta-campi InfluxDB
const excludeKeys = new Set(['result', 'table', '_start', '_stop', '']);
const allKeys = new Set();
for (const row of rows) {
for (const key of Object.keys(row)) {
if (!excludeKeys.has(key)) allKeys.add(key);
}
}
const columns = ['_time', '_measurement', 'sensor',
...Array.from(allKeys).filter(k => !['_time', '_measurement', 'sensor'].includes(k)).sort()
];
const header = columns.join(',');
const lines = rows.map(row =>
columns.map(col => {
const val = row[col];
if (val == null) return '';
if (typeof val === 'string' && val.includes(',')) return `"${val}"`;
return val;
}).join(',')
);
return header + '\n' + lines.join('\n');
}
module.exports = { querySessionData, formatCSV };

View File

@@ -1,156 +0,0 @@
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
const url = process.env.INFLX_URL;
const token = process.env.INFLX_TOKEN;
const org = process.env.INFLX_ORG;
const boatTelemetry = 'boat';
const client = new InfluxDB({ url, token });
const writeApi = client.getWriteApi(org, boatTelemetry);
const FIELD_MAP = {
logs: {
lat: 'latitude', lon: 'longitude', hdg: 'heading',
sog: 'speed_over_ground', cog: 'course_over_ground',
depth: 'depth', engTemp: 'engine_temperature',
fTemp: 'forecast_temperature', fHum: 'forecast_humidity', fPres: 'forecast_pressure',
fWSpd: 'forecast_wind_speed', fWDir: 'forecast_wind_direction',
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
curD: 'current_direction', curV: 'current_velocity'
},
weather: {
temp: 'temperature', hum: 'humidity', pres: 'pressure',
wSpd: 'wind_speed', wDir: 'wind_direction', gust: 'wind_gusts',
rain: 'rain', prec: 'precipitation',
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
wvPkP: 'wave_peak_period', curD: 'current_direction', curV: 'current_velocity'
},
forecast: {
temp: 'temperature', hum: 'humidity', pres: 'pressure',
wSpd: 'wind_speed', wDir: 'wind_direction',
precProb: 'precipitation_probability', prec: 'precipitation',
rain: 'rain', cloud: 'cloud_cover',
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
curD: 'current_direction', curV: 'current_velocity'
}
};
async function writePoint(sensorId, timestamp, measurement, fields) {
try {
const map = FIELD_MAP[measurement] || {};
const point = new Point(measurement)
.tag('sensor', sensorId)
.timestamp(new Date(timestamp));
for (const [key, value] of Object.entries(fields)) {
if (value == null) continue;
const fieldName = map[key] || key;
if (typeof value === 'number') {
point.floatField(fieldName, value);
} else if (typeof value === 'string') {
point.stringField(fieldName, value);
} else if (typeof value === 'boolean') {
point.booleanField(fieldName, value);
}
}
writeApi.writePoint(point);
await writeApi.flush();
} catch (error) {
console.error(`[INFLUX] Errore writePoint (${measurement}):`, error.message);
}
}
async function writeForecastBatch(sensorId, points) {
try {
const map = FIELD_MAP.forecast || {};
for (const [ts, fields] of points) {
const point = new Point('forecast')
.tag('sensor', sensorId)
.timestamp(new Date(ts));
for (const [key, value] of Object.entries(fields)) {
if (value == null) continue;
const fieldName = map[key] || key;
if (typeof value === 'number') {
point.floatField(fieldName, value);
} else if (typeof value === 'string') {
point.stringField(fieldName, value);
}
}
writeApi.writePoint(point);
}
await writeApi.flush();
console.log(`[INFLUX] Scritti ${points.length} punti forecast per sensore ${sensorId}`);
} catch (error) {
console.error(`[INFLUX] Errore writeForecastBatch:`, error.message);
}
}
// --- Batch buffer per watchers ---
const BATCH_SIZE = 10;
const batchBuffers = new Map(); // sensorId → [{timestamp, measurement, fields}, ...]
function bufferPoint(sensorId, timestamp, measurement, fields) {
if (!batchBuffers.has(sensorId)) {
batchBuffers.set(sensorId, []);
}
const buffer = batchBuffers.get(sensorId);
buffer.push({ timestamp, measurement, fields });
if (buffer.length >= BATCH_SIZE) {
const batch = buffer.splice(0, BATCH_SIZE);
writeBatch(sensorId, batch);
}
}
async function writeBatch(sensorId, batch) {
try {
for (const { timestamp, measurement, fields } of batch) {
const map = FIELD_MAP[measurement] || {};
const point = new Point(measurement)
.tag('sensor', sensorId)
.timestamp(new Date(timestamp));
for (const [key, value] of Object.entries(fields)) {
if (value == null) continue;
const fieldName = map[key] || key;
if (typeof value === 'number') {
point.floatField(fieldName, value);
} else if (typeof value === 'string') {
point.stringField(fieldName, value);
} else if (typeof value === 'boolean') {
point.booleanField(fieldName, value);
}
}
writeApi.writePoint(point);
}
await writeApi.flush();
console.log(`[INFLUX] Batch scritto: ${batch.length} punti per sensore ${sensorId}`);
} catch (error) {
console.error(`[INFLUX] Errore writeBatch:`, error.message);
}
}
async function flushBuffer(sensorId) {
const buffer = batchBuffers.get(sensorId);
if (!buffer || buffer.length === 0) return [];
const remaining = buffer.splice(0);
await writeBatch(sensorId, remaining);
return remaining;
}
function getBufferedPoints(sensorId) {
return batchBuffers.get(sensorId) || [];
}
function clearBuffer(sensorId) {
batchBuffers.delete(sensorId);
}
module.exports = { writePoint, writeForecastBatch, bufferPoint, flushBuffer, getBufferedPoints, clearBuffer, FIELD_MAP };

View File

@@ -1,91 +0,0 @@
const Redis = require('ioredis');
const redis = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD,
});
// Client dedicato per subscribe (ioredis richiede client separato)
const redisSub = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD,
});
redis.on('error', (error) => {
console.error('Redis error:', error);
});
redis.on('connect', () => {
console.log('Server connected to Redis DB');
});
redisSub.on('error', (error) => {
console.error('Redis sub error:', error);
});
const sensors_hash_map = 'sensors:sessions';
async function setSession(sensorID, metadata) {
await redis.hset(sensors_hash_map, sensorID, JSON.stringify(metadata));
}
async function getSession(sensorID) {
return await redis.hget(sensors_hash_map, sensorID);
}
async function deleteSession(sensorID) {
await redis.hdel(sensors_hash_map, sensorID);
}
async function getSessions() {
return await redis.hgetall(sensors_hash_map);
}
// --- Pub/Sub per live watchers ---
async function publishSensorData(sensorId, data) {
await redis.publish(`sensor:data:${sensorId}`, JSON.stringify(data));
}
async function addWatcher(sensorId) {
return await redis.incr(`sensor:watchers:${sensorId}`);
}
async function removeWatcher(sensorId) {
const count = await redis.decr(`sensor:watchers:${sensorId}`);
if (count <= 0) {
await redis.del(`sensor:watchers:${sensorId}`);
return 0;
}
return count;
}
async function getWatcherCount(sensorId) {
const count = await redis.get(`sensor:watchers:${sensorId}`);
return parseInt(count) || 0;
}
async function checkRedis() {
try {
await redis.ping();
return true;
} catch (error) {
return false;
}
}
module.exports = {
setSession,
getSession,
deleteSession,
getSessions,
publishSensorData,
addWatcher,
removeWatcher,
getWatcherCount,
checkRedis,
redis,
redisSub
};

View File

@@ -1,52 +0,0 @@
const { redis } = require('./redis');
const { generateShortId } = require('./cryptoUtils');
const TOKEN_PREFIX = 'token:pending:';
/**
* Genera un nuovo token effimero valido per i prossimi 5 secondi.
*/
async function setToken(sensorId, metadata = {}, duration = 5) {
const token = generateShortId(8);
const key = `${TOKEN_PREFIX}${token}`;
const payload = JSON.stringify({
sensorId,
metadata,
createdAt: Date.now()
});
await redis.set(key, payload, 'EX', duration);
return token;
}
/**
* Consuma (valida e rimuove) un token.
* @returns {Object|null} - I dati della sessione se valida, altrimenti null
*/
async function consumeToken(token) {
const key = `${TOKEN_PREFIX}${token}`;
// Recupera il token
const rawData = await redis.get(key);
if (!rawData) {
return null;
}
// Il token è monouso: lo cancelliamo subito dopo la lettura
await redis.del(key);
try {
const data = JSON.parse(rawData);
return data; // Ritorna l'intero oggetto (sensorId, metadata, ecc.)
} catch (e) {
console.error('Error parsing token data:', e);
return null;
}
}
module.exports = {
setToken,
consumeToken
};

View File

@@ -1,155 +1,10 @@
const express = require('express');
const WebSocket = require('ws');
const Redis = require('ioredis');
const redisHelper = require('./helper/redis');
const influxWriter = require('./helper/influxWriter');
const influxReader = require('./helper/influxReader');
const app = express();
app.use(express.json());
app.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', req.headers.origin || '*');
res.header('Access-Control-Allow-Credentials', 'true');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
if (req.method === 'OPTIONS') return res.sendStatus(204);
next();
});
app.get('/', (req, res) => {
res.redirect('/health');
});
app.get('/', (req, res) => {});
app.get('/health', async (req, res) => {
const dbConnected = await require('./helper/authdb').checkDB();
const redisConnected = await redisHelper.checkRedis();
console.log('DATABASE LOGS', process.env.DB_USER, process.env.DB_HOST, process.env.DB_NAME, process.env.DB_PASSWORD, process.env.DB_PORT);
console.log('REDIS LOGS', process.env.REDIS_HOST, process.env.REDIS_PORT);
res.status(200).send({
status: dbConnected && redisConnected ? 'OK' : 'DEGRADED',
database: dbConnected ? 'connected' : 'disconnected',
redis: redisConnected ? 'connected' : 'disconnected',
service: 'realtime',
version: process.env.VERSION,
build: process.env.VERSION_BUILD,
state: process.env.VERSION_STATE,
});
});
app.use('/sensors', require('./routes/sensors'));
app.use('/connect', require('./routes/connect'));
app.use('/sessions', require('./routes/sessions'));
// --- Flush buffer e CSV export ---
app.post('/sessions/:sensorId/flush', async (req, res) => {
try {
const { sensorId } = req.params;
const flushed = await influxWriter.flushBuffer(sensorId);
res.status(200).json({ flushed: flushed.length });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.get('/sessions/:sensorId/csv', async (req, res) => {
try {
const { sensorId } = req.params;
const { from } = req.query;
if (!from) return res.status(400).json({ error: 'from timestamp required' });
// Flusha prima il buffer residuo
await influxWriter.flushBuffer(sensorId);
const rows = await influxReader.querySessionData(sensorId, parseInt(from));
const csv = influxReader.formatCSV(rows);
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', `attachment; filename="session_${sensorId}.csv"`);
res.send(csv);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
const server = app.listen(3000, '0.0.0.0', async () => {
app.listen(3000, '0.0.0.0', () => {
console.log(`Realtime started`);
await require('./helper/authdb').initDB();
});
require('./socket')(server);
const wss = new WebSocket.Server({ server, path: '/live' });
wss.on('connection', (client) => {
let watchedSensor = null;
let subscriber = null;
client.on('message', async (raw) => {
try {
const msg = JSON.parse(raw);
if (msg.action === 'watch' && msg.sensorId) {
// Rimuovi watch precedente se esiste
if (watchedSensor) {
await redisHelper.removeWatcher(watchedSensor);
if (subscriber) {
subscriber.unsubscribe();
subscriber.quit();
subscriber = null;
}
}
watchedSensor = msg.sensorId;
await redisHelper.addWatcher(watchedSensor);
// Subscriber Redis dedicato per questo client
subscriber = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT
});
subscriber.subscribe(`sensor:data:${watchedSensor}`);
subscriber.on('message', (channel, message) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message); // Dati gia' JSON
}
});
client.send(JSON.stringify({ type: 'watching', sensorId: watchedSensor }));
}
if (msg.action === 'unwatch') {
if (watchedSensor) {
await redisHelper.removeWatcher(watchedSensor);
watchedSensor = null;
}
if (subscriber) {
subscriber.unsubscribe();
subscriber.quit();
subscriber = null;
}
client.send(JSON.stringify({ type: 'unwatched' }));
}
} catch (err) {
console.error('[WS-LIVE] Message error:', err);
}
});
client.on('close', async () => {
if (watchedSensor) {
await redisHelper.removeWatcher(watchedSensor);
}
if (subscriber) {
subscriber.unsubscribe();
subscriber.quit();
}
});
client.on('error', (err) => {
console.error('[WS-LIVE] Client error:', err);
});
});

View File

@@ -1,74 +0,0 @@
const express = require('express');
const db = require('../helper/authdb');
const tokenStore = require('../helper/tokenStore');
const redis = require('../helper/redis');
const router = express.Router();
/**
* POST /connect
* Il sensore invia il suo codice segreto (token) e metadati opzionali.
* Se autentica, riceve un token effimero per la connessione WebSocket.
*/
router.post('/', async (req, res) => {
try {
const { token, metadata } = req.body;
if (!token) {
return res.status(400).send({ error: 'Token is required' });
}
const sensor = await db.getSensor(token);
if (!sensor) {
return res.status(401).send({ error: 'token not valid' });
}
if (!sensor.is_active) {
return res.status(403).send({ error: 'token not valid' });
}
// Genera il token effimero valido per max 5 secondi
const socketToken = await tokenStore.setToken(sensor.id, metadata, 5);
return res.status(200).send({
socketToken,
sensorId: sensor.id,
expiresIn: 5
});
} catch (error) {
return res.status(500).send({ error: `${error}` });
}
});
/**
* DELETE /connect/:sensorId
* Disconnette forzatamente un sensore rimuovendo la sua sessione da Redis.
*/
router.delete('/:sensorId', async (req, res) => {
const { sensorId } = req.params;
try {
await redis.deleteSession(sensorId);
return res.status(200).send({ result: 'disconnected' });
} catch (error) {
return res.status(500).send({ error: `${error}` });
}
});
/**
* POST /connect/new
* Crea un nuovo sensore nel database.
*/
router.post('/new', async (req, res) => {
const { name, code } = req.body;
if (!name || !code) {
return res.status(400).send({ error: 'Name and code are required' });
}
try {
await db.createSensor(name, code);
return res.status(200).send({ result: 'created' });
} catch (error) {
return res.status(500).send({ error: `${error}` });
}
});
module.exports = router;

View File

@@ -1,36 +0,0 @@
const express = require('express');
const db = require('../helper/authdb');
router = express.Router();
router.get('/', async (req, res) => {
const sensors = await db.getSensors();
res.status(200).json(sensors);
});
router.post('/:id/:activity', async (req, res) => {
const { id, activity } = req.params;
let isActive;
if (activity === 'active') {
isActive = true;
} else if (activity === 'inactive') {
isActive = false;
} else {
return res.status(400).json({ error: 'Invalid activity' });
}
try {
const exists = await db.sensorsExists(id);
if (!exists) {
return res.status(404).json({ error: `Sensor with id ${id} not found` });
}
await db.setSensorActivity(id, isActive);
res.status(200).json({ status: `Sensor ${activity}` });
} catch (error) {
console.error('Error updating sensor ID:', id, error);
res.status(500).json({ error: 'Database error' });
}
})
module.exports = router

View File

@@ -1,36 +0,0 @@
const express = require('express');
const redis = require('../helper/redis');
const router = express.Router();
/**
* GET /sessions
* Ritorna tutti i sensori attualmente connessi con i loro metadati.
* Se viene passato un parametro ?sensor=ID, restituisce solo quello.
*/
router.get('/', async (req, res) => {
const { sensor } = req.query;
// Se viene passato un parametro ?sensor=ID, restituiamo solo quello
if (sensor) {
try {
const session = await redis.getSession(sensor);
if (!session) {
return res.status(404).json({ error: 'Session not found' });
}
return res.status(200).json(JSON.parse(session));
} catch (error) {
return res.status(500).json({ error: `${error}` });
}
}
// Altrimenti restituiamo tutta la lista
try {
const sessions = await redis.getSessions();
res.status(200).json(sessions);
} catch (error) {
res.status(500).json({ error: `${error}` });
}
});
module.exports = router;

View File

@@ -1,95 +0,0 @@
const WebSocket = require('ws');
const { encode, decode } = require('@msgpack/msgpack');
const url = require('url');
const tokenStore = require('./helper/tokenStore');
const redisHelper = require('./helper/redis');
const influxWriter = require('./helper/influxWriter');
module.exports = function setupSensorWebSocket(server) {
const wsPath = process.env.SENSOR_WS_PATH || '/sensor';
const ws = new WebSocket.Server({
server,
path: wsPath,
perMessageDeflate: false,
verifyClient: (info, callback) => {
console.log('[WS|verifyClient] URL:', info.req.url);
const { query } = url.parse(info.req.url, true);
const token = query.token;
console.log('[WS|verifyClient] Token ricevuto:', token);
if (!token) {
return callback(false, 401, 'token not passed');
}
tokenStore.consumeToken(token).then((sessionData) => {
if (!sessionData) {
return callback(false, 401, 'token not valid or expired');
}
info.req.sensorSession = sessionData;
callback(true);
}).catch((error) => {
callback(false, 500, `internal server error: ${error}`);
});
}
});
ws.on('connection', async (client, req) => {
const session = req.sensorSession;
const sensorId = session.sensorId;
client.sensorId = sensorId;
try {
await redisHelper.setSession(sensorId, {
...session.metadata,
connectedAt: Math.floor(Date.now() / 1000)
});
} catch (err) {
console.error(`[WS] Redis setSession error for ${sensorId}:`, err);
}
client.on('message', async (raw) => {
try {
const msg = decode(raw);
client.send(encode({ a: 1 }));
const [timestamp, measurement, fields] = msg;
redisHelper.publishSensorData(sensorId, { timestamp, measurement, fields });
const watchers = await redisHelper.getWatcherCount(sensorId);
if (measurement === 'forecast_batch') {
influxWriter.writeForecastBatch(sensorId, fields);
} else if (watchers > 0) {
influxWriter.bufferPoint(sensorId, timestamp, measurement, fields);
} else {
influxWriter.writePoint(sensorId, timestamp, measurement, fields);
}
} catch (err) {
console.error(`[WS|${sensorId}] decode error:`, err);
client.send(encode({ e: 1 }));
}
});
client.on('error', (err) => {
console.error(`[WS|${sensorId}] error:`, err);
});
client.on('close', async () => {
try {
await redisHelper.deleteSession(sensorId);
} catch (err) {
console.error(`[WS] Redis deleteSession error for ${sensorId}:`, err);
}
});
});
console.log(`[WS] Sensor websocket server ready on ${wsPath}`);
return ws;
};