JSONL Streaming: Verwerk Gegevens in Realtime
Een uitgebreide gids voor het streamen van JSONL-gegevens in realtime. Leer streaming pipelines te bouwen met Node.js readline, Python generators, Server-Sent Events, WebSockets en productie-klare logstreamingpatronen.
Laatst bijgewerkt: februari 2026
Waarom JSONL Ideaal Is voor Streaming
JSONL (JSON Lines) slaat één compleet JSON-object per regel op, gescheiden door newline-tekens. Deze regelgescheiden structuur maakt het het perfecte formaat voor streaming, omdat elke regel een op zichzelf staande, parseerbare eenheid is. In tegenstelling tot een enkele grote JSON-array hoef je nooit de volledige dataset in het geheugen te laden voordat je records kunt verwerken. Een consumer kan één regel lezen, deze parseren, erop reageren en doorgaan naar de volgende, waarbij het geheugengebruik constant blijft ongeacht hoe groot de gegevensbron groeit.
Deze eigenschap is de reden waarom JSONL domineert in logaggregatiesystemen, realtime analysepipelines, machine learning datafeeds en API-streamingresponses. Tools zoals de OpenAI API, AWS CloudWatch en Apache Kafka gebruiken allemaal newline-gescheiden JSON omdat het de universele leesbaarheid van JSON combineert met de streamingefficiëntie van regelgeoriënteerde protocollen. In deze gids leer je hoe je JSONL-streamingoplossingen bouwt in Node.js en Python, realtime gegevens naar browsers pusht met Server-Sent Events, JSONL uitwisselt via WebSockets en een productie-klaar logstreamingsysteem implementeert.
JSONL Streamen in Node.js
Node.js is gebouwd op een event-driven, non-blocking I/O-model dat uitzonderlijk geschikt is voor streaming-workloads. De ingebouwde readline- en stream-modules geven je alles wat je nodig hebt om JSONL-bestanden en netwerkstreams regel voor regel te verwerken zonder de volledige payload in het geheugen te laden.
De readline-module gecombineerd met fs.createReadStream is de standaardmanier om een JSONL-bestand te consumeren in Node.js. Het leest het bestand in chunks en geeft complete regels door, zodat het geheugen vlak blijft, zelfs voor bestanden van meerdere gigabytes.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Node.js Transform streams laten je composeerbare datapipelines bouwen die JSONL lezen, elk record filteren of verrijken, en de resultaten terugschrijven. Dit patroon behandelt backpressure automatisch en pauzeert de bron wanneer de bestemming niet kan bijhouden.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
JSONL Streamen in Python
Python generators bieden een elegante manier om JSONL-gegevens te streamen. Omdat een generator één record tegelijk yieldt en alleen verder gaat wanneer de consumer de volgende waarde opvraagt, blijft het geheugengebruik minimaal. Gecombineerd met de ingebouwde json-module kun je krachtige streaming pipelines bouwen met slechts een paar regels code.
Een Python generator-functie leest het bestand regel voor regel met een standaard for-lus. Elke aanroep van next() leest precies één regel, parseert deze en yieldt het resultaat. De file handle blijft open en de positie schuift alleen op naarmate de consumer records ophaalt.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
Python itertools laat je streaming-operaties zoals filteren, batchen en slicen samenstellen zonder tussenresultaten te materialiseren. Elke stap in de keten verwerkt één record tegelijk, zodat je bestanden kunt verwerken die groter zijn dan het beschikbare RAM.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events met JSONL
Server-Sent Events (SSE) bieden een eenvoudig, HTTP-gebaseerd protocol voor het pushen van realtime updates van server naar client. Elk SSE-event draagt een data-veld, en JSONL is het natuurlijke payload-formaat omdat elke regel een compleet JSON-object is dat de client direct kan parseren. In tegenstelling tot WebSockets werkt SSE via standaard HTTP, vereist het geen speciale proxyconfiguratie en herstelt het automatisch de verbinding bij uitval.
De server stelt het Content-Type in op text/event-stream en schrijft elk JSONL-record als een SSE data-veld. De verbinding blijft open en de server pusht nieuwe events zodra ze beschikbaar zijn.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
De browser gebruikt de EventSource API om verbinding te maken met het SSE-endpoint. Elk binnenkomend event bevat een JSONL-record dat in realtime wordt geparseerd en weergegeven. EventSource handelt automatische herverbinding af als de verbinding wegvalt.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + JSONL Streaming
Terwijl SSE server-naar-client push afhandelt, maken WebSockets full-duplex communicatie mogelijk waarbij beide zijden op elk moment JSONL-records kunnen versturen. Dit is ideaal voor interactieve applicaties zoals collaboratief bewerken, realtime dashboards met gebruikerscommando's en bidirectionele gegevenssynchronisatie. Elk WebSocket-bericht bevat een enkel JSONL-record, waardoor het parseren aan beide zijden eenvoudig is.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Elk WebSocket-bericht is een enkel JSON-object, volgens de JSONL-conventie van één record per bericht. De server streamt metrics naar de client met intervallen van 500ms, terwijl hij ook commando's van de client accepteert. Beide richtingen gebruiken dezelfde JSON-serialisatie, waardoor het protocol symmetrisch en eenvoudig te debuggen is.
Log Streaming in de Praktijk
Een van de meest voorkomende toepassingen van JSONL-streaming in de praktijk is gecentraliseerde logverzameling. Applicaties schrijven gestructureerde logvermeldingen als JSONL-regels naar stdout of een bestand, en een collector-agent tailt de uitvoer in realtime en stuurt records door naar een centraal systeem. Dit patroon wordt gebruikt door Docker, Kubernetes en de meeste cloudloggingdiensten.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Deze collector tailt een JSONL-logbestand in realtime en stuurt records naar handlers op basis van logniveau. Hij houdt de bestandspositie bij zodat hij alleen nieuwe inhoud leest na de initiële inhaalslag. In productie zou je bestandsrotatiedetectie, graceful shutdown en batchdoorsturen toevoegen om netwerkaanroepen naar je gecentraliseerd loggingsysteem te verminderen.
Probeer Onze Gratis JSONL-tools
Wil je JSONL-gegevens inspecteren of converteren voordat je je streaming pipeline bouwt? Gebruik onze gratis online tools om JSONL-bestanden te bekijken, converteren en formatteren direct in je browser. Alle verwerking gebeurt lokaal, zodat je gegevens privé blijven.