const skFlow = require('../config/skFlow');
const realtimeCore = require('./realtime/core');
const rulesets = require('./rulesets');

/** Milliseconds after which a pending Open-Meteo request is aborted. */
const FETCH_TIMEOUT = 10000;
const FORECAST_API = 'https://api.open-meteo.com/v1/forecast';
const MARINE_API = 'https://marine-api.open-meteo.com/v1/marine';

/** Keys of a `current` response object that are skipped when mapping values. */
const CURRENT_SKIPPED_KEYS = new Set(['time', 'interval']);

/** Matches a trailing `Z` or numeric UTC offset at the end of an ISO 8601 string. */
const UTC_OFFSET_SUFFIX = /(?:Z|[+-]\d{2}:?\d{2})$/i;

// Output path maps are obtained from the active ruleset (see paramsAndMap).

/**
 * Reads, from the active ruleset, the Open-Meteo variable codes enabled for one
 * request type together with the output path each code is mapped to.
 *
 * NOTE(review): any fallback to built-in defaults while the ruleset is not yet
 * loaded would have to happen inside `rulesets.getEnabledItems`; this helper
 * only returns empty results when no items come back.
 *
 * @param {string} type - Ruleset type. This module uses 'forecast_current',
 *   'marine_current', 'forecast_hourly' and 'marine_hourly'.
 * @returns {{codes: string[], map: Object<string, string>}} `codes` holds every
 *   non-empty item path; `map` links an item path to its `meta.sk_path` for the
 *   items that define one.
 */
function paramsAndMap(type) {
  const items = rulesets.getEnabledItems(type);
  // Null-prototype map: it is indexed with key names taken from API responses,
  // so inherited names such as "constructor" must never resolve to a value.
  const map = Object.create(null);
  const codes = [];
  if (!Array.isArray(items)) return { codes, map };

  for (const item of items) {
    if (!item?.path) continue;
    codes.push(item.path);
    if (item.meta?.sk_path) map[item.path] = item.meta.sk_path;
  }
  return { codes, map };
}

/**
 * Fetches a URL and parses the response body as JSON, aborting after
 * FETCH_TIMEOUT milliseconds.
 *
 * @param {string} url - Absolute URL to request.
 * @returns {Promise<any>} The parsed JSON body.
 * @throws {Error} `HTTP <status>` on a non-2xx response; otherwise whatever
 *   `fetch` / `res.json()` reject with (including the abort error on timeout).
 */
async function fetchJSON(url) {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT);
  try {
    const res = await fetch(url, { signal: controller.signal });
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    // `return await` keeps the timer armed until the body has been read, so a
    // stalled body download is aborted too.
    return await res.json();
  } finally {
    clearTimeout(timeoutId);
  }
}

/**
 * Tells whether a value is usable as a coordinate: a finite number, or a
 * non-blank string that converts to one. Zero is a valid coordinate.
 *
 * @param {*} value - Candidate latitude or longitude.
 * @returns {boolean} True when the value is a usable coordinate.
 */
function isCoordinate(value) {
  if (typeof value === 'string') {
    return value.trim() !== '' && Number.isFinite(Number(value));
  }
  return typeof value === 'number' && Number.isFinite(value);
}

/**
 * Checks that a location carries both a usable latitude and longitude.
 *
 * @param {{latitude: *, longitude: *}|null|undefined} location - Location to check.
 * @returns {boolean} True when both coordinates are usable.
 */
function hasValidCoordinates(location) {
  return isCoordinate(location?.latitude) && isCoordinate(location?.longitude);
}

/**
 * Builds an Open-Meteo request URL for a location.
 *
 * @param {string} baseUrl - API endpoint (FORECAST_API or MARINE_API).
 * @param {{latitude: number|string, longitude: number|string}} location - Position to query.
 * @param {string} query - Extra query string without a leading `&`.
 * @returns {string} The complete request URL.
 */
function buildUrl(baseUrl, location, query) {
  return `${baseUrl}?latitude=${location.latitude}&longitude=${location.longitude}&${query}`;
}

/**
 * Fetches one endpoint, logging instead of throwing on failure.
 *
 * @param {string[]} codes - Variable codes requested; nothing is fetched when empty.
 * @param {string} url - Request URL.
 * @param {string} label - Request name used in the error log line.
 * @returns {Promise<any|null>} Parsed response, or null when skipped or failed.
 */
async function fetchOrNull(codes, url, label) {
  if (codes.length === 0) return null;
  try {
    return await fetchJSON(url);
  } catch (err) {
    console.error(`[OPENMETEO] Errore ${label}: ${err.message}`);
    return null;
  }
}

/**
 * Copies the mapped, non-null values of one `current` object into `target`.
 *
 * @param {Object|null|undefined} current - `current` section of an API response.
 * @param {Object<string, string>} pathMap - API key -> output path.
 * @param {Object} target - Object receiving `outputPath: value` pairs.
 */
function copyMappedCurrent(current, pathMap, target) {
  if (!current) return;
  for (const [key, value] of Object.entries(current)) {
    if (CURRENT_SKIPPED_KEYS.has(key) || value == null) continue;
    const skPath = pathMap[key];
    if (skPath) target[skPath] = value;
  }
}

/**
 * Maps the `current` sections of both responses onto their output paths.
 * Marine values are applied last, so they win when both map to the same path.
 *
 * @param {Object|null} forecastData - Forecast API response, if any.
 * @param {Object|null} marineData - Marine API response, if any.
 * @returns {Object} `outputPath: value` pairs (possibly empty).
 */
function collectCurrentFields(forecastData, marineData) {
  const fields = {};
  copyMappedCurrent(forecastData?.current, paramsAndMap('forecast_current').map, fields);
  copyMappedCurrent(marineData?.current, paramsAndMap('marine_current').map, fields);
  return fields;
}

/**
 * Publishes the current values to Signal K under their mapped paths, adding a
 * `meb.weather.timestamp` entry. Does nothing when no value is mapped.
 *
 * @param {Object|null} forecastData - Forecast API response, if any.
 * @param {Object|null} marineData - Marine API response, if any.
 */
function publishCurrentToSignalK(forecastData, marineData) {
  const skData = collectCurrentFields(forecastData, marineData);
  if (Object.keys(skData).length === 0) return;

  skData['meb.weather.timestamp'] = Date.now();
  skFlow.publish(skData);
  console.log(`[OPENMETEO] pubblicati ${Object.keys(skData).length} valori su Signal K`);
}

/**
 * Sends the current values to the realtime server as a `weather` point, using
 * the mapped paths as field keys. Does nothing when no value is mapped.
 *
 * @param {Object|null} forecastData - Forecast API response, if any.
 * @param {Object|null} marineData - Marine API response, if any.
 */
function sendCurrentToRealtime(forecastData, marineData) {
  const fields = collectCurrentFields(forecastData, marineData);
  if (Object.keys(fields).length === 0) return;
  realtimeCore.send([Date.now(), 'weather', fields]);
}

/**
 * Converts an hourly `time` entry to epoch milliseconds.
 *
 * Open-Meteo returns date-times such as `2024-01-01T00:00` with no offset, in
 * GMT unless a `timezone` parameter is sent (this module sends none). `Date`
 * parses offset-less date-times as host-local time, so a `Z` is appended first.
 *
 * @param {*} time - Entry of the `hourly.time` array.
 * @returns {number} Epoch milliseconds, or NaN when the value cannot be parsed.
 */
function toEpochMillis(time) {
  const lacksOffset =
    typeof time === 'string' && time.includes('T') && !UTC_OFFSET_SUFFIX.test(time);
  return new Date(lacksOffset ? `${time}Z` : time).getTime();
}

/**
 * Merges the mapped series of one `hourly` object into `pointsByTs`, keyed by
 * each sample's own timestamp.
 *
 * @param {Object|null|undefined} hourly - `hourly` section of an API response.
 * @param {Object<string, string>} pathMap - API key -> output path.
 * @param {Map<number, Object>} pointsByTs - Epoch ms -> fields, updated in place.
 */
function mergeHourlyFields(hourly, pathMap, pointsByTs) {
  const times = hourly?.time;
  if (!Array.isArray(times)) return;

  // Resolve the mapped series once instead of once per hour.
  const series = [];
  for (const [key, values] of Object.entries(hourly)) {
    if (key === 'time') continue;
    const skPath = pathMap[key];
    if (skPath) series.push([skPath, values]);
  }

  times.forEach((time, index) => {
    const ts = toEpochMillis(time);
    if (Number.isNaN(ts)) return;
    for (const [skPath, values] of series) {
      const value = values?.[index];
      if (value == null) continue;
      let fields = pointsByTs.get(ts);
      if (!fields) {
        fields = {};
        pointsByTs.set(ts, fields);
      }
      fields[skPath] = value;
    }
  });
}

/**
 * Sends the hourly forecast to the realtime server as one `forecast_batch`
 * message whose points are `[epochMillis, fields]` pairs in chronological
 * order, with mapped paths as field keys. Hours without any mapped value are
 * omitted; nothing is sent when no point remains.
 *
 * Forecast and marine samples are joined on their timestamps, so the two
 * responses do not need to share the same time axis.
 *
 * @param {Object|null} forecastData - Forecast API response, if any.
 * @param {Object|null} marineData - Marine API response, if any.
 */
function sendForecastBatchToRealtime(forecastData, marineData) {
  const pointsByTs = new Map();
  mergeHourlyFields(forecastData?.hourly, paramsAndMap('forecast_hourly').map, pointsByTs);
  mergeHourlyFields(marineData?.hourly, paramsAndMap('marine_hourly').map, pointsByTs);
  if (pointsByTs.size === 0) return;

  const points = [...pointsByTs.entries()].sort((a, b) => a[0] - b[0]);
  realtimeCore.sendRaw({ ts: 0, _m: 'forecast_batch', points });
  console.log(`[OPENMETEO] Batch forecast inviato: ${points.length} punti orari`);
}

// ========== MAIN FUNCTIONS ==========

/**
 * Fetches the current weather and marine conditions for a location, then
 * publishes them to Signal K and to the realtime server. Never rejects:
 * failures are logged. The original note says this runs every 5 minutes;
 * scheduling is up to the caller.
 *
 * @param {{latitude: number|string, longitude: number|string}} location - Position to query.
 * @returns {Promise<void>}
 */
async function fetchCurrentWeather(location) {
  if (!hasValidCoordinates(location)) {
    console.warn('[OPENMETEO] Coordinate non valide');
    return;
  }

  const forecastCurrent = paramsAndMap('forecast_current').codes;
  const marineCurrent = paramsAndMap('marine_current').codes;
  if (forecastCurrent.length === 0 && marineCurrent.length === 0) {
    console.warn('[OPENMETEO] Nessun parametro current configurato');
    return;
  }

  console.log(`[OPENMETEO] Fetch current — forecast:${forecastCurrent.length} marine:${marineCurrent.length}`);

  try {
    const forecastUrl = buildUrl(FORECAST_API, location, `current=${forecastCurrent.join(',')}`);
    const marineUrl = buildUrl(MARINE_API, location, `current=${marineCurrent.join(',')}&models=ecmwf_wam`);
    const [forecastData, marineData] = await Promise.all([
      fetchOrNull(forecastCurrent, forecastUrl, 'forecast current'),
      fetchOrNull(marineCurrent, marineUrl, 'marine current'),
    ]);

    publishCurrentToSignalK(forecastData, marineData);
    sendCurrentToRealtime(forecastData, marineData);
  } catch (err) {
    console.error(`[OPENMETEO] Errore fetch current: ${err.message}`);
  }
}

/**
 * Fetches the 7-day hourly weather and marine forecasts for a location and
 * sends them to the realtime server as a batch. Never rejects: failures are
 * logged. The original note says this runs every hour; scheduling is up to
 * the caller.
 *
 * @param {{latitude: number|string, longitude: number|string}} location - Position to query.
 * @returns {Promise<void>}
 */
async function fetchHourlyForecasts(location) {
  if (!hasValidCoordinates(location)) {
    console.warn('[OPENMETEO] Coordinate non valide per forecast');
    return;
  }

  const forecastHourly = paramsAndMap('forecast_hourly').codes;
  const marineHourly = paramsAndMap('marine_hourly').codes;
  if (forecastHourly.length === 0 && marineHourly.length === 0) {
    console.warn('[OPENMETEO] Nessun parametro hourly configurato');
    return;
  }

  console.log(`[OPENMETEO] Fetch hourly 7gg — forecast:${forecastHourly.length} marine:${marineHourly.length}`);

  try {
    const forecastUrl = buildUrl(FORECAST_API, location, `hourly=${forecastHourly.join(',')}&forecast_days=7`);
    const marineUrl = buildUrl(MARINE_API, location, `hourly=${marineHourly.join(',')}&forecast_days=7&models=ecmwf_wam`);
    const [forecastData, marineData] = await Promise.all([
      fetchOrNull(forecastHourly, forecastUrl, 'forecast hourly'),
      fetchOrNull(marineHourly, marineUrl, 'marine hourly'),
    ]);

    sendForecastBatchToRealtime(forecastData, marineData);
  } catch (err) {
    console.error(`[OPENMETEO] Errore fetch hourly: ${err.message}`);
  }
}

/**
 * Full refresh: current conditions first, then the hourly forecasts. The
 * original note says this is called at startup.
 *
 * @param {{latitude: number|string, longitude: number|string}} location - Position to query.
 * @returns {Promise<void>}
 */
async function fetchAll(location) {
  // Kept sequential so the small "current" update is published before the
  // larger hourly batch is requested.
  await fetchCurrentWeather(location);
  await fetchHourlyForecasts(location);
}

module.exports = { fetchCurrentWeather, fetchHourlyForecasts, fetchAll };