const skFlow = require('../config/skFlow'); const realtimeCore = require('./realtime/core'); const { FORECAST_CURRENT, FORECAST_HOURLY, MARINE_CURRENT, MARINE_HOURLY } = require('../rules'); const FETCH_TIMEOUT = 10000; const FORECAST_API = 'https://api.open-meteo.com/v1/forecast'; const MARINE_API = 'https://marine-api.open-meteo.com/v1/marine'; /** * Mapping da parametri API Open-Meteo a path Signal K. * Questi path vengono pubblicati sul databrowser e letti dai log. */ const FORECAST_PATH_MAP = { 'temperature_2m': 'meb.forecasts.temperature', 'wind_speed_10m': 'meb.forecast.wind.speed', 'wind_direction_10m': 'meb.forecast.wind.direction', 'wind_gusts_10m': 'meb.forecast.wind.gusts', 'precipitation': 'meb.forecast.precipitation', 'rain': 'meb.forecast.rain', 'relative_humidity_2m': 'meb.forecast.humidity', 'pressure_msl': 'meb.forecast.pressure', 'precipitation_probability':'meb.forecast.precipitationProbability', 'cloud_cover': 'meb.forecast.cloudCover', }; const MARINE_PATH_MAP = { 'wave_height': 'meb.waves.height', 'wave_direction': 'meb.waves.direction', 'wave_period': 'meb.waves.period', 'wave_peak_period': 'meb.waves.peakPeriod', 'ocean_current_velocity': 'meb.waves.currentVelocity', 'ocean_current_direction': 'meb.waves.currentDirection', }; /** * Fetch JSON con timeout */ async function fetchJSON(url) { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT); try { const res = await fetch(url, { signal: controller.signal }); clearTimeout(timeoutId); if (!res.ok) throw new Error(`HTTP ${res.status}`); return await res.json(); } catch (err) { clearTimeout(timeoutId); throw err; } } /** * Pubblica i dati current su Signal K usando i path mappati. */ function publishCurrentToSignalK(forecastData, marineData) { const skData = {}; if (forecastData?.current) { for (const [key, value] of Object.entries(forecastData.current)) { if (key === 'time' || key === 'interval') continue; const skPath = FORECAST_PATH_MAP[key]; if (skPath && value != null) skData[skPath] = value; } } if (marineData?.current) { for (const [key, value] of Object.entries(marineData.current)) { if (key === 'time' || key === 'interval') continue; const skPath = MARINE_PATH_MAP[key]; if (skPath && value != null) skData[skPath] = value; } } if (Object.keys(skData).length > 0) { skData['meb.weather.timestamp'] = Date.now(); skFlow.publish(skData); console.log(`[OPENMETEO] Pubblicati ${Object.keys(skData).length} valori su Signal K`); } } /** * Invia i dati current weather al server realtime (measurement: weather). * Usa i path mappati come field keys per InfluxDB. */ function sendCurrentToRealtime(forecastData, marineData) { const fields = {}; if (forecastData?.current) { for (const [key, value] of Object.entries(forecastData.current)) { if (key === 'time' || key === 'interval') continue; const skPath = FORECAST_PATH_MAP[key]; if (skPath && value != null) fields[skPath] = value; } } if (marineData?.current) { for (const [key, value] of Object.entries(marineData.current)) { if (key === 'time' || key === 'interval') continue; const skPath = MARINE_PATH_MAP[key]; if (skPath && value != null) fields[skPath] = value; } } if (Object.keys(fields).length > 0) { realtimeCore.send([Date.now(), 'weather', fields]); } } /** * Invia i dati hourly forecast come batch al server realtime (measurement: weather_forecast). * Ogni punto usa i path mappati come field keys. */ function sendForecastBatchToRealtime(forecastData, marineData) { const forecastHourly = forecastData?.hourly; const marineHourly = marineData?.hourly; if (!forecastHourly?.time && !marineHourly?.time) return; const times = forecastHourly?.time || marineHourly?.time; const points = []; for (let i = 0; i < times.length; i++) { const ts = new Date(times[i]).getTime(); const fields = {}; if (forecastHourly) { for (const [key, values] of Object.entries(forecastHourly)) { if (key === 'time') continue; const skPath = FORECAST_PATH_MAP[key]; if (skPath && values?.[i] != null) fields[skPath] = values[i]; } } if (marineHourly) { for (const [key, values] of Object.entries(marineHourly)) { if (key === 'time') continue; const skPath = MARINE_PATH_MAP[key]; if (skPath && values?.[i] != null) fields[skPath] = values[i]; } } if (Object.keys(fields).length > 0) { points.push([ts, fields]); } } if (points.length > 0) { realtimeCore.sendRaw({ ts: 0, _m: 'forecast_batch', points }); console.log(`[OPENMETEO] Batch forecast inviato: ${points.length} punti orari`); } } // ========== FUNZIONI PRINCIPALI ========== /** * Fetch dati meteo current (ogni 5 minuti). */ async function fetchCurrentWeather(location) { if (!location?.latitude || !location?.longitude) { console.warn('[OPENMETEO] Coordinate non valide'); return; } if (FORECAST_CURRENT.length === 0 && MARINE_CURRENT.length === 0) { console.warn('[OPENMETEO] Nessun parametro current configurato'); return; } console.log(`[OPENMETEO] Fetch current — forecast: ${FORECAST_CURRENT.length} params, marine: ${MARINE_CURRENT.length} params`); let forecastData = null, marineData = null; try { const promises = []; if (FORECAST_CURRENT.length > 0) { const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${FORECAST_CURRENT.join(',')}`; promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => { console.error(`[OPENMETEO] Errore forecast current: ${e.message}`); })); } if (MARINE_CURRENT.length > 0) { const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}¤t=${MARINE_CURRENT.join(',')}&models=ecmwf_wam`; promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => { console.error(`[OPENMETEO] Errore marine current: ${e.message}`); })); } await Promise.all(promises); publishCurrentToSignalK(forecastData, marineData); sendCurrentToRealtime(forecastData, marineData); } catch (err) { console.error(`[OPENMETEO] Errore fetch current: ${err.message}`); } } /** * Fetch previsioni orarie 7 giorni (ogni 1 ora). */ async function fetchHourlyForecasts(location) { if (!location?.latitude || !location?.longitude) { console.warn('[OPENMETEO] Coordinate non valide per forecast'); return; } if (FORECAST_HOURLY.length === 0 && MARINE_HOURLY.length === 0) { console.warn('[OPENMETEO] Nessun parametro hourly configurato'); return; } console.log(`[OPENMETEO] Fetch hourly 7gg — forecast: ${FORECAST_HOURLY.length} params, marine: ${MARINE_HOURLY.length} params`); let forecastData = null, marineData = null; try { const promises = []; if (FORECAST_HOURLY.length > 0) { const url = `${FORECAST_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${FORECAST_HOURLY.join(',')}&forecast_days=7`; promises.push(fetchJSON(url).then(d => { forecastData = d; }).catch(e => { console.error(`[OPENMETEO] Errore forecast hourly: ${e.message}`); })); } if (MARINE_HOURLY.length > 0) { const url = `${MARINE_API}?latitude=${location.latitude}&longitude=${location.longitude}&hourly=${MARINE_HOURLY.join(',')}&forecast_days=7&models=ecmwf_wam`; promises.push(fetchJSON(url).then(d => { marineData = d; }).catch(e => { console.error(`[OPENMETEO] Errore marine hourly: ${e.message}`); })); } await Promise.all(promises); sendForecastBatchToRealtime(forecastData, marineData); } catch (err) { console.error(`[OPENMETEO] Errore fetch hourly: ${err.message}`); } } /** * Fetch completo: current + hourly. Chiamato all'avvio. */ async function fetchAll(location) { await fetchCurrentWeather(location); await fetchHourlyForecasts(location); } module.exports = { fetchCurrentWeather, fetchHourlyForecasts, fetchAll };