Files
signalk-plugin/plugin/realtime/core.js
2026-03-11 15:25:03 +01:00

451 lines
14 KiB
JavaScript

const WebSocket = require('ws');
const msgpack = require('msgpack-lite');
const { loadSensorReferencesFromServer, checkSensorReferencesVersion } = require('../config');
const dataHub = require('../tools/dataHub');
// Stato connessione
let ws = null;
let sendTimer = null;
let isConnected = false;
let reconnectTimer = null;
let pingInterval = null;
let configCheckTimer = null;
let app = null;
// Sensor references (caricati dal server)
let sensorRules = null;
// Buffer locale anti-perdita dati
const localBuffer = [];
const MAX_BUFFER_SIZE = 3600; // ~1h di dati a 1/sec
// Statistiche (solo in memoria, niente file I/O)
let stats = {
sensorID: '',
sent: 0,
firstSent: null,
sentEveryMLS: 1000,
reconnections: 0,
status: 'disconnected',
buffered: 0,
lastConfigVersion: null
};
// Reconnection con exponential backoff
const BASE_RECONNECT_DELAY = 2000;
const MAX_RECONNECT_DELAY = 60000;
/**
* Inizializza il modulo realtime.
* 1. Autentica il sensore per ottenere un ticket
* 2. Usa il ticket per caricare i sensor references (endpoint autenticato)
* 3. Usa lo stesso ticket per connettere il WebSocket
*/
async function init(signalKApp, sensorCode) {
app = signalKApp;
stats.sensorID = sensorCode || process.env.SENSOR_CODE || 'N/D';
stats.sentEveryMLS = parseInt(process.env.SEND_INTERVAL || '500');
console.log(`[MEB] Send interval: ${stats.sentEveryMLS}ms (SEND_INTERVAL=${process.env.SEND_INTERVAL || 'default 500'})`);
// Autenticazione unica: ottieni ticket
const authResult = await authenticate();
if (authResult) {
// Carica sensor references con ticket (read-only, non consuma il ticket)
sensorRules = await loadSensorReferencesFromServer(authResult.ticket);
if (sensorRules) stats.lastConfigVersion = sensorRules.version;
// Connetti WebSocket con lo stesso ticket (viene consumato qui)
connectWebSocket(authResult.wsUrl, authResult.ticket);
} else {
// Fallback: carica references senza auth, programma riconnessione
console.warn('[MEB] Auth fallita, carico references senza autenticazione');
sensorRules = await loadSensorReferencesFromServer();
if (sensorRules) stats.lastConfigVersion = sensorRules.version;
scheduleReconnect();
}
// Avvia polling versione config ogni 5 minuti
configCheckTimer = setInterval(checkConfigUpdate, 5 * 60 * 1000);
}
/**
* Controlla se la config sensori sul server e' cambiata e la ricarica.
*/
async function checkConfigUpdate() {
const newVersion = await checkSensorReferencesVersion(stats.lastConfigVersion);
if (newVersion) {
console.log(`[MEB] Sensor config aggiornata: ${stats.lastConfigVersion}${newVersion}`);
sensorRules = await loadSensorReferencesFromServer();
if (sensorRules) {
stats.lastConfigVersion = sensorRules.version;
}
}
}
// ──────────────────── AUTENTICAZIONE ────────────────────
async function authenticate() {
try {
const REALTIME_URL = process.env.REALTIME_URL || 'http://localhost:3002';
const url = REALTIME_URL + '/connect/request';
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sensor_code: stats.sensorID })
});
if (!res.ok) {
console.error(`[MEB] Realtime Auth failed: ${res.status}`);
return null;
}
const data = await res.json();
if (!data.success || !data.ticket) return null;
return {
ticket: data.ticket,
wsUrl: data.ws_url || REALTIME_URL.replace('http', 'ws') + '/ws'
};
} catch (err) {
console.error('[MEB] Error in Realtime auth:', err.message);
return null;
}
}
// ──────────────────── WEBSOCKET ────────────────────
function connectWebSocket(wsUrl, ticket) {
const fullUrl = `${wsUrl}?ticket=${ticket}`;
ws = new WebSocket(fullUrl);
ws.on('open', () => {
console.log('[MEB] Realtime WebSocket connected');
isConnected = true;
stats.status = 'connected';
stats.reconnections = 0; // Reset su connessione riuscita
startSending();
startPingInterval();
// Flush buffer locale dopo reconnessione
flushBuffer();
});
ws.on('close', (code) => {
console.log(`[MEB] Realtime WebSocket closed (code: ${code})`);
isConnected = false;
stats.status = 'disconnected';
stopSending();
stopPingInterval();
scheduleReconnect();
});
ws.on('error', (err) => {
console.error('[MEB] Realtime WebSocket error:', err.message);
isConnected = false;
stats.status = 'error';
});
ws.on('message', (data) => {
// Gestisce messaggi dal server (comandi, conferme, ecc.)
try {
const decoded = msgpack.decode(data);
if (decoded?.type === 'connected') {
console.log(`[MEB] Server confirmed connection: sensorId=${decoded.sensorId}`);
}
} catch {
// Ignora messaggi non decodificabili
}
});
}
// ──────────────────── INVIO DATI (1/sec) ────────────────────
function startSending() {
stopSending();
sendData(); // Prima chiamata immediata
sendTimer = setInterval(sendData, stats.sentEveryMLS);
}
function stopSending() {
if (sendTimer) {
clearInterval(sendTimer);
sendTimer = null;
}
}
/**
* Legge un valore dal data model di SignalK.
*/
function getSignalKData(skPath) {
const val = app.getSelfPath(skPath);
return val && val.value !== undefined && val.value !== null ? val.value : null;
}
/**
* Raccoglie TUTTI i dati sensore definiti nella config.
* Produce chiavi flat: "temperature", "wind_direction", "position_latitude", ecc.
* Stessa logica di logRecorder.collectSensorData().
*/
function collectAllSensorData() {
const data = {};
if (!sensorRules || !sensorRules.items) {
// Fallback hardcoded se non ci sono regole
return {
service_battery_voltage: getSignalKData('electrical.batteries.service.Voltage') || 0,
service_battery_stateOfCharge: (getSignalKData('electrical.batteries.service.stateOfCharge') || 0) * 100,
traction_battery_power: getSignalKData('electrical.batteries.traction.power') || 0,
temperature: (getSignalKData('meb.temperature') || 273.15) - 273.15,
position_latitude: app.getSelfPath('navigation.position')?.value?.latitude || 0,
position_longitude: app.getSelfPath('navigation.position')?.value?.longitude || 0
};
}
for (const item of sensorRules.items) {
const mainPath = item.main_path;
if (!item.elements || item.elements === null) {
// Campo singolo: usa il nome della collection
data[item.collection] = getSignalKData(mainPath);
} else {
for (const element of item.elements) {
// Separa subelements dalle proprietà campo
const { subelements, ...fields } = element;
const [fieldName, subPath] = Object.entries(fields)[0];
const keyName = item.collection
? `${item.collection}_${fieldName}`
: fieldName;
if (fieldName === 'latitude' || fieldName === 'longitude') {
const baseValue = app.getSelfPath(`${mainPath}.position`)?.value;
data[keyName] = (baseValue && typeof baseValue === 'object')
? baseValue[fieldName] ?? null
: null;
} else {
data[keyName] = getSignalKData(`${mainPath}.${subPath}`);
}
// Gestisci subelementi (es. direction.average)
if (subelements && Array.isArray(subelements)) {
for (const sub of subelements) {
const [subFieldName, subSubPath] = Object.entries(sub)[0];
const subKey = `${keyName}_${subFieldName}`;
data[subKey] = getSignalKData(`${mainPath}.${subPath}.${subSubPath}`);
}
}
}
}
}
return data;
}
/**
* Invia dati sensore al server via WebSocket (msgpack).
* Se il WS e' disconnesso, buffer localmente.
*/
function sendData() {
const data = collectAllSensorData();
// Aggiorna la cache centralizzata per Telegram e altri consumer
dataHub.updateSensorData(data);
const message = {
type: 'sensor',
ts: Date.now(),
data
};
if (!ws || ws.readyState !== WebSocket.OPEN) {
bufferLocally(message);
return;
}
try {
ws.send(msgpack.encode(message));
stats.sent++;
if (!stats.firstSent) stats.firstSent = new Date().toISOString();
} catch (err) {
console.error('[MEB] Error sending realtime data:', err.message);
bufferLocally(message);
}
}
// ──────────────────── WEATHER (REST API) ────────────────────
/**
* Invia dati meteo al server via REST API dedicata (POST /weather).
* Non usa piu' il WebSocket per i dati meteo — endpoint REST separato.
*/
async function sendWeatherPayload(payload) {
try {
const REALTIME_URL = process.env.REALTIME_URL || 'http://localhost:3002';
const url = `${REALTIME_URL}/weather`;
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
sensor_code: stats.sensorID,
data: payload
})
});
if (res.ok) {
const result = await res.json();
console.log(`[MEB] Weather payload inviato via REST — sensor: ${result.sensor}`);
} else {
console.error(`[MEB] Weather REST failed: ${res.status} ${res.statusText}`);
}
} catch (err) {
console.error('[MEB] Error sending weather via REST:', err.message);
}
}
// ──────────────────── BUFFER ANTI-PERDITA ────────────────────
function bufferLocally(message) {
localBuffer.push(message);
if (localBuffer.length > MAX_BUFFER_SIZE) {
localBuffer.shift(); // Rimuovi il piu' vecchio
}
stats.buffered = localBuffer.length;
}
/**
* Flush del buffer locale verso il server dopo reconnessione.
* Invia gradualmente per non sovraccaricare il WS.
*/
function flushBuffer() {
if (localBuffer.length === 0) return;
console.log(`[MEB] Flushing ${localBuffer.length} buffered messages...`);
const flushBatch = () => {
if (localBuffer.length === 0 || !ws || ws.readyState !== WebSocket.OPEN) {
stats.buffered = localBuffer.length;
return;
}
// Invia 10 messaggi alla volta per non bloccare
const batch = Math.min(localBuffer.length, 10);
for (let i = 0; i < batch; i++) {
const msg = localBuffer.shift();
try {
ws.send(msgpack.encode(msg));
stats.sent++;
} catch {
localBuffer.unshift(msg);
break;
}
}
stats.buffered = localBuffer.length;
if (localBuffer.length > 0) {
setTimeout(flushBatch, 100); // Pausa tra batch
} else {
console.log('[MEB] Buffer flush completato');
}
};
// Attendi 1s dopo la connessione prima di iniziare il flush
setTimeout(flushBatch, 1000);
}
// ──────────────────── RECONNESSIONE ────────────────────
function scheduleReconnect() {
if (reconnectTimer) return;
stats.reconnections++;
// Exponential backoff con jitter
const delay = Math.min(
BASE_RECONNECT_DELAY * Math.pow(1.5, Math.min(stats.reconnections, 15)),
MAX_RECONNECT_DELAY
);
const jitter = delay * 0.2 * Math.random();
const finalDelay = Math.round(delay + jitter);
console.log(`[MEB] Reconnecting in ${Math.round(finalDelay / 1000)}s (tentativo ${stats.reconnections})`);
reconnectTimer = setTimeout(async () => {
reconnectTimer = null;
start();
}, finalDelay);
}
// ──────────────────── PING/PONG ────────────────────
function startPingInterval() {
stopPingInterval();
pingInterval = setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.ping();
}
}, 25000); // 25s, server ha heartbeat a 30s
}
function stopPingInterval() {
if (pingInterval) {
clearInterval(pingInterval);
pingInterval = null;
}
}
// ──────────────────── START / STOP ────────────────────
async function start() {
const result = await authenticate();
if (!result) {
scheduleReconnect();
return;
}
connectWebSocket(result.wsUrl, result.ticket);
}
/**
* Ferma tutto: WebSocket, timer, ping, config check.
*/
function stop() {
stopSending();
stopPingInterval();
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (configCheckTimer) {
clearInterval(configCheckTimer);
configCheckTimer = null;
}
if (ws) {
ws.close(1000, 'Plugin stopping');
ws = null;
}
isConnected = false;
stats.status = 'stopped';
console.log('[MEB] Realtime module stopped');
}
function getStats() {
return { ...stats, isConnected, bufferSize: localBuffer.length };
}
function getSensorRules() {
return sensorRules;
}
module.exports = {
init,
stop,
sendWeatherPayload,
collectAllSensorData,
getSensorRules,
getStats
};