Streaming JSONL: Processe Dados em Tempo Real
Um guia abrangente para streaming de dados JSONL em tempo real. Aprenda a construir pipelines de streaming com Node.js readline, geradores Python, Server-Sent Events, WebSockets e padroes de streaming de logs em producao.
Ultima atualizacao: fevereiro de 2026
Por que JSONL e Ideal para Streaming
JSONL (JSON Lines) armazena um objeto JSON completo por linha, separado por caracteres de nova linha. Essa estrutura delimitada por linhas o torna o formato perfeito para streaming porque cada linha e uma unidade independente e parseavel. Diferente de um unico array JSON grande, voce nunca precisa carregar o dataset inteiro na memoria antes de comecar a processar registros. Um consumidor pode ler uma linha, parsea-la, agir sobre ela e passar para a proxima, mantendo o uso de memoria constante independentemente do tamanho da fonte de dados.
Essa propriedade e a razao pela qual o JSONL domina em sistemas de agregacao de logs, pipelines de analise em tempo real, feeds de dados de aprendizado de maquina e respostas de streaming de APIs. Ferramentas como a API da OpenAI, AWS CloudWatch e Apache Kafka usam JSON delimitado por nova linha porque combina a legibilidade universal do JSON com a eficiencia de streaming de protocolos orientados a linha. Neste guia, voce aprendera a construir solucoes de streaming JSONL em Node.js e Python, enviar dados em tempo real para navegadores com Server-Sent Events, trocar JSONL via WebSockets e implementar um sistema de streaming de logs pronto para producao.
Streaming JSONL em Node.js
Node.js e construido sobre um modelo de I/O orientado a eventos e nao bloqueante que o torna excepcionalmente adequado para cargas de trabalho de streaming. Os modulos integrados readline e stream fornecem tudo o que voce precisa para processar arquivos JSONL e streams de rede linha por linha sem carregar o payload completo na memoria.
O modulo readline combinado com fs.createReadStream e a forma padrao de consumir um arquivo JSONL em Node.js. Ele le o arquivo em blocos e emite linhas completas, entao a memoria permanece estavel mesmo para arquivos de varios gigabytes.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Transform Streams do Node.js permitem construir pipelines de dados composiveis que leem JSONL, filtram ou enriquecem cada registro e escrevem os resultados de volta. Esse padrao lida com backpressure automaticamente, pausando a fonte quando o destino nao consegue acompanhar.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
Streaming JSONL em Python
Geradores Python fornecem uma maneira elegante de fazer streaming de dados JSONL. Como um gerador produz um registro por vez e so avanca quando o consumidor solicita o proximo valor, o uso de memoria permanece minimo. Combinado com o modulo json integrado, voce pode construir pipelines de streaming poderosos com apenas algumas linhas de codigo.
Uma funcao geradora Python le o arquivo linha por linha usando um loop for padrao. Cada chamada a next() le exatamente uma linha, parseia e retorna o resultado. O handle do arquivo permanece aberto e a posicao avanca apenas conforme o consumidor puxa os registros.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
O itertools do Python permite compor operacoes de streaming como filtragem, agrupamento em lotes e fatiamento sem materializar resultados intermediarios. Cada etapa da cadeia processa um registro por vez, permitindo lidar com arquivos maiores que a RAM disponivel.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events com JSONL
Server-Sent Events (SSE) fornecem um protocolo simples baseado em HTTP para enviar atualizacoes em tempo real do servidor para o cliente. Cada evento SSE carrega um campo de dados, e JSONL e o formato de payload natural porque cada linha e um objeto JSON completo que o cliente pode parsear imediatamente. Diferente de WebSockets, SSE funciona sobre HTTP padrao, nao requer configuracao especial de proxy e se reconecta automaticamente em caso de falha.
O servidor define o Content-Type como text/event-stream e escreve cada registro JSONL como um campo de dados SSE. A conexao permanece aberta e o servidor envia novos eventos conforme ficam disponiveis.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
O navegador usa a API EventSource para se conectar ao endpoint SSE. Cada evento recebido contem um registro JSONL que e parseado e renderizado em tempo real. O EventSource lida com reconexao automatica se a conexao cair.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + Streaming JSONL
Enquanto SSE lida com push do servidor para o cliente, WebSockets permitem comunicacao full-duplex onde ambos os lados podem enviar registros JSONL a qualquer momento. Isso e ideal para aplicacoes interativas como edicao colaborativa, dashboards em tempo real com comandos do usuario e sincronizacao bidirecional de dados. Cada mensagem WebSocket contem um unico registro JSONL, tornando o parsing direto em ambos os lados.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Cada mensagem WebSocket e um unico objeto JSON, seguindo a convencao JSONL de um registro por mensagem. O servidor transmite metricas para o cliente em intervalos de 500ms, enquanto tambem aceita comandos do cliente. Ambas as direcoes usam a mesma serializacao JSON, entao o protocolo e simetrico e facil de depurar.
Streaming de Logs na Pratica
Um dos usos mais comuns de streaming JSONL no mundo real e a coleta centralizada de logs. Aplicacoes escrevem entradas de log estruturadas como linhas JSONL para stdout ou um arquivo, e um agente coletor faz tail da saida em tempo real, encaminhando registros para um sistema central. Esse padrao e usado pelo Docker, Kubernetes e pela maioria dos servicos de logging em nuvem.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Este coletor faz tail de um arquivo de log JSONL em tempo real, despachando registros para handlers com base no nivel de log. Ele rastreia a posicao do arquivo para ler apenas conteudo novo apos a passagem inicial de catch-up. Em producao, voce adicionaria deteccao de rotacao de arquivo, shutdown gracioso e encaminhamento em lote para reduzir chamadas de rede ao seu sistema de logging centralizado.
Experimente Nossas Ferramentas JSONL Gratuitas
Quer inspecionar ou converter dados JSONL antes de construir seu pipeline de streaming? Use nossas ferramentas online gratuitas para visualizar, converter e formatar arquivos JSONL diretamente no seu navegador. Todo o processamento acontece localmente, entao seus dados permanecem privados.