JSONL-Streaming: Daten in Echtzeit verarbeiten
Eine umfassende Anleitung zum Streaming von JSONL-Daten in Echtzeit. Erfahren Sie, wie Sie Streaming-Pipelines mit Node.js readline, Python-Generatoren, Server-Sent Events, WebSockets und produktionsreifen Log-Streaming-Mustern erstellen.
Zuletzt aktualisiert: Februar 2026
Warum JSONL ideal fur Streaming ist
JSONL (JSON Lines) speichert ein vollstandiges JSON-Objekt pro Zeile, getrennt durch Zeilenumbruchzeichen. Diese zeilengetrennte Struktur macht es zum perfekten Format fur Streaming, da jede Zeile eine eigenstandige, parsbare Einheit ist. Im Gegensatz zu einem einzelnen grossen JSON-Array mussen Sie niemals den gesamten Datensatz in den Speicher laden, bevor Sie mit der Verarbeitung von Datensatzen beginnen konnen. Ein Consumer kann eine Zeile lesen, sie parsen, darauf reagieren und zur nachsten ubergehen, wobei der Speicherverbrauch konstant bleibt, unabhangig davon, wie gross die Datenquelle wird.
Diese Eigenschaft ist der Grund, warum JSONL in Log-Aggregationssystemen, Echtzeit-Analyse-Pipelines, Machine-Learning-Datenfeeds und API-Streaming-Antworten dominiert. Tools wie die OpenAI-API, AWS CloudWatch und Apache Kafka verwenden alle zeilengetrenntes JSON, weil es die universelle Lesbarkeit von JSON mit der Streaming-Effizienz zeilenorientierter Protokolle kombiniert. In dieser Anleitung lernen Sie, wie Sie JSONL-Streaming-Losungen in Node.js und Python erstellen, Echtzeitdaten mit Server-Sent Events an Browser ubertragen, JSONL uber WebSockets austauschen und ein produktionsreifes Log-Streaming-System implementieren.
JSONL-Streaming in Node.js
Node.js basiert auf einem ereignisgesteuerten, nicht-blockierenden I/O-Modell, das es aussergewohnlich gut fur Streaming-Workloads geeignet macht. Die integrierten Module readline und stream bieten alles, was Sie benotigen, um JSONL-Dateien und Netzwerkstreams Zeile fur Zeile zu verarbeiten, ohne die gesamte Nutzlast in den Speicher zu laden.
Das readline-Modul in Kombination mit fs.createReadStream ist die Standardmethode, um eine JSONL-Datei in Node.js zu verarbeiten. Es liest die Datei in Blocken und gibt vollstandige Zeilen aus, sodass der Speicher auch bei Multi-Gigabyte-Dateien konstant bleibt.
import { createReadStream } from 'node:fs';import { createInterface } from 'node:readline';async function streamJsonl(filePath, onRecord) {const rl = createInterface({input: createReadStream(filePath, 'utf-8'),crlfDelay: Infinity,});let count = 0;for await (const line of rl) {const trimmed = line.trim();if (!trimmed) continue;try {const record = JSON.parse(trimmed);await onRecord(record);count++;} catch (err) {console.error(`Skipping invalid line: ${err.message}`);}}return count;}// Usage: process each record as it arrivesconst total = await streamJsonl('events.jsonl', async (record) => {console.log(`Event: ${record.type} at ${record.timestamp}`);});console.log(`Processed ${total} records`);
Node.js Transform Streams ermoglichen es Ihnen, zusammensetzbare Datenpipelines zu erstellen, die JSONL lesen, jeden Datensatz filtern oder anreichern und die Ergebnisse zuruckschreiben. Dieses Muster behandelt Backpressure automatisch und pausiert die Quelle, wenn das Ziel nicht mithalten kann.
import { createReadStream, createWriteStream } from 'node:fs';import { Transform } from 'node:stream';import { pipeline } from 'node:stream/promises';// Split incoming chunks into individual linesclass LineSplitter extends Transform {constructor() {super({ readableObjectMode: true });this.buffer = '';}_transform(chunk, encoding, cb) {this.buffer += chunk.toString();const lines = this.buffer.split('\n');this.buffer = lines.pop();for (const line of lines) {if (line.trim()) this.push(line.trim());}cb();}_flush(cb) {if (this.buffer.trim()) this.push(this.buffer.trim());cb();}}// Parse, transform, and re-serialize each recordclass JsonlTransform extends Transform {constructor(transformFn) {super({ objectMode: true });this.transformFn = transformFn;}_transform(line, encoding, cb) {try {const record = JSON.parse(line);const result = this.transformFn(record);if (result) cb(null, JSON.stringify(result) + '\n');else cb(); // Filter out null results} catch (err) {cb(); // Skip invalid lines}}}// Build the pipeline: read -> split -> transform -> writeawait pipeline(createReadStream('input.jsonl', 'utf-8'),new LineSplitter(),new JsonlTransform((record) => {if (record.level === 'error') {return { ...record, flagged: true, reviewedAt: new Date().toISOString() };}return null; // Filter out non-error records}),createWriteStream('errors.jsonl', 'utf-8'));console.log('Pipeline complete');
JSONL-Streaming in Python
Python-Generatoren bieten eine elegante Moglichkeit, JSONL-Daten zu streamen. Da ein Generator jeweils einen Datensatz liefert und nur voranschreitet, wenn der Consumer den nachsten Wert anfordert, bleibt der Speicherverbrauch minimal. In Kombination mit dem integrierten json-Modul konnen Sie leistungsstarke Streaming-Pipelines mit nur wenigen Codezeilen erstellen.
Eine Python-Generatorfunktion liest die Datei Zeile fur Zeile mit einer Standard-for-Schleife. Jeder Aufruf von next() liest genau eine Zeile, parst sie und gibt das Ergebnis zuruck. Das Dateihandle bleibt offen und die Position ruckt nur vor, wenn der Consumer Datensatze abruft.
import jsonfrom typing import Generator, Anydef stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:"""Stream JSONL records one at a time using a generator."""with open(file_path, 'r', encoding='utf-8') as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line:continuetry:yield json.loads(line)except json.JSONDecodeError as e:print(f"Skipping line {line_num}: {e}")# Usage: process records lazilyfor record in stream_jsonl('events.jsonl'):if record.get('level') == 'error':print(f"ERROR: {record['message']}")# Or collect a subset with itertoolsimport itertoolsfirst_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))print(f"Loaded first {len(first_100)} records")
Python itertools ermoglicht es Ihnen, Streaming-Operationen wie Filtern, Batching und Slicing zu komponieren, ohne Zwischenergebnisse zu materialisieren. Jeder Schritt in der Kette verarbeitet jeweils einen Datensatz, sodass Sie Dateien verarbeiten konnen, die grosser als der verfugbare RAM sind.
import jsonimport itertoolsfrom typing import Generator, Iterabledef stream_jsonl(path: str) -> Generator[dict, None, None]:with open(path, 'r') as f:for line in f:if line.strip():yield json.loads(line)def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:"""Filter records where key equals value."""for record in records:if record.get(key) == value:yield recorddef batch(iterable: Iterable, size: int) -> Generator[list, None, None]:"""Yield successive batches of the given size."""it = iter(iterable)while True:chunk = list(itertools.islice(it, size))if not chunk:breakyield chunk# Compose a streaming pipelinerecords = stream_jsonl('access_logs.jsonl')errors = filter_records(records, 'status', 500)for i, error_batch in enumerate(batch(errors, 50)):print(f"Batch {i + 1}: {len(error_batch)} errors")# Send batch to alerting system, database, etc.for record in error_batch:print(f" {record['timestamp']} - {record['path']}")
Server-Sent Events mit JSONL
Server-Sent Events (SSE) bieten ein einfaches, HTTP-basiertes Protokoll zum Ubertragen von Echtzeit-Updates vom Server zum Client. Jedes SSE-Event tragt ein data-Feld, und JSONL ist das naturliche Nutzlastformat, da jede Zeile ein vollstandiges JSON-Objekt ist, das der Client sofort parsen kann. Im Gegensatz zu WebSockets funktioniert SSE uber Standard-HTTP, erfordert keine spezielle Proxy-Konfiguration und verbindet sich bei Ausfallen automatisch wieder.
Der Server setzt den Content-Type auf text/event-stream und schreibt jeden JSONL-Datensatz als SSE-data-Feld. Die Verbindung bleibt offen, und der Server sendet neue Events, sobald sie verfugbar sind.
import express from 'express';const app = express();// Simulate a real-time data sourcefunction* generateMetrics() {let id = 0;while (true) {yield {id: ++id,cpu: Math.random() * 100,memory: Math.random() * 16384,timestamp: new Date().toISOString(),};}}app.get('/api/stream/metrics', (req, res) => {// SSE headersres.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('Access-Control-Allow-Origin', '*');res.flushHeaders();const metrics = generateMetrics();const interval = setInterval(() => {const record = metrics.next().value;// Each SSE event contains one JSONL recordres.write(`data: ${JSON.stringify(record)}\n\n`);}, 1000);req.on('close', () => {clearInterval(interval);res.end();});});app.listen(3000, () => console.log('SSE server on :3000'));
Der Browser verwendet die EventSource-API, um sich mit dem SSE-Endpunkt zu verbinden. Jedes eingehende Event enthalt einen JSONL-Datensatz, der in Echtzeit geparst und gerendert wird. EventSource behandelt die automatische Wiederverbindung, wenn die Verbindung unterbrochen wird.
// Connect to the SSE endpointconst source = new EventSource('/api/stream/metrics');// Parse each JSONL record as it arrivessource.onmessage = (event) => {const record = JSON.parse(event.data);console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);// Update your UI in real timeupdateDashboard(record);};source.onerror = (err) => {console.error('SSE connection error:', err);// EventSource automatically reconnects};// For Fetch-based SSE (more control over the stream)async function fetchSSE(url, onRecord) {const response = await fetch(url);const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();let buffer = '';while (true) {const { done, value } = await reader.read();if (done) break;buffer += value;const events = buffer.split('\n\n');buffer = events.pop();for (const event of events) {const dataLine = event.split('\n').find(l => l.startsWith('data: '));if (dataLine) {const record = JSON.parse(dataLine.slice(6));onRecord(record);}}}}await fetchSSE('/api/stream/metrics', (record) => {console.log('Metric:', record);});
WebSocket + JSONL-Streaming
Wahrend SSE Server-zu-Client-Push behandelt, ermoglichen WebSockets Vollduplex-Kommunikation, bei der beide Seiten jederzeit JSONL-Datensatze senden konnen. Dies ist ideal fur interaktive Anwendungen wie kollaboratives Editieren, Echtzeit-Dashboards mit Benutzerbefehlen und bidirektionale Datensynchronisation. Jede WebSocket-Nachricht enthalt einen einzelnen JSONL-Datensatz, was das Parsen auf beiden Seiten unkompliziert macht.
import { WebSocketServer } from 'ws';const wss = new WebSocketServer({ port: 8080 });wss.on('connection', (ws) => {console.log('Client connected');// Send JSONL records to the clientconst interval = setInterval(() => {const record = {type: 'metric',value: Math.random() * 100,timestamp: new Date().toISOString(),};ws.send(JSON.stringify(record));}, 500);// Receive JSONL records from the clientws.on('message', (data) => {try {const record = JSON.parse(data.toString());console.log(`Received: ${record.type}`, record);// Echo back an acknowledgmentws.send(JSON.stringify({type: 'ack',originalId: record.id,processedAt: new Date().toISOString(),}));} catch (err) {ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));}});ws.on('close', () => {clearInterval(interval);console.log('Client disconnected');});});// Client-side usage:// const ws = new WebSocket('ws://localhost:8080');// ws.onmessage = (e) => {// const record = JSON.parse(e.data);// console.log(record);// };// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));
Jede WebSocket-Nachricht ist ein einzelnes JSON-Objekt, das der JSONL-Konvention von einem Datensatz pro Nachricht folgt. Der Server streamt Metriken an den Client im 500-ms-Intervall und akzeptiert gleichzeitig Befehle vom Client. Beide Richtungen verwenden die gleiche JSON-Serialisierung, sodass das Protokoll symmetrisch und leicht zu debuggen ist.
Log-Streaming in der Praxis
Eine der haufigsten realen Anwendungen von JSONL-Streaming ist die zentralisierte Log-Erfassung. Anwendungen schreiben strukturierte Log-Eintrage als JSONL-Zeilen auf stdout oder in eine Datei, und ein Collector-Agent verfolgt die Ausgabe in Echtzeit und leitet Datensatze an ein zentrales System weiter. Dieses Muster wird von Docker, Kubernetes und den meisten Cloud-Logging-Diensten verwendet.
import { createReadStream, watchFile } from 'node:fs';import { createInterface } from 'node:readline';import { stat } from 'node:fs/promises';class JsonlLogCollector {constructor(filePath, handlers = {}) {this.filePath = filePath;this.handlers = handlers;this.position = 0;this.running = false;}async start() {this.running = true;// Read existing content firstawait this.readFrom(0);// Watch for new appendsthis.watch();}async readFrom(startPos) {const rl = createInterface({input: createReadStream(this.filePath, {encoding: 'utf-8',start: startPos,}),crlfDelay: Infinity,});for await (const line of rl) {if (!line.trim()) continue;try {const record = JSON.parse(line);await this.dispatch(record);} catch (err) {this.handlers.onError?.(err, line);}}const info = await stat(this.filePath);this.position = info.size;}watch() {watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {if (!this.running) return;if (curr.size > this.position) {await this.readFrom(this.position);}});}async dispatch(record) {const level = record.level || 'info';this.handlers[level]?.(record);this.handlers.onRecord?.(record);}stop() {this.running = false;}}// Usageconst collector = new JsonlLogCollector('app.log.jsonl', {error: (r) => console.error(`[ERROR] ${r.message}`),warn: (r) => console.warn(`[WARN] ${r.message}`),onRecord: (r) => forwardToElasticsearch(r),onError: (err, line) => console.error('Bad line:', line),});await collector.start();
Dieser Collector verfolgt eine JSONL-Log-Datei in Echtzeit und leitet Datensatze basierend auf dem Log-Level an Handler weiter. Er verfolgt die Dateiposition, sodass er nach dem ersten Durchlauf nur neue Inhalte liest. In der Produktion wurden Sie die Erkennung von Dateirotation, ein ordnungsgemasses Herunterfahren und Batch-Weiterleitung hinzufugen, um Netzwerkaufrufe an Ihr zentralisiertes Logging-System zu reduzieren.
Probieren Sie unsere kostenlosen JSONL-Tools
Mochten Sie JSONL-Daten inspizieren oder konvertieren, bevor Sie Ihre Streaming-Pipeline aufbauen? Verwenden Sie unsere kostenlosen Online-Tools, um JSONL-Dateien direkt in Ihrem Browser anzuzeigen, zu konvertieren und zu formatieren. Die gesamte Verarbeitung erfolgt lokal, sodass Ihre Daten privat bleiben.