Streaming JSONL: Elabora Dati in Tempo Reale

Una guida completa allo streaming di dati JSONL in tempo reale. Impara a costruire pipeline di streaming con Node.js readline, generatori Python, Server-Sent Events, WebSocket e pattern di streaming log di produzione.

Ultimo aggiornamento: Febbraio 2026

Perché JSONL è Ideale per lo Streaming

JSONL (JSON Lines) memorizza un oggetto JSON completo per riga, separato da caratteri di nuova riga. Questa struttura delimitata da righe lo rende il formato perfetto per lo streaming perché ogni riga è un'unità autonoma e analizzabile. A differenza di un singolo grande array JSON, non è mai necessario caricare l'intero dataset in memoria prima di poter iniziare a elaborare i record. Un consumatore può leggere una riga, analizzarla, agire su di essa e passare alla successiva, mantenendo l'uso della memoria costante indipendentemente dalla dimensione della sorgente dati.

Questa proprietà è il motivo per cui JSONL domina nei sistemi di aggregazione log, nelle pipeline di analisi in tempo reale, nei feed di dati per machine learning e nelle risposte API in streaming. Strumenti come l'API di OpenAI, AWS CloudWatch e Apache Kafka utilizzano tutti JSON delimitato da nuova riga perché combina la leggibilità universale di JSON con l'efficienza di streaming dei protocolli orientati alle righe. In questa guida imparerai come costruire soluzioni di streaming JSONL in Node.js e Python, inviare dati in tempo reale ai browser con Server-Sent Events, scambiare JSONL tramite WebSocket e implementare un sistema di streaming log pronto per la produzione.

Streaming JSONL in Node.js

Node.js è costruito su un modello I/O event-driven e non bloccante che lo rende eccezionalmente adatto per carichi di lavoro in streaming. I moduli integrati readline e stream forniscono tutto il necessario per elaborare file JSONL e stream di rete riga per riga senza caricare l'intero payload in memoria.

Il modulo readline abbinato a fs.createReadStream è il modo standard per consumare un file JSONL in Node.js. Legge il file in blocchi ed emette righe complete, così la memoria rimane costante anche per file multi-gigabyte.

Lettura in Streaming con readline
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function streamJsonl(filePath, onRecord) {
const rl = createInterface({
input: createReadStream(filePath, 'utf-8'),
crlfDelay: Infinity,
});
let count = 0;
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const record = JSON.parse(trimmed);
await onRecord(record);
count++;
} catch (err) {
console.error(`Skipping invalid line: ${err.message}`);
}
}
return count;
}
// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);

I Transform stream di Node.js permettono di costruire pipeline di dati componibili che leggono JSONL, filtrano o arricchiscono ogni record e scrivono i risultati. Questo pattern gestisce automaticamente la backpressure, mettendo in pausa la sorgente quando la destinazione non riesce a tenere il passo.

Transform Stream per Pipeline
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
// Split incoming chunks into individual lines
class LineSplitter extends Transform {
constructor() {
super({ readableObjectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (line.trim()) this.push(line.trim());
}
cb();
}
_flush(cb) {
if (this.buffer.trim()) this.push(this.buffer.trim());
cb();
}
}
// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(line, encoding, cb) {
try {
const record = JSON.parse(line);
const result = this.transformFn(record);
if (result) cb(null, JSON.stringify(result) + '\n');
else cb(); // Filter out null results
} catch (err) {
cb(); // Skip invalid lines
}
}
}
// Build the pipeline: read -> split -> transform -> write
await pipeline(
createReadStream('input.jsonl', 'utf-8'),
new LineSplitter(),
new JsonlTransform((record) => {
if (record.level === 'error') {
return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
}
return null; // Filter out non-error records
}),
createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');

Streaming JSONL in Python

I generatori Python forniscono un modo elegante per fare streaming di dati JSONL. Poiché un generatore produce un record alla volta e avanza solo quando il consumatore richiede il valore successivo, l'uso della memoria rimane minimo. Combinato con il modulo json integrato, puoi costruire potenti pipeline di streaming con poche righe di codice.

Una funzione generatore Python legge il file riga per riga usando un ciclo for standard. Ogni chiamata a next() legge esattamente una riga, la analizza e produce il risultato. L'handle del file rimane aperto e la posizione avanza solo quando il consumatore richiede record.

Streaming Basato su Generatori
import json
from typing import Generator, Any
def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
"""Stream JSONL records one at a time using a generator."""
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"Skipping line {line_num}: {e}")
# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
if record.get('level') == 'error':
print(f"ERROR: {record['message']}")
# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")

Python itertools permette di comporre operazioni di streaming come filtraggio, raggruppamento in batch e slicing senza materializzare risultati intermedi. Ogni passaggio nella catena elabora un record alla volta, così puoi gestire file più grandi della RAM disponibile.

Elaborazione Concatenata con itertools
import json
import itertools
from typing import Generator, Iterable
def stream_jsonl(path: str) -> Generator[dict, None, None]:
with open(path, 'r') as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
"""Filter records where key equals value."""
for record in records:
if record.get(key) == value:
yield record
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""Yield successive batches of the given size."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
print(f"Batch {i + 1}: {len(error_batch)} errors")
# Send batch to alerting system, database, etc.
for record in error_batch:
print(f" {record['timestamp']} - {record['path']}")

Server-Sent Events con JSONL

I Server-Sent Events (SSE) forniscono un protocollo semplice basato su HTTP per inviare aggiornamenti in tempo reale dal server al client. Ogni evento SSE porta un campo data, e JSONL è il formato di payload naturale perché ogni riga è un oggetto JSON completo che il client può analizzare immediatamente. A differenza dei WebSocket, SSE funziona su HTTP standard, non richiede configurazione speciale del proxy e si riconnette automaticamente in caso di errore.

Il server imposta il Content-Type su text/event-stream e scrive ogni record JSONL come campo data SSE. La connessione rimane aperta e il server invia nuovi eventi man mano che diventano disponibili.

Server SSE (Node.js / Express)
import express from 'express';
const app = express();
// Simulate a real-time data source
function* generateMetrics() {
let id = 0;
while (true) {
yield {
id: ++id,
cpu: Math.random() * 100,
memory: Math.random() * 16384,
timestamp: new Date().toISOString(),
};
}
}
app.get('/api/stream/metrics', (req, res) => {
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();
const metrics = generateMetrics();
const interval = setInterval(() => {
const record = metrics.next().value;
// Each SSE event contains one JSONL record
res.write(`data: ${JSON.stringify(record)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => console.log('SSE server on :3000'));

Il browser usa l'API EventSource per connettersi all'endpoint SSE. Ogni evento in arrivo contiene un record JSONL che viene analizzato e renderizzato in tempo reale. EventSource gestisce la riconnessione automatica se la connessione cade.

Client SSE (Browser)
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');
// Parse each JSONL record as it arrives
source.onmessage = (event) => {
const record = JSON.parse(event.data);
console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
// Update your UI in real time
updateDashboard(record);
};
source.onerror = (err) => {
console.error('SSE connection error:', err);
// EventSource automatically reconnects
};
// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
const response = await fetch(url);
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const dataLine = event.split('\n').find(l => l.startsWith('data: '));
if (dataLine) {
const record = JSON.parse(dataLine.slice(6));
onRecord(record);
}
}
}
}
await fetchSSE('/api/stream/metrics', (record) => {
console.log('Metric:', record);
});

WebSocket + Streaming JSONL

Mentre SSE gestisce il push server-to-client, i WebSocket abilitano la comunicazione full-duplex dove entrambe le parti possono inviare record JSONL in qualsiasi momento. Questo è ideale per applicazioni interattive come l'editing collaborativo, dashboard in tempo reale con comandi utente e sincronizzazione bidirezionale dei dati. Ogni messaggio WebSocket contiene un singolo record JSONL, rendendo il parsing semplice su entrambi i lati.

Server WebSocket con Messaggi JSONL
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Send JSONL records to the client
const interval = setInterval(() => {
const record = {
type: 'metric',
value: Math.random() * 100,
timestamp: new Date().toISOString(),
};
ws.send(JSON.stringify(record));
}, 500);
// Receive JSONL records from the client
ws.on('message', (data) => {
try {
const record = JSON.parse(data.toString());
console.log(`Received: ${record.type}`, record);
// Echo back an acknowledgment
ws.send(JSON.stringify({
type: 'ack',
originalId: record.id,
processedAt: new Date().toISOString(),
}));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
// const record = JSON.parse(e.data);
// console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));

Ogni messaggio WebSocket è un singolo oggetto JSON, seguendo la convenzione JSONL di un record per messaggio. Il server trasmette metriche al client a intervalli di 500ms, accettando contemporaneamente comandi dal client. Entrambe le direzioni usano la stessa serializzazione JSON, quindi il protocollo è simmetrico e facile da debuggare.

Streaming di Log nella Pratica

Uno degli usi più comuni dello streaming JSONL nel mondo reale è la raccolta centralizzata dei log. Le applicazioni scrivono voci di log strutturate come righe JSONL su stdout o un file, e un agente collector segue l'output in tempo reale, inoltrando i record a un sistema centralizzato. Questo pattern è utilizzato da Docker, Kubernetes e dalla maggior parte dei servizi di logging cloud.

Collector di Log JSONL in Tempo Reale
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';
class JsonlLogCollector {
constructor(filePath, handlers = {}) {
this.filePath = filePath;
this.handlers = handlers;
this.position = 0;
this.running = false;
}
async start() {
this.running = true;
// Read existing content first
await this.readFrom(0);
// Watch for new appends
this.watch();
}
async readFrom(startPos) {
const rl = createInterface({
input: createReadStream(this.filePath, {
encoding: 'utf-8',
start: startPos,
}),
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) continue;
try {
const record = JSON.parse(line);
await this.dispatch(record);
} catch (err) {
this.handlers.onError?.(err, line);
}
}
const info = await stat(this.filePath);
this.position = info.size;
}
watch() {
watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
if (!this.running) return;
if (curr.size > this.position) {
await this.readFrom(this.position);
}
});
}
async dispatch(record) {
const level = record.level || 'info';
this.handlers[level]?.(record);
this.handlers.onRecord?.(record);
}
stop() {
this.running = false;
}
}
// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
error: (r) => console.error(`[ERROR] ${r.message}`),
warn: (r) => console.warn(`[WARN] ${r.message}`),
onRecord: (r) => forwardToElasticsearch(r),
onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();

Questo collector segue un file di log JSONL in tempo reale, inviando i record agli handler in base al livello di log. Tiene traccia della posizione nel file così da leggere solo il nuovo contenuto dopo il passaggio iniziale di recupero. In produzione, aggiungeresti il rilevamento della rotazione dei file, lo shutdown ordinato e l'inoltro in batch per ridurre le chiamate di rete al sistema di logging centralizzato.

Prova i Nostri Strumenti JSONL Gratuiti

Vuoi ispezionare o convertire dati JSONL prima di costruire la tua pipeline di streaming? Usa i nostri strumenti online gratuiti per visualizzare, convertire e formattare file JSONL direttamente nel browser. Tutta l'elaborazione avviene localmente, quindi i tuoi dati restano privati.

Lavora con File JSONL Online

Visualizza, valida e converti file JSONL fino a 1GB direttamente nel tuo browser. Nessun upload richiesto, 100% privato.

Domande Frequenti

Streaming JSONL — Elabora JSON Lines in Tempo Reale (Node...