JSONL-Streaming: Daten in Echtzeit verarbeiten

Eine umfassende Anleitung zum Streaming von JSONL-Daten in Echtzeit. Erfahren Sie, wie Sie Streaming-Pipelines mit Node.js readline, Python-Generatoren, Server-Sent Events, WebSockets und produktionsreifen Log-Streaming-Mustern erstellen.

Zuletzt aktualisiert: Februar 2026

Warum JSONL ideal fur Streaming ist

JSONL (JSON Lines) speichert ein vollstandiges JSON-Objekt pro Zeile, getrennt durch Zeilenumbruchzeichen. Diese zeilengetrennte Struktur macht es zum perfekten Format fur Streaming, da jede Zeile eine eigenstandige, parsbare Einheit ist. Im Gegensatz zu einem einzelnen grossen JSON-Array mussen Sie niemals den gesamten Datensatz in den Speicher laden, bevor Sie mit der Verarbeitung von Datensatzen beginnen konnen. Ein Consumer kann eine Zeile lesen, sie parsen, darauf reagieren und zur nachsten ubergehen, wobei der Speicherverbrauch konstant bleibt, unabhangig davon, wie gross die Datenquelle wird.

Diese Eigenschaft ist der Grund, warum JSONL in Log-Aggregationssystemen, Echtzeit-Analyse-Pipelines, Machine-Learning-Datenfeeds und API-Streaming-Antworten dominiert. Tools wie die OpenAI-API, AWS CloudWatch und Apache Kafka verwenden alle zeilengetrenntes JSON, weil es die universelle Lesbarkeit von JSON mit der Streaming-Effizienz zeilenorientierter Protokolle kombiniert. In dieser Anleitung lernen Sie, wie Sie JSONL-Streaming-Losungen in Node.js und Python erstellen, Echtzeitdaten mit Server-Sent Events an Browser ubertragen, JSONL uber WebSockets austauschen und ein produktionsreifes Log-Streaming-System implementieren.

JSONL-Streaming in Node.js

Node.js basiert auf einem ereignisgesteuerten, nicht-blockierenden I/O-Modell, das es aussergewohnlich gut fur Streaming-Workloads geeignet macht. Die integrierten Module readline und stream bieten alles, was Sie benotigen, um JSONL-Dateien und Netzwerkstreams Zeile fur Zeile zu verarbeiten, ohne die gesamte Nutzlast in den Speicher zu laden.

Das readline-Modul in Kombination mit fs.createReadStream ist die Standardmethode, um eine JSONL-Datei in Node.js zu verarbeiten. Es liest die Datei in Blocken und gibt vollstandige Zeilen aus, sodass der Speicher auch bei Multi-Gigabyte-Dateien konstant bleibt.

Stream-Lesen mit readline
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function streamJsonl(filePath, onRecord) {
const rl = createInterface({
input: createReadStream(filePath, 'utf-8'),
crlfDelay: Infinity,
});
let count = 0;
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const record = JSON.parse(trimmed);
await onRecord(record);
count++;
} catch (err) {
console.error(`Skipping invalid line: ${err.message}`);
}
}
return count;
}
// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);

Node.js Transform Streams ermoglichen es Ihnen, zusammensetzbare Datenpipelines zu erstellen, die JSONL lesen, jeden Datensatz filtern oder anreichern und die Ergebnisse zuruckschreiben. Dieses Muster behandelt Backpressure automatisch und pausiert die Quelle, wenn das Ziel nicht mithalten kann.

Transform Streams fur Pipelines
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
// Split incoming chunks into individual lines
class LineSplitter extends Transform {
constructor() {
super({ readableObjectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (line.trim()) this.push(line.trim());
}
cb();
}
_flush(cb) {
if (this.buffer.trim()) this.push(this.buffer.trim());
cb();
}
}
// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(line, encoding, cb) {
try {
const record = JSON.parse(line);
const result = this.transformFn(record);
if (result) cb(null, JSON.stringify(result) + '\n');
else cb(); // Filter out null results
} catch (err) {
cb(); // Skip invalid lines
}
}
}
// Build the pipeline: read -> split -> transform -> write
await pipeline(
createReadStream('input.jsonl', 'utf-8'),
new LineSplitter(),
new JsonlTransform((record) => {
if (record.level === 'error') {
return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
}
return null; // Filter out non-error records
}),
createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');

JSONL-Streaming in Python

Python-Generatoren bieten eine elegante Moglichkeit, JSONL-Daten zu streamen. Da ein Generator jeweils einen Datensatz liefert und nur voranschreitet, wenn der Consumer den nachsten Wert anfordert, bleibt der Speicherverbrauch minimal. In Kombination mit dem integrierten json-Modul konnen Sie leistungsstarke Streaming-Pipelines mit nur wenigen Codezeilen erstellen.

Eine Python-Generatorfunktion liest die Datei Zeile fur Zeile mit einer Standard-for-Schleife. Jeder Aufruf von next() liest genau eine Zeile, parst sie und gibt das Ergebnis zuruck. Das Dateihandle bleibt offen und die Position ruckt nur vor, wenn der Consumer Datensatze abruft.

Generatorbasiertes Streaming
import json
from typing import Generator, Any
def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
"""Stream JSONL records one at a time using a generator."""
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"Skipping line {line_num}: {e}")
# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
if record.get('level') == 'error':
print(f"ERROR: {record['message']}")
# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")

Python itertools ermoglicht es Ihnen, Streaming-Operationen wie Filtern, Batching und Slicing zu komponieren, ohne Zwischenergebnisse zu materialisieren. Jeder Schritt in der Kette verarbeitet jeweils einen Datensatz, sodass Sie Dateien verarbeiten konnen, die grosser als der verfugbare RAM sind.

Verkettete Verarbeitung mit itertools
import json
import itertools
from typing import Generator, Iterable
def stream_jsonl(path: str) -> Generator[dict, None, None]:
with open(path, 'r') as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
"""Filter records where key equals value."""
for record in records:
if record.get(key) == value:
yield record
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""Yield successive batches of the given size."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
print(f"Batch {i + 1}: {len(error_batch)} errors")
# Send batch to alerting system, database, etc.
for record in error_batch:
print(f" {record['timestamp']} - {record['path']}")

Server-Sent Events mit JSONL

Server-Sent Events (SSE) bieten ein einfaches, HTTP-basiertes Protokoll zum Ubertragen von Echtzeit-Updates vom Server zum Client. Jedes SSE-Event tragt ein data-Feld, und JSONL ist das naturliche Nutzlastformat, da jede Zeile ein vollstandiges JSON-Objekt ist, das der Client sofort parsen kann. Im Gegensatz zu WebSockets funktioniert SSE uber Standard-HTTP, erfordert keine spezielle Proxy-Konfiguration und verbindet sich bei Ausfallen automatisch wieder.

Der Server setzt den Content-Type auf text/event-stream und schreibt jeden JSONL-Datensatz als SSE-data-Feld. Die Verbindung bleibt offen, und der Server sendet neue Events, sobald sie verfugbar sind.

SSE-Server (Node.js / Express)
import express from 'express';
const app = express();
// Simulate a real-time data source
function* generateMetrics() {
let id = 0;
while (true) {
yield {
id: ++id,
cpu: Math.random() * 100,
memory: Math.random() * 16384,
timestamp: new Date().toISOString(),
};
}
}
app.get('/api/stream/metrics', (req, res) => {
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();
const metrics = generateMetrics();
const interval = setInterval(() => {
const record = metrics.next().value;
// Each SSE event contains one JSONL record
res.write(`data: ${JSON.stringify(record)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => console.log('SSE server on :3000'));

Der Browser verwendet die EventSource-API, um sich mit dem SSE-Endpunkt zu verbinden. Jedes eingehende Event enthalt einen JSONL-Datensatz, der in Echtzeit geparst und gerendert wird. EventSource behandelt die automatische Wiederverbindung, wenn die Verbindung unterbrochen wird.

SSE-Client (Browser)
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');
// Parse each JSONL record as it arrives
source.onmessage = (event) => {
const record = JSON.parse(event.data);
console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
// Update your UI in real time
updateDashboard(record);
};
source.onerror = (err) => {
console.error('SSE connection error:', err);
// EventSource automatically reconnects
};
// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
const response = await fetch(url);
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const dataLine = event.split('\n').find(l => l.startsWith('data: '));
if (dataLine) {
const record = JSON.parse(dataLine.slice(6));
onRecord(record);
}
}
}
}
await fetchSSE('/api/stream/metrics', (record) => {
console.log('Metric:', record);
});

WebSocket + JSONL-Streaming

Wahrend SSE Server-zu-Client-Push behandelt, ermoglichen WebSockets Vollduplex-Kommunikation, bei der beide Seiten jederzeit JSONL-Datensatze senden konnen. Dies ist ideal fur interaktive Anwendungen wie kollaboratives Editieren, Echtzeit-Dashboards mit Benutzerbefehlen und bidirektionale Datensynchronisation. Jede WebSocket-Nachricht enthalt einen einzelnen JSONL-Datensatz, was das Parsen auf beiden Seiten unkompliziert macht.

WebSocket-Server mit JSONL-Nachrichten
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Send JSONL records to the client
const interval = setInterval(() => {
const record = {
type: 'metric',
value: Math.random() * 100,
timestamp: new Date().toISOString(),
};
ws.send(JSON.stringify(record));
}, 500);
// Receive JSONL records from the client
ws.on('message', (data) => {
try {
const record = JSON.parse(data.toString());
console.log(`Received: ${record.type}`, record);
// Echo back an acknowledgment
ws.send(JSON.stringify({
type: 'ack',
originalId: record.id,
processedAt: new Date().toISOString(),
}));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
// const record = JSON.parse(e.data);
// console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));

Jede WebSocket-Nachricht ist ein einzelnes JSON-Objekt, das der JSONL-Konvention von einem Datensatz pro Nachricht folgt. Der Server streamt Metriken an den Client im 500-ms-Intervall und akzeptiert gleichzeitig Befehle vom Client. Beide Richtungen verwenden die gleiche JSON-Serialisierung, sodass das Protokoll symmetrisch und leicht zu debuggen ist.

Log-Streaming in der Praxis

Eine der haufigsten realen Anwendungen von JSONL-Streaming ist die zentralisierte Log-Erfassung. Anwendungen schreiben strukturierte Log-Eintrage als JSONL-Zeilen auf stdout oder in eine Datei, und ein Collector-Agent verfolgt die Ausgabe in Echtzeit und leitet Datensatze an ein zentrales System weiter. Dieses Muster wird von Docker, Kubernetes und den meisten Cloud-Logging-Diensten verwendet.

Echtzeit-JSONL-Log-Collector
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';
class JsonlLogCollector {
constructor(filePath, handlers = {}) {
this.filePath = filePath;
this.handlers = handlers;
this.position = 0;
this.running = false;
}
async start() {
this.running = true;
// Read existing content first
await this.readFrom(0);
// Watch for new appends
this.watch();
}
async readFrom(startPos) {
const rl = createInterface({
input: createReadStream(this.filePath, {
encoding: 'utf-8',
start: startPos,
}),
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) continue;
try {
const record = JSON.parse(line);
await this.dispatch(record);
} catch (err) {
this.handlers.onError?.(err, line);
}
}
const info = await stat(this.filePath);
this.position = info.size;
}
watch() {
watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
if (!this.running) return;
if (curr.size > this.position) {
await this.readFrom(this.position);
}
});
}
async dispatch(record) {
const level = record.level || 'info';
this.handlers[level]?.(record);
this.handlers.onRecord?.(record);
}
stop() {
this.running = false;
}
}
// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
error: (r) => console.error(`[ERROR] ${r.message}`),
warn: (r) => console.warn(`[WARN] ${r.message}`),
onRecord: (r) => forwardToElasticsearch(r),
onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();

Dieser Collector verfolgt eine JSONL-Log-Datei in Echtzeit und leitet Datensatze basierend auf dem Log-Level an Handler weiter. Er verfolgt die Dateiposition, sodass er nach dem ersten Durchlauf nur neue Inhalte liest. In der Produktion wurden Sie die Erkennung von Dateirotation, ein ordnungsgemasses Herunterfahren und Batch-Weiterleitung hinzufugen, um Netzwerkaufrufe an Ihr zentralisiertes Logging-System zu reduzieren.

Probieren Sie unsere kostenlosen JSONL-Tools

Mochten Sie JSONL-Daten inspizieren oder konvertieren, bevor Sie Ihre Streaming-Pipeline aufbauen? Verwenden Sie unsere kostenlosen Online-Tools, um JSONL-Dateien direkt in Ihrem Browser anzuzeigen, zu konvertieren und zu formatieren. Die gesamte Verarbeitung erfolgt lokal, sodass Ihre Daten privat bleiben.

JSONL-Dateien online bearbeiten

JSONL-Dateien bis zu 1 GB direkt in Ihrem Browser anzeigen, validieren und konvertieren. Kein Upload erforderlich, 100 % privat.

Haufig gestellte Fragen

JSONL Streaming — JSON Lines in Echtzeit verarbeiten (Nod...