const express = require('express'); const WebSocket = require('ws'); const Redis = require('ioredis'); const redisHelper = require('./helper/redis'); const influxWriter = require('./helper/influxWriter'); const influxReader = require('./helper/influxReader'); const app = express(); app.use(express.json()); app.use((req, res, next) => { res.header('Access-Control-Allow-Origin', req.headers.origin || '*'); res.header('Access-Control-Allow-Credentials', 'true'); res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization'); res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); if (req.method === 'OPTIONS') return res.sendStatus(204); next(); }); require('./socket'); app.get('/', (req, res) => { res.redirect('/health'); }); app.get('/health', async (req, res) => { const dbConnected = await require('./helper/authdb').checkDB(); const redisConnected = await redisHelper.checkRedis(); console.log('DATABASE LOGS', process.env.DB_USER, process.env.DB_HOST, process.env.DB_NAME, process.env.DB_PASSWORD, process.env.DB_PORT); console.log('REDIS LOGS', process.env.REDIS_HOST, process.env.REDIS_PORT); res.status(200).send({ status: dbConnected && redisConnected ? 'OK' : 'DEGRADED', database: dbConnected ? 'connected' : 'disconnected', redis: redisConnected ? 'connected' : 'disconnected', service: 'realtime', version: process.env.VERSION, build: process.env.VERSION_BUILD, state: process.env.VERSION_STATE, port: process.env.PORT }); }); app.use('/sensors', require('./routes/sensors')); app.use('/connect', require('./routes/connect')); app.use('/sessions', require('./routes/sessions')); // --- Flush buffer e CSV export --- app.post('/sessions/:sensorId/flush', async (req, res) => { try { const { sensorId } = req.params; const flushed = await influxWriter.flushBuffer(sensorId); res.status(200).json({ flushed: flushed.length }); } catch (error) { res.status(500).json({ error: error.message }); } }); app.get('/sessions/:sensorId/csv', async (req, res) => { try { const { sensorId } = req.params; const { from } = req.query; if (!from) return res.status(400).json({ error: 'from timestamp required' }); // Flusha prima il buffer residuo await influxWriter.flushBuffer(sensorId); const rows = await influxReader.querySessionData(sensorId, parseInt(from)); const csv = influxReader.formatCSV(rows); res.setHeader('Content-Type', 'text/csv'); res.setHeader('Content-Disposition', `attachment; filename="session_${sensorId}.csv"`); res.send(csv); } catch (error) { res.status(500).json({ error: error.message }); } }); // --- HTTP server + WebSocket per watchers live --- const server = app.listen(process.env.PORT, '0.0.0.0', () => { console.log(`Realtime on port ${process.env.PORT}`); }); const wss = new WebSocket.Server({ server, path: '/live' }); wss.on('connection', (client) => { let watchedSensor = null; let subscriber = null; client.on('message', async (raw) => { try { const msg = JSON.parse(raw); if (msg.action === 'watch' && msg.sensorId) { // Rimuovi watch precedente se esiste if (watchedSensor) { await redisHelper.removeWatcher(watchedSensor); if (subscriber) { subscriber.unsubscribe(); subscriber.quit(); subscriber = null; } } watchedSensor = msg.sensorId; await redisHelper.addWatcher(watchedSensor); // Subscriber Redis dedicato per questo client subscriber = new Redis({ host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }); subscriber.subscribe(`sensor:data:${watchedSensor}`); subscriber.on('message', (channel, message) => { if (client.readyState === WebSocket.OPEN) { client.send(message); // Dati gia' JSON } }); client.send(JSON.stringify({ type: 'watching', sensorId: watchedSensor })); } if (msg.action === 'unwatch') { if (watchedSensor) { await redisHelper.removeWatcher(watchedSensor); watchedSensor = null; } if (subscriber) { subscriber.unsubscribe(); subscriber.quit(); subscriber = null; } client.send(JSON.stringify({ type: 'unwatched' })); } } catch (err) { console.error('[WS-LIVE] Message error:', err); } }); client.on('close', async () => { if (watchedSensor) { await redisHelper.removeWatcher(watchedSensor); } if (subscriber) { subscriber.unsubscribe(); subscriber.quit(); } }); client.on('error', (err) => { console.error('[WS-LIVE] Client error:', err); }); });