const { InfluxDB, Point } = require('@influxdata/influxdb-client'); const client = new InfluxDB({ url: process.env.INFLX_URL, token: process.env.INFLX_TOKEN, }); const bucket = process.env.INFLX_BUCKET || 'logs'; const org = process.env.INFLX_ORG; const writeApi = client.getWriteApi(org, bucket, 'ms', { flushInterval: 100, batchSize: 50, }); /** * Scrive dati generici su InfluxDB senza mapping. * @param {string} measurement - nome della measurement (es. 'logs', 'weather') * @param {Object} fields - campi { key: value } * @param {string} sensor - nome del sensore * @param {string} session - id sessione (tag immutabile) * @param {number} timestamp - timestamp unix ms */ function writeGenericData(measurement, fields, sensor, session, timestamp) { const point = new Point(measurement) .tag('sensor', sensor) .tag('session', session) .timestamp(timestamp); for (const [key, value] of Object.entries(fields)) { if (value === null || value === undefined) continue; if (typeof value === 'number') { point.floatField(key, value); } else if (typeof value === 'string') { point.stringField(key, value); } } writeApi.writePoint(point); } /** * Scrive un batch di punti forecast (previsioni orarie). * @param {Array} points - array di [timestamp_ms, { key: value, ... }] * @param {string} sensor - nome del sensore * @param {string} session - id sessione */ function writeForecastBatch(points, sensor, session) { for (const [ts, fields] of points) { writeGenericData('weather_forecast', fields, sensor, session, ts); } } /** * Forza il flush del buffer di scrittura. */ async function flush() { try { await writeApi.flush(); } catch (err) { console.error('[INFLUX] Flush error:', err.message); } } /** * Query storica per una sessione: ritorna righe pivotate con tutti i campi. * @param {string} sensor - nome sensore * @param {string} session - session_id (tag InfluxDB) * @param {string} since - ISO timestamp o duration (es. "-30d") * @returns {Array} */ async function queryHistory(sensor, session, since) { const queryApi = client.getQueryApi(org); const fluxQuery = ` from(bucket: "${bucket}") |> range(start: ${since}) |> filter(fn: (r) => r._measurement == "logs") |> filter(fn: (r) => r.sensor == "${sensor}") |> filter(fn: (r) => r.session == "${session}") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> sort(columns: ["_time"]) `; const rows = []; return new Promise((resolve, reject) => { queryApi.queryRows(fluxQuery, { next(row, tableMeta) { rows.push(tableMeta.toObject(row)); }, error: reject, complete() { resolve(rows); }, }); }); } /** * Esporta tutti i dati di una sessione come CSV. * @param {string} sensor - nome sensore * @param {string} session - session_id * @param {string} since - ISO timestamp inizio (opzionale, default -30d) * @returns {string} CSV content */ async function exportSessionCSV(sensor, session, since) { const start = since || '-30d'; const rows = await queryHistory(sensor, session, start); if (rows.length === 0) return ''; // Raccogli tutti i field names (esclusi meta InfluxDB) const metaKeys = new Set(['result', 'table', '_start', '_stop', '_measurement', 'sensor', 'session', '']); const fieldNames = new Set(); for (const row of rows) { for (const key of Object.keys(row)) { if (!metaKeys.has(key) && key !== '_time') { fieldNames.add(key); } } } const fields = Array.from(fieldNames).sort(); const header = ['timestamp', ...fields].join(','); const csvRows = rows.map(row => { const ts = row._time || ''; const values = fields.map(f => { const v = row[f]; if (v === null || v === undefined) return ''; return v; }); return [ts, ...values].join(','); }); return header + '\n' + csvRows.join('\n') + '\n'; } module.exports = { writeGenericData, writeForecastBatch, flush, queryHistory, exportSessionCSV };