JSONL Streaming: Process Data in Real-Time
A comprehensive guide to streaming JSONL data in real-time. Learn to build streaming pipelines with Node.js readline, Python generators, Server-Sent Events, WebSockets, and production log streaming patterns.
Last updated: February 2026
Why JSONL Is Ideal for Streaming
JSONL (JSON Lines) stores one complete JSON object per line, separated by newline characters. This line-delimited structure makes it the perfect format for streaming because each line is a self-contained, parseable unit. Unlike a single large JSON array, you never need to load the entire dataset into memory before you can start processing records. A consumer can read one line, parse it, act on it, and move on to the next, keeping memory usage constant regardless of how large the data source grows.
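To make this concrete, here is a minimal Python sketch (the field names and values are illustrative) showing that every line of a JSONL document parses on its own, with no surrounding array or trailing commas:

```python
import json

# Three self-contained JSONL records -- each line parses independently
jsonl_text = (
    '{"event": "login", "user": "alice"}\n'
    '{"event": "purchase", "user": "bob", "amount": 42.5}\n'
    '{"event": "logout", "user": "alice"}\n'
)

# Stream line by line: each record is usable before the rest arrives
events = []
for line in jsonl_text.splitlines():
    if line.strip():
        events.append(json.loads(line))

print(events[1]["amount"])  # 42.5
```

Had the same data been stored as a JSON array, `json.loads` would need the entire document before returning anything; here each record is available the moment its line is read.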
This property is why JSONL dominates in log aggregation systems, real-time analytics pipelines, machine learning data feeds, and API streaming responses. Streaming APIs such as OpenAI's, log services such as AWS CloudWatch Logs, and many Kafka-based pipelines rely on newline-delimited JSON because it combines the universal readability of JSON with the streaming efficiency of line-oriented protocols. In this guide, you will learn how to build JSONL streaming solutions in Node.js and Python, push real-time data to browsers with Server-Sent Events, exchange JSONL over WebSockets, and implement a production-ready log streaming system.
Streaming JSONL in Node.js
Node.js is built on an event-driven, non-blocking I/O model that makes it exceptionally well suited for streaming workloads. The built-in readline and stream modules give you everything you need to process JSONL files and network streams line by line without loading the full payload into memory.
The readline module paired with fs.createReadStream is the standard way to consume a JSONL file in Node.js. It reads the file in chunks and emits complete lines, so memory stays flat even for multi-gigabyte files.
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function streamJsonl(filePath, onRecord) {
  const rl = createInterface({
    input: createReadStream(filePath, 'utf-8'),
    crlfDelay: Infinity,
  });

  let count = 0;
  for await (const line of rl) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      const record = JSON.parse(trimmed);
      await onRecord(record);
      count++;
    } catch (err) {
      console.error(`Skipping invalid line: ${err.message}`);
    }
  }
  return count;
}

// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
  console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);
Node.js Transform streams let you build composable data pipelines that read JSONL, filter or enrich each record, and write the results back. This pattern handles backpressure automatically, pausing the source when the destination cannot keep up.
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Split incoming chunks into individual lines
class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buffer = '';
  }
  _transform(chunk, encoding, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    for (const line of lines) {
      if (line.trim()) this.push(line.trim());
    }
    cb();
  }
  _flush(cb) {
    if (this.buffer.trim()) this.push(this.buffer.trim());
    cb();
  }
}

// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
  constructor(transformFn) {
    super({ objectMode: true });
    this.transformFn = transformFn;
  }
  _transform(line, encoding, cb) {
    try {
      const record = JSON.parse(line);
      const result = this.transformFn(record);
      if (result) cb(null, JSON.stringify(result) + '\n');
      else cb(); // Filter out null results
    } catch (err) {
      cb(); // Skip invalid lines
    }
  }
}

// Build the pipeline: read -> split -> transform -> write
await pipeline(
  createReadStream('input.jsonl', 'utf-8'),
  new LineSplitter(),
  new JsonlTransform((record) => {
    if (record.level === 'error') {
      return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
    }
    return null; // Filter out non-error records
  }),
  createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');
Streaming JSONL in Python
Python generators provide an elegant way to stream JSONL data. Because a generator yields one record at a time and only advances when the consumer requests the next value, memory usage stays minimal. Combined with the built-in json module, you can build powerful streaming pipelines with just a few lines of code.
A Python generator function reads the file line by line using a standard for loop. Each call to next() advances the generator just far enough to read, parse, and yield the next record. The file handle stays open and the position advances only as the consumer pulls records.
import json
from typing import Generator, Any

def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
    """Stream JSONL records one at a time using a generator."""
    with open(file_path, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Skipping line {line_num}: {e}")

# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
    if record.get('level') == 'error':
        print(f"ERROR: {record['message']}")

# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")
Python itertools lets you compose streaming operations like filtering, batching, and slicing without materializing intermediate results. Each step in the chain processes one record at a time, so you can handle files larger than available RAM.
import json
import itertools
from typing import Generator, Iterable

def stream_jsonl(path: str) -> Generator[dict, None, None]:
    with open(path, 'r') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
    """Filter records where key equals value."""
    for record in records:
        if record.get(key) == value:
            yield record

def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
    """Yield successive batches of the given size."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk

# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
    print(f"Batch {i + 1}: {len(error_batch)} errors")
    # Send batch to alerting system, database, etc.
    for record in error_batch:
        print(f"  {record['timestamp']} - {record['path']}")
Server-Sent Events with JSONL
Server-Sent Events (SSE) provide a simple, HTTP-based protocol for pushing real-time updates from server to client. Each SSE event carries a data field, and JSONL is the natural payload format because each line is a complete JSON object that the client can parse immediately. Unlike WebSockets, SSE works over standard HTTP, requires no special proxy configuration, and automatically reconnects on failure.
The server sets the Content-Type to text/event-stream and writes each JSONL record as an SSE data field. The connection stays open, and the server pushes new events as they become available.
import express from 'express';

const app = express();

// Simulate a real-time data source
function* generateMetrics() {
  let id = 0;
  while (true) {
    yield {
      id: ++id,
      cpu: Math.random() * 100,
      memory: Math.random() * 16384,
      timestamp: new Date().toISOString(),
    };
  }
}

app.get('/api/stream/metrics', (req, res) => {
  // SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.flushHeaders();

  const metrics = generateMetrics();
  const interval = setInterval(() => {
    const record = metrics.next().value;
    // Each SSE event contains one JSONL record
    res.write(`data: ${JSON.stringify(record)}\n\n`);
  }, 1000);

  req.on('close', () => {
    clearInterval(interval);
    res.end();
  });
});

app.listen(3000, () => console.log('SSE server on :3000'));
The browser uses the EventSource API to connect to the SSE endpoint. Each incoming event contains a JSONL record that is parsed and rendered in real time. EventSource handles automatic reconnection if the connection drops.
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');

// Parse each JSONL record as it arrives
source.onmessage = (event) => {
  const record = JSON.parse(event.data);
  console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
  // Update your UI in real time
  updateDashboard(record);
};

source.onerror = (err) => {
  console.error('SSE connection error:', err);
  // EventSource automatically reconnects
};

// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
  const response = await fetch(url);
  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += value;
    const events = buffer.split('\n\n');
    buffer = events.pop();
    for (const event of events) {
      const dataLine = event.split('\n').find(l => l.startsWith('data: '));
      if (dataLine) {
        const record = JSON.parse(dataLine.slice(6));
        onRecord(record);
      }
    }
  }
}

await fetchSSE('/api/stream/metrics', (record) => {
  console.log('Metric:', record);
});
WebSocket + JSONL Streaming
While SSE handles server-to-client push, WebSockets enable full-duplex communication where both sides can send JSONL records at any time. This is ideal for interactive applications like collaborative editing, real-time dashboards with user commands, and bidirectional data synchronization. Each WebSocket message contains a single JSONL record, making parsing straightforward on both ends.
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  console.log('Client connected');

  // Send JSONL records to the client
  const interval = setInterval(() => {
    const record = {
      type: 'metric',
      value: Math.random() * 100,
      timestamp: new Date().toISOString(),
    };
    ws.send(JSON.stringify(record));
  }, 500);

  // Receive JSONL records from the client
  ws.on('message', (data) => {
    try {
      const record = JSON.parse(data.toString());
      console.log(`Received: ${record.type}`, record);
      // Echo back an acknowledgment
      ws.send(JSON.stringify({
        type: 'ack',
        originalId: record.id,
        processedAt: new Date().toISOString(),
      }));
    } catch (err) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
    }
  });

  ws.on('close', () => {
    clearInterval(interval);
    console.log('Client disconnected');
  });
});

// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
//   const record = JSON.parse(e.data);
//   console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Each WebSocket message is a single JSON object, following the JSONL convention of one record per message. The server streams metrics to the client at 500ms intervals, while also accepting commands from the client. Both directions use the same JSON serialization, so the protocol is symmetrical and easy to debug.
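That symmetry can be captured in a small codec shared by both sides. This Python sketch is illustrative (the helper names and the ack shape simply mirror the server example above); it shows the encode/decode/ack round trip without any network transport:

```python
import json
from typing import Any

def encode_record(record: dict[str, Any]) -> str:
    """Serialize one record as a single WebSocket text message."""
    return json.dumps(record)

def decode_record(message: str) -> dict[str, Any]:
    """Parse one incoming message; reject anything that is not an object."""
    record = json.loads(message)
    if not isinstance(record, dict):
        raise ValueError("expected one JSON object per message")
    return record

def make_ack(record: dict[str, Any], processed_at: str) -> dict[str, Any]:
    """Build the acknowledgment the server echoes back."""
    return {"type": "ack", "originalId": record.get("id"), "processedAt": processed_at}

# Round trip: both directions use the same serialization
msg = encode_record({"type": "subscribe", "id": 7, "channel": "metrics"})
received = decode_record(msg)
ack = make_ack(received, "2026-02-01T00:00:00Z")
```

Because encoding and decoding are the same function pair in both directions, client and server can share one module (or one JS file) and stay in sync as the message schema evolves.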
Log Streaming in Practice
One of the most common real-world uses of JSONL streaming is centralized log collection. Applications write structured log entries as JSONL lines to stdout or a file, and a collector agent tails the output in real time, forwarding records to a central system. This pattern is used by Docker, Kubernetes, and most cloud logging services.
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';

class JsonlLogCollector {
  constructor(filePath, handlers = {}) {
    this.filePath = filePath;
    this.handlers = handlers;
    this.position = 0;
    this.running = false;
  }

  async start() {
    this.running = true;
    // Read existing content first
    await this.readFrom(0);
    // Watch for new appends
    this.watch();
  }

  async readFrom(startPos) {
    const rl = createInterface({
      input: createReadStream(this.filePath, {
        encoding: 'utf-8',
        start: startPos,
      }),
      crlfDelay: Infinity,
    });

    for await (const line of rl) {
      if (!line.trim()) continue;
      try {
        const record = JSON.parse(line);
        await this.dispatch(record);
      } catch (err) {
        this.handlers.onError?.(err, line);
      }
    }

    const info = await stat(this.filePath);
    this.position = info.size;
  }

  watch() {
    watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
      if (!this.running) return;
      if (curr.size > this.position) {
        await this.readFrom(this.position);
      }
    });
  }

  async dispatch(record) {
    const level = record.level || 'info';
    this.handlers[level]?.(record);
    this.handlers.onRecord?.(record);
  }

  stop() {
    this.running = false;
  }
}

// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
  error: (r) => console.error(`[ERROR] ${r.message}`),
  warn: (r) => console.warn(`[WARN] ${r.message}`),
  onRecord: (r) => forwardToElasticsearch(r),
  onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();
This collector tails a JSONL log file in real time, dispatching records to handlers based on log level. It tracks the file position so it only reads new content after the initial catch-up pass. In production, you would add file rotation detection, graceful shutdown, and batch forwarding to reduce network calls to your centralized logging system.
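The batch-forwarding idea mentioned above can be sketched as a small buffer that flushes once it reaches a threshold. This is a minimal Python sketch, not a production implementation: the `send` callable stands in for whatever transport you use (an HTTP POST to your log backend, for instance), and the batch size is illustrative.

```python
import json
from typing import Callable

class BatchForwarder:
    """Buffer JSONL records and forward them in batches to cut network calls."""

    def __init__(self, send: Callable[[str], None], batch_size: int = 100):
        self.send = send          # stand-in for the real transport
        self.batch_size = batch_size
        self.buffer: list[dict] = []

    def add(self, record: dict) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        # One network call carries many records as a single JSONL payload
        payload = "\n".join(json.dumps(r) for r in self.buffer) + "\n"
        self.send(payload)
        self.buffer = []

# Usage with a stand-in sender that just collects payloads
sent: list[str] = []
forwarder = BatchForwarder(sent.append, batch_size=2)
for level in ("info", "warn", "error"):
    forwarder.add({"level": level})
forwarder.flush()  # flush the remaining partial batch on shutdown
```

In a real collector you would also flush on a timer so records are never held longer than a few seconds, and retry failed sends, but the buffering logic stays this simple.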
Try Our Free JSONL Tools
Want to inspect or convert JSONL data before building your streaming pipeline? Use our free online tools to view, convert, and format JSONL files right in your browser. All processing happens locally, so your data stays private.