Fixed some bugs in api and auth services, completed auth cores.
This commit is contained in:
8
stream/Dockerfile
Normal file
8
stream/Dockerfile
Normal file
@@ -0,0 +1,8 @@
|
||||
FROM node:20-alpine
|
||||
WORKDIR /app
|
||||
RUN corepack enable && corepack prepare pnpm@9.15.0 --activate
|
||||
COPY package.json ./
|
||||
RUN pnpm install
|
||||
COPY . .
|
||||
EXPOSE 3000
|
||||
CMD ["pnpm", "exec", "nodemon", "src/index.js"]
|
||||
24
stream/package.json
Normal file
24
stream/package.json
Normal file
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"name": "stream",
|
||||
"version": "1.0.0",
|
||||
"description": "MEB stream service — sensor WebSocket ingest to InfluxDB",
|
||||
"main": "src/index.js",
|
||||
"scripts": {
|
||||
"start": "node src/index.js",
|
||||
"dev": "nodemon src/index.js"
|
||||
},
|
||||
"type": "module",
|
||||
"license": "ISC",
|
||||
"packageManager": "pnpm@9.15.0",
|
||||
"dependencies": {
|
||||
"@influxdata/influxdb-client": "^1.35.0",
|
||||
"@msgpack/msgpack": "^3.1.2",
|
||||
"express": "^5.2.1",
|
||||
"ioredis": "^5.10.1",
|
||||
"pg": "^8.21.0",
|
||||
"ws": "^8.18.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"nodemon": "^3.1.14"
|
||||
}
|
||||
}
|
||||
28
stream/src/core/securitycore.js
Normal file
28
stream/src/core/securitycore.js
Normal file
@@ -0,0 +1,28 @@
|
||||
import crypto from 'crypto';
|
||||
|
||||
const SECRET = process.env.SENSOR_SECURITY_SECRET;
|
||||
|
||||
/**
|
||||
* Calcola l'HMAC-SHA256 del codice sensore con il secret token server-side.
|
||||
* - return {String} l'hash in formato hex
|
||||
*/
|
||||
export function getHmac(code) {
|
||||
return crypto.createHmac('sha256', SECRET || '').update(code).digest('hex');
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifica timing-safe del codice a partire dal suo hash salvato..
|
||||
* - return {Boolean} true se il codice è valido, false altrimenti
|
||||
*/
|
||||
export function verify(code, hash) {
|
||||
if (!code || !hash || !SECRET) return false;
|
||||
try {
|
||||
const computed = getHmac(code);
|
||||
const a = Buffer.from(computed, 'hex');
|
||||
const b = Buffer.from(hash, 'hex');
|
||||
if (a.length !== b.length) return false;
|
||||
return crypto.timingSafeEqual(a, b);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
26
stream/src/core/sessioncore.js
Normal file
26
stream/src/core/sessioncore.js
Normal file
@@ -0,0 +1,26 @@
|
||||
import { queryData as data } from '../data/db.js'
|
||||
|
||||
const maxTries = 10;
|
||||
|
||||
/*
|
||||
Generates a random, unique session ID like `s00123`.
|
||||
*/
|
||||
function makeID() {
|
||||
const n = Math.floor(Math.random() * 100_000).toString().padStart(5, '0');
|
||||
return `s${n}`;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Creates a new session by generating a unique ID and checking for conflicts in the database.
|
||||
*/
|
||||
export async function newSession() {
|
||||
for (let i = 0; i < maxTries; i++) {
|
||||
const id = makeID();
|
||||
const { rows } = await data(`select 1 from telemetrysessions where session_id = $1 and ended_at is null`, [id]);
|
||||
if (rows.length === 0) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
throw new Error('Failed to create session');
|
||||
}
|
||||
32
stream/src/data/db.js
Normal file
32
stream/src/data/db.js
Normal file
@@ -0,0 +1,32 @@
|
||||
import pg from 'pg';
|
||||
|
||||
// Pool per il database "sensors": lookup sensori + verifica code_hash
|
||||
export const sensorsDb = new pg.Pool({
|
||||
user: process.env.DB_USER,
|
||||
password: process.env.DB_PASSWORD,
|
||||
host: process.env.DB_HOST,
|
||||
port: process.env.DB_PORT,
|
||||
database: 'sensors',
|
||||
max: 5,
|
||||
idleTimeoutMillis: 30_000,
|
||||
});
|
||||
|
||||
// Pool per il database "data": gestione tabella telemetrysessions
|
||||
export const dataDb = new pg.Pool({
|
||||
user: process.env.DB_USER,
|
||||
password: process.env.DB_PASSWORD,
|
||||
host: process.env.DB_HOST,
|
||||
port: process.env.DB_PORT,
|
||||
database: 'data',
|
||||
max: 10,
|
||||
idleTimeoutMillis: 30_000,
|
||||
});
|
||||
|
||||
|
||||
export function querySensors(text, params) {
|
||||
return sensorsDb.query(text, params);
|
||||
}
|
||||
|
||||
export function queryData(text, params) {
|
||||
return dataDb.query(text, params);
|
||||
}
|
||||
32
stream/src/data/influx.js
Normal file
32
stream/src/data/influx.js
Normal file
@@ -0,0 +1,32 @@
|
||||
import { InfluxDB, Point } from '@influxdata/influxdb-client';
|
||||
|
||||
const url = process.env.INFLUX_URL;
|
||||
const token = process.env.INFLUX_TOKEN;
|
||||
const org = process.env.INFLUX_ORG;
|
||||
const bucket = process.env.INFLUX_BUCKET;
|
||||
|
||||
if (!url || !token || !org || !bucket) {
|
||||
console.error('[influx] configurazione mancante — verifica INFLUX_URL/INFLUX_TOKEN/INFLUX_ORG/INFLUX_BUCKET');
|
||||
}
|
||||
|
||||
const influxDB = new InfluxDB({ url, token });
|
||||
|
||||
// Configurazione write API: sincrono, no batching, no retry interno (lasciamo che il
|
||||
// fallimento si propaghi così possiamo chiudere il WS e far ripartire il plugin)
|
||||
const writeApi = influxDB.getWriteApi(org, bucket, 'ns', {
|
||||
batchSize: 1,
|
||||
flushInterval: 0,
|
||||
maxRetries: 0,
|
||||
maxBufferLines: 1, // niente accumulo locale
|
||||
});
|
||||
|
||||
/**
|
||||
* Scrive un Point su InfluxDB in modo sincrono.
|
||||
* Lancia su errore — il chiamante deve gestire (chiusura WS).
|
||||
*/
|
||||
export async function writePoint(point) {
|
||||
writeApi.writePoint(point);
|
||||
await writeApi.flush(true); // true = throw on error
|
||||
}
|
||||
|
||||
export { Point };
|
||||
18
stream/src/data/redis.js
Normal file
18
stream/src/data/redis.js
Normal file
@@ -0,0 +1,18 @@
|
||||
import Redis from 'ioredis';
|
||||
|
||||
const baseOpts = {
|
||||
host: process.env.REDIS_HOST,
|
||||
port: Number(process.env.REDIS_PORT),
|
||||
password: process.env.REDIS_PASSWORD,
|
||||
};
|
||||
|
||||
// Client principale: SET/GET/SETEX/GETDEL/INCR/PUBLISH
|
||||
const client = new Redis(baseOpts);
|
||||
|
||||
// Client dedicato per SUBSCRIBE (ioredis non permette comandi normali su un client subscribed)
|
||||
const sub = new Redis(baseOpts);
|
||||
|
||||
client.on('error', (e) => console.error('[redis] client error', e.message));
|
||||
sub.on('error', (e) => console.error('[redis] sub error', e.message));
|
||||
|
||||
export { client as redis, sub as redisSub };
|
||||
44
stream/src/routes/connect.js
Normal file
44
stream/src/routes/connect.js
Normal file
@@ -0,0 +1,44 @@
|
||||
import { Router } from 'express';
|
||||
import crypto from 'crypto';
|
||||
import { querySensors as sensors } from '../data/db.js';
|
||||
import { redis } from '../data/redis.js';
|
||||
import { verify } from '../core/securitycore.js';
|
||||
|
||||
const router = Router();
|
||||
const rateLimiter = 10;
|
||||
const rateLimitWindow = 60;
|
||||
|
||||
router.post('/connect', async (req, res) => {
|
||||
const { sensorID, code } = req.body;
|
||||
|
||||
const ip = (req.headers['x-forwarded-for']?.split(',')[0]?.trim()) || req.socket.remoteAddress || 'unknown';
|
||||
const tryKey = `streamconnect:fail:${ip}`;
|
||||
const fails = Number(await redis.get(tryKey).catch(() => 0));
|
||||
if (fails >= rateLimiter) {
|
||||
return res.status(429).json({ error: 'Too many failed attempts' });
|
||||
}
|
||||
|
||||
if (!sensorID || !code) {
|
||||
await redis.multi().incr(tryKey).expire(tryKey, rateLimitWindow).exec().catch(() => { });
|
||||
return res.status(400).json({ error: 'sensor and code are required' });
|
||||
}
|
||||
|
||||
const { rows } = await sensors('select id, name, code_hash from sensors where id = $1', [sensorID]);
|
||||
if (rows.length === 0) {
|
||||
return res.status(404).json({ error: 'sensor not found' });
|
||||
}
|
||||
if (!rows[0] || !verify(code, rows[0].code_hash)) {
|
||||
await redis.multi().incr(tryKey).expire(tryKey, rateLimitWindow).exec().catch(() => { });
|
||||
return res.status(401).json({ error: 'invalid code' });
|
||||
}
|
||||
|
||||
const token = crypto.randomUUID();
|
||||
await redis.set(`sensor:pending:${token}`, rows[0].id, 'EX', 5);
|
||||
res.json({
|
||||
token,
|
||||
expiresIn: 5
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
export { router as connectsAPI }
|
||||
3
stream/src/ws/connection.js
Normal file
3
stream/src/ws/connection.js
Normal file
@@ -0,0 +1,3 @@
|
||||
import { encode, decode } from '@msgpack/msgpack';
|
||||
import { queryData as datas } from '../data/db.js';
|
||||
import { write, point } from '';
|
||||
44
stream/src/ws/upgrade.js
Normal file
44
stream/src/ws/upgrade.js
Normal file
@@ -0,0 +1,44 @@
|
||||
import { URL } from 'url';
|
||||
import { redis } from '../data/redis';
|
||||
import { querySensors as sensors } from '../data/db';
|
||||
|
||||
export function buildUpgradeHandler(wss) {
|
||||
return async function upgradeHandler(req, socket, head) {
|
||||
try {
|
||||
const url = new URL(req.url, 'http://localhost');
|
||||
const token = url.searchParams.get('token');
|
||||
|
||||
if (!token) {
|
||||
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
|
||||
return socket.destroy();
|
||||
}
|
||||
|
||||
const pendingSensor = await redis.getdel(`sensors:pending:${token}`);
|
||||
if (!pendingSensor) {
|
||||
socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
|
||||
return socket.destroy();
|
||||
}
|
||||
|
||||
const { rows } = await sensors('select id, name from sensors where id = $1', [pendingSensor]);
|
||||
const sensor = rows[0];
|
||||
if (!sensor) {
|
||||
socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
|
||||
return socket.destroy();
|
||||
}
|
||||
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
ws._sensor = sensor;
|
||||
wss.emit('connection', ws, req);
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
console.error('error in upgrading conenction with sensor to ws with error: ', error);
|
||||
|
||||
try {
|
||||
socket.destroy();
|
||||
} catch (destroyError) {
|
||||
console.error('error destroying socket: ', destroyError);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user