Streaming JSONL : traiter les données en temps réel
Un guide complet sur le streaming de données JSONL en temps réel. Apprenez à construire des pipelines de streaming avec Node.js readline, les générateurs Python, les Server-Sent Events, les WebSockets et les patterns de streaming de logs en production.
Dernière mise à jour : février 2026
Pourquoi JSONL est idéal pour le streaming
JSONL (JSON Lines) stocke un objet JSON complet par ligne, séparé par des caractères de retour à la ligne. Cette structure délimitée par ligne en fait le format parfait pour le streaming, car chaque ligne est une unité autonome et analysable. Contrairement à un grand tableau JSON unique, vous n’avez jamais besoin de charger l’ensemble du jeu de données en mémoire avant de commencer à traiter les enregistrements. Un consommateur peut lire une ligne, l’analyser, agir dessus et passer à la suivante, maintenant une utilisation mémoire constante quelle que soit la taille de la source de données.
Cette propriété explique pourquoi JSONL domine dans les systèmes d’agrégation de logs, les pipelines d’analytique en temps réel, les flux de données de machine learning et les réponses d’API en streaming. Des outils comme l’API d’OpenAI, AWS CloudWatch et Apache Kafka utilisent tous du JSON délimité par lignes parce qu’il combine la lisibilité universelle du JSON avec l’efficacité de streaming des protocoles orientés lignes. Dans ce guide, vous apprendrez à construire des solutions de streaming JSONL en Node.js et Python, à envoyer des données en temps réel aux navigateurs avec les Server-Sent Events, à échanger du JSONL via WebSockets et à implémenter un système de streaming de logs prêt pour la production.
Streaming JSONL en Node.js
Node.js est construit sur un modèle d’E/S événementiel et non bloquant qui le rend particulièrement adapté aux charges de travail en streaming. Les modules intégrés readline et stream vous fournissent tout le nécessaire pour traiter des fichiers JSONL et des flux réseau ligne par ligne sans charger l’intégralité du contenu en mémoire.
Le module readline associé à fs.createReadStream est la méthode standard pour consommer un fichier JSONL en Node.js. Il lit le fichier par morceaux et émet des lignes complètes, de sorte que la mémoire reste stable même pour des fichiers de plusieurs gigaoctets.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Les Transform streams de Node.js vous permettent de construire des pipelines de données composables qui lisent du JSONL, filtrent ou enrichissent chaque enregistrement et écrivent les résultats. Ce pattern gère automatiquement la contre-pression (backpressure), en mettant en pause la source lorsque la destination ne peut pas suivre.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
Streaming JSONL en Python
Les générateurs Python offrent un moyen élégant de streamer des données JSONL. Comme un générateur produit un enregistrement à la fois et n’avance que lorsque le consommateur demande la valeur suivante, l’utilisation mémoire reste minimale. Combiné avec le module intégré json, vous pouvez construire de puissants pipelines de streaming en quelques lignes de code.
Une fonction générateur Python lit le fichier ligne par ligne avec une boucle for standard. Chaque appel à next() lit exactement une ligne, l’analyse et produit le résultat. Le descripteur de fichier reste ouvert et la position n’avance que lorsque le consommateur demande des enregistrements.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
Le module itertools de Python vous permet de composer des opérations de streaming comme le filtrage, le regroupement par lots et le découpage sans matérialiser les résultats intermédiaires. Chaque étape de la chaîne traite un enregistrement à la fois, ce qui vous permet de gérer des fichiers plus volumineux que la RAM disponible.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events avec JSONL
Les Server-Sent Events (SSE) fournissent un protocole simple basé sur HTTP pour envoyer des mises à jour en temps réel du serveur vers le client. Chaque événement SSE transporte un champ data, et JSONL est le format de charge utile naturel car chaque ligne est un objet JSON complet que le client peut analyser immédiatement. Contrairement aux WebSockets, SSE fonctionne sur HTTP standard, ne nécessite aucune configuration spéciale de proxy et se reconnecte automatiquement en cas de défaillance.
Le serveur définit le Content-Type à text/event-stream et écrit chaque enregistrement JSONL en tant que champ data SSE. La connexion reste ouverte et le serveur envoie de nouveaux événements au fur et à mesure qu’ils deviennent disponibles.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
Le navigateur utilise l’API EventSource pour se connecter au point de terminaison SSE. Chaque événement entrant contient un enregistrement JSONL qui est analysé et affiché en temps réel. EventSource gère la reconnexion automatique en cas de perte de connexion.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + streaming JSONL
Alors que SSE gère l’envoi du serveur vers le client, les WebSockets permettent une communication full-duplex où les deux parties peuvent envoyer des enregistrements JSONL à tout moment. C’est idéal pour les applications interactives comme l’édition collaborative, les tableaux de bord en temps réel avec des commandes utilisateur et la synchronisation bidirectionnelle de données. Chaque message WebSocket contient un seul enregistrement JSONL, ce qui rend l’analyse simple des deux côtés.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Chaque message WebSocket est un objet JSON unique, suivant la convention JSONL d’un enregistrement par message. Le serveur diffuse des métriques au client toutes les 500 ms, tout en acceptant également des commandes du client. Les deux directions utilisent la même sérialisation JSON, ce qui rend le protocole symétrique et facile à déboguer.
Streaming de logs en pratique
L’un des cas d’utilisation les plus courants du streaming JSONL dans le monde réel est la collecte centralisée de logs. Les applications écrivent des entrées de log structurées sous forme de lignes JSONL vers stdout ou un fichier, et un agent collecteur suit la sortie en temps réel, transférant les enregistrements vers un système central. Ce pattern est utilisé par Docker, Kubernetes et la plupart des services de logging cloud.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Ce collecteur suit un fichier de logs JSONL en temps réel, distribuant les enregistrements aux gestionnaires en fonction du niveau de log. Il suit la position dans le fichier afin de ne lire que le nouveau contenu après la passe initiale de rattrapage. En production, vous ajouteriez la détection de rotation de fichier, l’arrêt propre et le transfert par lots pour réduire les appels réseau vers votre système de logging centralisé.
Essayez nos outils JSONL gratuits
Vous souhaitez inspecter ou convertir des données JSONL avant de construire votre pipeline de streaming ? Utilisez nos outils en ligne gratuits pour visualiser, convertir et formater des fichiers JSONL directement dans votre navigateur. Tout le traitement se fait localement, vos données restent donc privées.