105 lines
3.1 KiB
JavaScript
105 lines
3.1 KiB
JavaScript
const fs = require("fs");
|
|
const path = require("path");
|
|
|
|
// Coda di scrittura per gestire backpressure
|
|
let writeQueue = [];
|
|
let isDraining = false;
|
|
|
|
/**
|
|
* Inizializza il dataset e lo prepara per essere salvato.
|
|
*
|
|
* @param {String[]} headers Un array di stringhe che rappresentano i tipi di dati.
|
|
* @param {WriteStream} streamer Lo stream di scrittura del file.
|
|
* @returns {boolean} True se l'inizializzazione ha successo
|
|
*/
|
|
function datasetInit(headers, streamer) {
|
|
if (!streamer || streamer.destroyed) {
|
|
console.error('[DatasetCore] Stream non valido per inizializzazione');
|
|
return false;
|
|
}
|
|
if (!Array.isArray(headers) || headers.length === 0) {
|
|
console.error('[DatasetCore] Headers non validi');
|
|
return false;
|
|
}
|
|
|
|
writeQueue = [];
|
|
isDraining = false;
|
|
|
|
streamer.write(headers.join(',') + '\n');
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Aggiunge una riga di dati al dataset con gestione backpressure.
|
|
*
|
|
* @param {Object} data I dati da scrivere
|
|
* @param {String[]} headers Gli header delle colonne
|
|
* @param {WriteStream} streamer Lo stream di scrittura
|
|
* @returns {boolean} True se la scrittura è andata a buon fine
|
|
*/
|
|
function appendData(data, headers, streamer) {
|
|
if (!streamer || streamer.destroyed) {
|
|
console.error('[DatasetCore] Stream non disponibile o distrutto');
|
|
return false;
|
|
}
|
|
if (!data || typeof data !== 'object') {
|
|
console.warn('[DatasetCore] Dati non validi, skip scrittura');
|
|
return false;
|
|
}
|
|
|
|
// Escape valori che contengono virgole o newline per CSV valido
|
|
const escapeCSV = (val) => {
|
|
if (val === undefined || val === null) return '';
|
|
const str = String(val);
|
|
if (str.includes(',') || str.includes('\n') || str.includes('"')) {
|
|
return `"${str.replace(/"/g, '""')}"`;
|
|
}
|
|
return str;
|
|
};
|
|
|
|
const row = headers.map(header => escapeCSV(data[header])).join(',');
|
|
|
|
// Gestione backpressure con coda
|
|
const canWrite = streamer.write(row + '\n');
|
|
|
|
if (!canWrite) {
|
|
if (!isDraining) {
|
|
isDraining = true;
|
|
console.warn('[DatasetCore] Buffer saturo, attendo drain...');
|
|
streamer.once('drain', () => {
|
|
isDraining = false;
|
|
// Processa coda pendente
|
|
while (writeQueue.length > 0 && !streamer.destroyed) {
|
|
const pendingRow = writeQueue.shift();
|
|
if (!streamer.write(pendingRow + '\n')) {
|
|
writeQueue.unshift(pendingRow);
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
// Aggiungi alla coda solo se non troppo piena (max 1000 entries)
|
|
if (writeQueue.length < 1000) {
|
|
writeQueue.push(row);
|
|
} else {
|
|
console.error('[DatasetCore] Coda piena, scarto dati');
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Ottiene la dimensione della coda di scrittura pendente
|
|
* @returns {number} Numero di righe in attesa
|
|
*/
|
|
function getPendingWrites() {
|
|
return writeQueue.length;
|
|
}
|
|
|
|
module.exports = {
|
|
datasetInit,
|
|
appendData,
|
|
getPendingWrites
|
|
}; |