Streaming JSONL: Procesa Datos en Tiempo Real
Una guia completa para transmitir datos JSONL en tiempo real. Aprende a construir pipelines de streaming con Node.js readline, generadores Python, Server-Sent Events, WebSockets y patrones de streaming de logs en produccion.
Ultima actualizacion: febrero 2026
Por que JSONL es Ideal para Streaming
JSONL (JSON Lines) almacena un objeto JSON completo por linea, separado por caracteres de nueva linea. Esta estructura delimitada por lineas lo convierte en el formato perfecto para streaming porque cada linea es una unidad autocontenida y parseable. A diferencia de un unico array JSON grande, nunca necesitas cargar todo el dataset en memoria antes de empezar a procesar registros. Un consumidor puede leer una linea, parsearla, actuar sobre ella y pasar a la siguiente, manteniendo el uso de memoria constante sin importar cuan grande crezca la fuente de datos.
Esta propiedad es la razon por la que JSONL domina en sistemas de agregacion de logs, pipelines de analitica en tiempo real, feeds de datos de aprendizaje automatico y respuestas de API en streaming. Herramientas como la API de OpenAI, AWS CloudWatch y Apache Kafka usan JSON delimitado por nuevas lineas porque combina la legibilidad universal de JSON con la eficiencia de streaming de protocolos orientados a lineas. En esta guia, aprenderas a construir soluciones de streaming JSONL en Node.js y Python, enviar datos en tiempo real a navegadores con Server-Sent Events, intercambiar JSONL a traves de WebSockets e implementar un sistema de streaming de logs listo para produccion.
Streaming JSONL en Node.js
Node.js esta construido sobre un modelo de I/O no bloqueante y basado en eventos que lo hace excepcionalmente adecuado para cargas de trabajo de streaming. Los modulos integrados readline y stream te proporcionan todo lo necesario para procesar archivos JSONL y flujos de red linea por linea sin cargar la carga completa en memoria.
El modulo readline combinado con fs.createReadStream es la forma estandar de consumir un archivo JSONL en Node.js. Lee el archivo en fragmentos y emite lineas completas, asi que la memoria se mantiene estable incluso para archivos de varios gigabytes.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Los Transform streams de Node.js te permiten construir pipelines de datos componibles que leen JSONL, filtran o enriquecen cada registro y escriben los resultados de vuelta. Este patron maneja la contrapresion automaticamente, pausando la fuente cuando el destino no puede seguir el ritmo.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
Streaming JSONL en Python
Los generadores de Python proporcionan una forma elegante de transmitir datos JSONL. Dado que un generador produce un registro a la vez y solo avanza cuando el consumidor solicita el siguiente valor, el uso de memoria se mantiene minimo. Combinado con el modulo json integrado, puedes construir potentes pipelines de streaming con solo unas pocas lineas de codigo.
Una funcion generadora de Python lee el archivo linea por linea usando un bucle for estandar. Cada llamada a next() lee exactamente una linea, la parsea y produce el resultado. El handle del archivo permanece abierto y la posicion avanza solo cuando el consumidor extrae registros.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
itertools de Python te permite componer operaciones de streaming como filtrado, agrupacion por lotes y segmentacion sin materializar resultados intermedios. Cada paso en la cadena procesa un registro a la vez, asi que puedes manejar archivos mas grandes que la RAM disponible.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events con JSONL
Server-Sent Events (SSE) proporcionan un protocolo simple basado en HTTP para enviar actualizaciones en tiempo real del servidor al cliente. Cada evento SSE lleva un campo de datos, y JSONL es el formato de carga natural porque cada linea es un objeto JSON completo que el cliente puede parsear inmediatamente. A diferencia de WebSockets, SSE funciona sobre HTTP estandar, no requiere configuracion especial de proxy y se reconecta automaticamente en caso de fallo.
El servidor establece el Content-Type a text/event-stream y escribe cada registro JSONL como un campo de datos SSE. La conexion permanece abierta y el servidor envia nuevos eventos a medida que estan disponibles.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
El navegador usa la API EventSource para conectarse al endpoint SSE. Cada evento entrante contiene un registro JSONL que se parsea y renderiza en tiempo real. EventSource maneja la reconexion automatica si la conexion se pierde.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + Streaming JSONL
Mientras que SSE maneja el envio del servidor al cliente, WebSockets habilitan comunicacion full-duplex donde ambos lados pueden enviar registros JSONL en cualquier momento. Esto es ideal para aplicaciones interactivas como edicion colaborativa, dashboards en tiempo real con comandos de usuario y sincronizacion bidireccional de datos. Cada mensaje WebSocket contiene un unico registro JSONL, lo que hace el parseo sencillo en ambos extremos.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Cada mensaje WebSocket es un unico objeto JSON, siguiendo la convencion JSONL de un registro por mensaje. El servidor transmite metricas al cliente a intervalos de 500ms, mientras tambien acepta comandos del cliente. Ambas direcciones usan la misma serializacion JSON, asi que el protocolo es simetrico y facil de depurar.
Streaming de Logs en la Practica
Uno de los usos mas comunes en el mundo real del streaming JSONL es la recoleccion centralizada de logs. Las aplicaciones escriben entradas de log estructuradas como lineas JSONL en stdout o un archivo, y un agente recolector hace tail del output en tiempo real, reenviando registros a un sistema central. Este patron es usado por Docker, Kubernetes y la mayoria de servicios de logging en la nube.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Este recolector hace tail de un archivo de log JSONL en tiempo real, enviando registros a handlers basados en el nivel de log. Rastrea la posicion del archivo para que solo lea contenido nuevo despues de la pasada inicial de catch-up. En produccion, agregarias deteccion de rotacion de archivos, apagado graceful y reenvio por lotes para reducir las llamadas de red a tu sistema de logging centralizado.
Prueba Nuestras Herramientas JSONL Gratuitas
Quieres inspeccionar o convertir datos JSONL antes de construir tu pipeline de streaming? Usa nuestras herramientas online gratuitas para ver, convertir y formatear archivos JSONL directamente en tu navegador. Todo el procesamiento ocurre localmente, asi que tus datos permanecen privados.