Migra dal codice salvato in locale al codice condiviso
This commit is contained in:
105
plugin/datasetModels/datasetCore.js
Normal file
105
plugin/datasetModels/datasetCore.js
Normal file
@@ -0,0 +1,105 @@
|
||||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
|
||||
// Coda di scrittura per gestire backpressure
|
||||
let writeQueue = [];
|
||||
let isDraining = false;
|
||||
|
||||
/**
|
||||
* Inizializza il dataset e lo prepara per essere salvato.
|
||||
*
|
||||
* @param {String[]} headers Un array di stringhe che rappresentano i tipi di dati.
|
||||
* @param {WriteStream} streamer Lo stream di scrittura del file.
|
||||
* @returns {boolean} True se l'inizializzazione ha successo
|
||||
*/
|
||||
function datasetInit(headers, streamer) {
|
||||
if (!streamer || streamer.destroyed) {
|
||||
console.error('[DatasetCore] Stream non valido per inizializzazione');
|
||||
return false;
|
||||
}
|
||||
if (!Array.isArray(headers) || headers.length === 0) {
|
||||
console.error('[DatasetCore] Headers non validi');
|
||||
return false;
|
||||
}
|
||||
|
||||
writeQueue = [];
|
||||
isDraining = false;
|
||||
|
||||
streamer.write(headers.join(',') + '\n');
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggiunge una riga di dati al dataset con gestione backpressure.
|
||||
*
|
||||
* @param {Object} data I dati da scrivere
|
||||
* @param {String[]} headers Gli header delle colonne
|
||||
* @param {WriteStream} streamer Lo stream di scrittura
|
||||
* @returns {boolean} True se la scrittura è andata a buon fine
|
||||
*/
|
||||
function appendData(data, headers, streamer) {
|
||||
if (!streamer || streamer.destroyed) {
|
||||
console.error('[DatasetCore] Stream non disponibile o distrutto');
|
||||
return false;
|
||||
}
|
||||
if (!data || typeof data !== 'object') {
|
||||
console.warn('[DatasetCore] Dati non validi, skip scrittura');
|
||||
return false;
|
||||
}
|
||||
|
||||
// Escape valori che contengono virgole o newline per CSV valido
|
||||
const escapeCSV = (val) => {
|
||||
if (val === undefined || val === null) return '';
|
||||
const str = String(val);
|
||||
if (str.includes(',') || str.includes('\n') || str.includes('"')) {
|
||||
return `"${str.replace(/"/g, '""')}"`;
|
||||
}
|
||||
return str;
|
||||
};
|
||||
|
||||
const row = headers.map(header => escapeCSV(data[header])).join(',');
|
||||
|
||||
// Gestione backpressure con coda
|
||||
const canWrite = streamer.write(row + '\n');
|
||||
|
||||
if (!canWrite) {
|
||||
if (!isDraining) {
|
||||
isDraining = true;
|
||||
console.warn('[DatasetCore] Buffer saturo, attendo drain...');
|
||||
streamer.once('drain', () => {
|
||||
isDraining = false;
|
||||
// Processa coda pendente
|
||||
while (writeQueue.length > 0 && !streamer.destroyed) {
|
||||
const pendingRow = writeQueue.shift();
|
||||
if (!streamer.write(pendingRow + '\n')) {
|
||||
writeQueue.unshift(pendingRow);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
// Aggiungi alla coda solo se non troppo piena (max 1000 entries)
|
||||
if (writeQueue.length < 1000) {
|
||||
writeQueue.push(row);
|
||||
} else {
|
||||
console.error('[DatasetCore] Coda piena, scarto dati');
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ottiene la dimensione della coda di scrittura pendente
|
||||
* @returns {number} Numero di righe in attesa
|
||||
*/
|
||||
function getPendingWrites() {
|
||||
return writeQueue.length;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
datasetInit,
|
||||
appendData,
|
||||
getPendingWrites
|
||||
};
|
||||
Reference in New Issue
Block a user