const fs = require("fs"); const path = require("path"); // Coda di scrittura per gestire backpressure let writeQueue = []; let isDraining = false; /** * Inizializza il dataset e lo prepara per essere salvato. * * @param {String[]} headers Un array di stringhe che rappresentano i tipi di dati. * @param {WriteStream} streamer Lo stream di scrittura del file. * @returns {boolean} True se l'inizializzazione ha successo */ function datasetInit(headers, streamer) { if (!streamer || streamer.destroyed) { console.error('[DatasetCore] Stream non valido per inizializzazione'); return false; } if (!Array.isArray(headers) || headers.length === 0) { console.error('[DatasetCore] Headers non validi'); return false; } writeQueue = []; isDraining = false; streamer.write(headers.join(',') + '\n'); return true; } /** * Aggiunge una riga di dati al dataset con gestione backpressure. * * @param {Object} data I dati da scrivere * @param {String[]} headers Gli header delle colonne * @param {WriteStream} streamer Lo stream di scrittura * @returns {boolean} True se la scrittura รจ andata a buon fine */ function appendData(data, headers, streamer) { if (!streamer || streamer.destroyed) { console.error('[DatasetCore] Stream non disponibile o distrutto'); return false; } if (!data || typeof data !== 'object') { console.warn('[DatasetCore] Dati non validi, skip scrittura'); return false; } // Escape valori che contengono virgole o newline per CSV valido const escapeCSV = (val) => { if (val === undefined || val === null) return ''; const str = String(val); if (str.includes(',') || str.includes('\n') || str.includes('"')) { return `"${str.replace(/"/g, '""')}"`; } return str; }; const row = headers.map(header => escapeCSV(data[header])).join(','); // Gestione backpressure con coda const canWrite = streamer.write(row + '\n'); if (!canWrite) { if (!isDraining) { isDraining = true; console.warn('[DatasetCore] Buffer saturo, attendo drain...'); streamer.once('drain', () => { isDraining = false; // Processa coda pendente while (writeQueue.length > 0 && !streamer.destroyed) { const pendingRow = writeQueue.shift(); if (!streamer.write(pendingRow + '\n')) { writeQueue.unshift(pendingRow); break; } } }); } // Aggiungi alla coda solo se non troppo piena (max 1000 entries) if (writeQueue.length < 1000) { writeQueue.push(row); } else { console.error('[DatasetCore] Coda piena, scarto dati'); return false; } } return true; } /** * Ottiene la dimensione della coda di scrittura pendente * @returns {number} Numero di righe in attesa */ function getPendingWrites() { return writeQueue.length; } module.exports = { datasetInit, appendData, getPendingWrites };