157 lines
5.5 KiB
JavaScript
157 lines
5.5 KiB
JavaScript
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
|
|
|
|
const url = process.env.INFLX_URL;
|
|
const token = process.env.INFLX_TOKEN;
|
|
const org = process.env.INFLX_ORG;
|
|
const boatTelemetry = 'boat';
|
|
|
|
const client = new InfluxDB({ url, token });
|
|
const writeApi = client.getWriteApi(org, boatTelemetry);
|
|
|
|
const FIELD_MAP = {
|
|
logs: {
|
|
lat: 'latitude', lon: 'longitude', hdg: 'heading',
|
|
sog: 'speed_over_ground', cog: 'course_over_ground',
|
|
depth: 'depth', engTemp: 'engine_temperature',
|
|
fTemp: 'forecast_temperature', fHum: 'forecast_humidity', fPres: 'forecast_pressure',
|
|
fWSpd: 'forecast_wind_speed', fWDir: 'forecast_wind_direction',
|
|
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
|
|
curD: 'current_direction', curV: 'current_velocity'
|
|
},
|
|
weather: {
|
|
temp: 'temperature', hum: 'humidity', pres: 'pressure',
|
|
wSpd: 'wind_speed', wDir: 'wind_direction', gust: 'wind_gusts',
|
|
rain: 'rain', prec: 'precipitation',
|
|
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
|
|
wvPkP: 'wave_peak_period', curD: 'current_direction', curV: 'current_velocity'
|
|
},
|
|
forecast: {
|
|
temp: 'temperature', hum: 'humidity', pres: 'pressure',
|
|
wSpd: 'wind_speed', wDir: 'wind_direction',
|
|
precProb: 'precipitation_probability', prec: 'precipitation',
|
|
rain: 'rain', cloud: 'cloud_cover',
|
|
wvH: 'wave_height', wvP: 'wave_period', wvD: 'wave_direction',
|
|
curD: 'current_direction', curV: 'current_velocity'
|
|
}
|
|
};
|
|
|
|
async function writePoint(sensorId, timestamp, measurement, fields) {
|
|
try {
|
|
const map = FIELD_MAP[measurement] || {};
|
|
const point = new Point(measurement)
|
|
.tag('sensor', sensorId)
|
|
.timestamp(new Date(timestamp));
|
|
|
|
for (const [key, value] of Object.entries(fields)) {
|
|
if (value == null) continue;
|
|
const fieldName = map[key] || key;
|
|
|
|
if (typeof value === 'number') {
|
|
point.floatField(fieldName, value);
|
|
} else if (typeof value === 'string') {
|
|
point.stringField(fieldName, value);
|
|
} else if (typeof value === 'boolean') {
|
|
point.booleanField(fieldName, value);
|
|
}
|
|
}
|
|
|
|
writeApi.writePoint(point);
|
|
await writeApi.flush();
|
|
} catch (error) {
|
|
console.error(`[INFLUX] Errore writePoint (${measurement}):`, error.message);
|
|
}
|
|
}
|
|
|
|
async function writeForecastBatch(sensorId, points) {
|
|
try {
|
|
const map = FIELD_MAP.forecast || {};
|
|
|
|
for (const [ts, fields] of points) {
|
|
const point = new Point('forecast')
|
|
.tag('sensor', sensorId)
|
|
.timestamp(new Date(ts));
|
|
|
|
for (const [key, value] of Object.entries(fields)) {
|
|
if (value == null) continue;
|
|
const fieldName = map[key] || key;
|
|
|
|
if (typeof value === 'number') {
|
|
point.floatField(fieldName, value);
|
|
} else if (typeof value === 'string') {
|
|
point.stringField(fieldName, value);
|
|
}
|
|
}
|
|
writeApi.writePoint(point);
|
|
}
|
|
|
|
await writeApi.flush();
|
|
console.log(`[INFLUX] Scritti ${points.length} punti forecast per sensore ${sensorId}`);
|
|
} catch (error) {
|
|
console.error(`[INFLUX] Errore writeForecastBatch:`, error.message);
|
|
}
|
|
}
|
|
|
|
// --- Batch buffer per watchers ---
|
|
const BATCH_SIZE = 10;
|
|
const batchBuffers = new Map(); // sensorId → [{timestamp, measurement, fields}, ...]
|
|
|
|
function bufferPoint(sensorId, timestamp, measurement, fields) {
|
|
if (!batchBuffers.has(sensorId)) {
|
|
batchBuffers.set(sensorId, []);
|
|
}
|
|
const buffer = batchBuffers.get(sensorId);
|
|
buffer.push({ timestamp, measurement, fields });
|
|
|
|
if (buffer.length >= BATCH_SIZE) {
|
|
const batch = buffer.splice(0, BATCH_SIZE);
|
|
writeBatch(sensorId, batch);
|
|
}
|
|
}
|
|
|
|
async function writeBatch(sensorId, batch) {
|
|
try {
|
|
for (const { timestamp, measurement, fields } of batch) {
|
|
const map = FIELD_MAP[measurement] || {};
|
|
const point = new Point(measurement)
|
|
.tag('sensor', sensorId)
|
|
.timestamp(new Date(timestamp));
|
|
|
|
for (const [key, value] of Object.entries(fields)) {
|
|
if (value == null) continue;
|
|
const fieldName = map[key] || key;
|
|
if (typeof value === 'number') {
|
|
point.floatField(fieldName, value);
|
|
} else if (typeof value === 'string') {
|
|
point.stringField(fieldName, value);
|
|
} else if (typeof value === 'boolean') {
|
|
point.booleanField(fieldName, value);
|
|
}
|
|
}
|
|
writeApi.writePoint(point);
|
|
}
|
|
await writeApi.flush();
|
|
console.log(`[INFLUX] Batch scritto: ${batch.length} punti per sensore ${sensorId}`);
|
|
} catch (error) {
|
|
console.error(`[INFLUX] Errore writeBatch:`, error.message);
|
|
}
|
|
}
|
|
|
|
async function flushBuffer(sensorId) {
|
|
const buffer = batchBuffers.get(sensorId);
|
|
if (!buffer || buffer.length === 0) return [];
|
|
|
|
const remaining = buffer.splice(0);
|
|
await writeBatch(sensorId, remaining);
|
|
return remaining;
|
|
}
|
|
|
|
function getBufferedPoints(sensorId) {
|
|
return batchBuffers.get(sensorId) || [];
|
|
}
|
|
|
|
function clearBuffer(sensorId) {
|
|
batchBuffers.delete(sensorId);
|
|
}
|
|
|
|
module.exports = { writePoint, writeForecastBatch, bufferPoint, flushBuffer, getBufferedPoints, clearBuffer, FIELD_MAP };
|