Strumieniowanie JSONL: Przetwarzanie danych w czasie rzeczywistym

Kompleksowy przewodnik po strumieniowaniu danych JSONL w czasie rzeczywistym. Dowiedz się, jak budować potoki strumieniowania z Node.js readline, generatorami Python, Server-Sent Events, WebSockets i wzorcami strumieniowania logów produkcyjnych.

Ostatnia aktualizacja: luty 2026

Dlaczego JSONL jest idealny do strumieniowania

JSONL (JSON Lines) przechowuje jeden kompletny obiekt JSON na linię, oddzielony znakami nowej linii. Ta struktura rozdzielana liniami sprawia, że jest idealnym formatem do strumieniowania, ponieważ każda linia jest samodzielną, parsowalną jednostką. W przeciwieństwie do pojedynczej dużej tablicy JSON, nigdy nie musisz ładować całego zbioru danych do pamięci, zanim zaczniesz przetwarzać rekordy. Konsument może odczytać jedną linię, sparsować ją, wykonać na niej działanie i przejść do następnej, utrzymując stałe zużycie pamięci niezależnie od wielkości źródła danych.

Ta właściwość sprawia, że JSONL dominuje w systemach agregacji logów, potokach analityki czasu rzeczywistego, strumieniach danych uczenia maszynowego i odpowiedziach strumieniowych API. Narzędzia takie jak API OpenAI, AWS CloudWatch i Apache Kafka używają JSON-a rozdzielanego znakami nowej linii, ponieważ łączy on uniwersalną czytelność JSON z wydajnością strumieniowania protokołów liniowych. W tym przewodniku dowiesz się, jak budować rozwiązania strumieniowania JSONL w Node.js i Python, przesyłać dane w czasie rzeczywistym do przeglądarek za pomocą Server-Sent Events, wymieniać JSONL przez WebSockets i implementować produkcyjny system strumieniowania logów.

Strumieniowanie JSONL w Node.js

Node.js jest zbudowany na zdarzeniowym, nieblokującym modelu I/O, który czyni go wyjątkowo dobrze przystosowanym do obciążeń strumieniowych. Wbudowane moduły readline i stream dają Ci wszystko, czego potrzebujesz do przetwarzania plików JSONL i strumieni sieciowych linia po linii bez ładowania pełnego ładunku do pamięci.

Moduł readline w połączeniu z fs.createReadStream to standardowy sposób konsumowania pliku JSONL w Node.js. Odczytuje plik w porcjach i emituje kompletne linie, więc pamięć pozostaje stała nawet dla plików wielogigabajtowych.

Odczyt strumieniowy z readline
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function streamJsonl(filePath, onRecord) {
const rl = createInterface({
input: createReadStream(filePath, 'utf-8'),
crlfDelay: Infinity,
});
let count = 0;
for await (const line of rl) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const record = JSON.parse(trimmed);
await onRecord(record);
count++;
} catch (err) {
console.error(`Skipping invalid line: ${err.message}`);
}
}
return count;
}
// Usage: process each record as it arrives
const total = await streamJsonl('events.jsonl', async (record) => {
console.log(`Event: ${record.type} at ${record.timestamp}`);
});
console.log(`Processed ${total} records`);

Transform Streams w Node.js pozwalają budować komponowalne potoki danych, które odczytują JSONL, filtrują lub wzbogacają każdy rekord i zapisują wyniki z powrotem. Ten wzorzec automatycznie obsługuje backpressure, wstrzymując źródło, gdy miejsce docelowe nie nadąża.

Transform Streams dla potoków
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
// Split incoming chunks into individual lines
class LineSplitter extends Transform {
constructor() {
super({ readableObjectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, cb) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (line.trim()) this.push(line.trim());
}
cb();
}
_flush(cb) {
if (this.buffer.trim()) this.push(this.buffer.trim());
cb();
}
}
// Parse, transform, and re-serialize each record
class JsonlTransform extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(line, encoding, cb) {
try {
const record = JSON.parse(line);
const result = this.transformFn(record);
if (result) cb(null, JSON.stringify(result) + '\n');
else cb(); // Filter out null results
} catch (err) {
cb(); // Skip invalid lines
}
}
}
// Build the pipeline: read -> split -> transform -> write
await pipeline(
createReadStream('input.jsonl', 'utf-8'),
new LineSplitter(),
new JsonlTransform((record) => {
if (record.level === 'error') {
return { ...record, flagged: true, reviewedAt: new Date().toISOString() };
}
return null; // Filter out non-error records
}),
createWriteStream('errors.jsonl', 'utf-8')
);
console.log('Pipeline complete');

Strumieniowanie JSONL w Python

Generatory Python zapewniają elegancki sposób strumieniowania danych JSONL. Ponieważ generator zwraca jeden rekord na raz i przechodzi dalej dopiero, gdy konsument zażąda następnej wartości, zużycie pamięci pozostaje minimalne. W połączeniu z wbudowanym modułem json możesz budować potężne potoki strumieniowania za pomocą zaledwie kilku linii kodu.

Funkcja generatora Python odczytuje plik linia po linii za pomocą standardowej pętli for. Każde wywołanie next() odczytuje dokładnie jedną linię, parsuje ją i zwraca wynik. Uchwyt pliku pozostaje otwarty, a pozycja przesuwa się tylko w miarę pobierania rekordów przez konsumenta.

Strumieniowanie oparte na generatorach
import json
from typing import Generator, Any
def stream_jsonl(file_path: str) -> Generator[dict[str, Any], None, None]:
"""Stream JSONL records one at a time using a generator."""
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"Skipping line {line_num}: {e}")
# Usage: process records lazily
for record in stream_jsonl('events.jsonl'):
if record.get('level') == 'error':
print(f"ERROR: {record['message']}")
# Or collect a subset with itertools
import itertools
first_100 = list(itertools.islice(stream_jsonl('large.jsonl'), 100))
print(f"Loaded first {len(first_100)} records")

Python itertools pozwala komponować operacje strumieniowe, takie jak filtrowanie, grupowanie i wycinanie, bez materializowania wyników pośrednich. Każdy krok w łańcuchu przetwarza jeden rekord na raz, więc możesz obsługiwać pliki większe niż dostępna pamięć RAM.

Łańcuchowe przetwarzanie z itertools
import json
import itertools
from typing import Generator, Iterable
def stream_jsonl(path: str) -> Generator[dict, None, None]:
with open(path, 'r') as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_records(records: Iterable[dict], key: str, value) -> Generator[dict, None, None]:
"""Filter records where key equals value."""
for record in records:
if record.get(key) == value:
yield record
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""Yield successive batches of the given size."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Compose a streaming pipeline
records = stream_jsonl('access_logs.jsonl')
errors = filter_records(records, 'status', 500)
for i, error_batch in enumerate(batch(errors, 50)):
print(f"Batch {i + 1}: {len(error_batch)} errors")
# Send batch to alerting system, database, etc.
for record in error_batch:
print(f" {record['timestamp']} - {record['path']}")

Server-Sent Events z JSONL

Server-Sent Events (SSE) zapewniają prosty, oparty na HTTP protokół do przesyłania aktualizacji w czasie rzeczywistym z serwera do klienta. Każde zdarzenie SSE niesie pole data, a JSONL jest naturalnym formatem ładunku, ponieważ każda linia jest kompletnym obiektem JSON, który klient może natychmiast sparsować. W przeciwieństwie do WebSockets, SSE działa przez standardowy HTTP, nie wymaga specjalnej konfiguracji proxy i automatycznie łączy się ponownie po awarii.

Serwer ustawia Content-Type na text/event-stream i zapisuje każdy rekord JSONL jako pole data SSE. Połączenie pozostaje otwarte, a serwer przesyła nowe zdarzenia, gdy stają się dostępne.

Serwer SSE (Node.js / Express)
import express from 'express';
const app = express();
// Simulate a real-time data source
function* generateMetrics() {
let id = 0;
while (true) {
yield {
id: ++id,
cpu: Math.random() * 100,
memory: Math.random() * 16384,
timestamp: new Date().toISOString(),
};
}
}
app.get('/api/stream/metrics', (req, res) => {
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();
const metrics = generateMetrics();
const interval = setInterval(() => {
const record = metrics.next().value;
// Each SSE event contains one JSONL record
res.write(`data: ${JSON.stringify(record)}\n\n`);
}, 1000);
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000, () => console.log('SSE server on :3000'));

Przeglądarka używa API EventSource do łączenia się z punktem końcowym SSE. Każde przychodzące zdarzenie zawiera rekord JSONL, który jest parsowany i renderowany w czasie rzeczywistym. EventSource obsługuje automatyczne ponowne połączenie w przypadku utraty połączenia.

Klient SSE (przeglądarka)
// Connect to the SSE endpoint
const source = new EventSource('/api/stream/metrics');
// Parse each JSONL record as it arrives
source.onmessage = (event) => {
const record = JSON.parse(event.data);
console.log(`[${record.timestamp}] CPU: ${record.cpu.toFixed(1)}%`);
// Update your UI in real time
updateDashboard(record);
};
source.onerror = (err) => {
console.error('SSE connection error:', err);
// EventSource automatically reconnects
};
// For Fetch-based SSE (more control over the stream)
async function fetchSSE(url, onRecord) {
const response = await fetch(url);
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const events = buffer.split('\n\n');
buffer = events.pop();
for (const event of events) {
const dataLine = event.split('\n').find(l => l.startsWith('data: '));
if (dataLine) {
const record = JSON.parse(dataLine.slice(6));
onRecord(record);
}
}
}
}
await fetchSSE('/api/stream/metrics', (record) => {
console.log('Metric:', record);
});

WebSocket + strumieniowanie JSONL

Podczas gdy SSE obsługuje przesyłanie z serwera do klienta, WebSockets umożliwiają pełnodupleksową komunikację, gdzie obie strony mogą wysyłać rekordy JSONL w dowolnym momencie. Jest to idealne rozwiązanie dla interaktywnych aplikacji, takich jak współedytowanie, pulpity nawigacyjne w czasie rzeczywistym z poleceniami użytkownika i dwukierunkowa synchronizacja danych. Każda wiadomość WebSocket zawiera pojedynczy rekord JSONL, co sprawia, że parsowanie jest proste po obu stronach.

Serwer WebSocket z wiadomościami JSONL
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
console.log('Client connected');
// Send JSONL records to the client
const interval = setInterval(() => {
const record = {
type: 'metric',
value: Math.random() * 100,
timestamp: new Date().toISOString(),
};
ws.send(JSON.stringify(record));
}, 500);
// Receive JSONL records from the client
ws.on('message', (data) => {
try {
const record = JSON.parse(data.toString());
console.log(`Received: ${record.type}`, record);
// Echo back an acknowledgment
ws.send(JSON.stringify({
type: 'ack',
originalId: record.id,
processedAt: new Date().toISOString(),
}));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('close', () => {
clearInterval(interval);
console.log('Client disconnected');
});
});
// Client-side usage:
// const ws = new WebSocket('ws://localhost:8080');
// ws.onmessage = (e) => {
// const record = JSON.parse(e.data);
// console.log(record);
// };
// ws.send(JSON.stringify({ type: 'subscribe', channel: 'metrics' }));

Każda wiadomość WebSocket to pojedynczy obiekt JSON, zgodny z konwencją JSONL jednego rekordu na wiadomość. Serwer strumieniuje metryki do klienta w odstępach 500ms, jednocześnie przyjmując polecenia od klienta. Oba kierunki używają tej samej serializacji JSON, więc protokół jest symetryczny i łatwy do debugowania.

Strumieniowanie logów w praktyce

Jednym z najczęstszych zastosowań strumieniowania JSONL w świecie rzeczywistym jest scentralizowane zbieranie logów. Aplikacje zapisują ustrukturyzowane wpisy logów jako linie JSONL do stdout lub pliku, a agent kolektora śledzi wyjście w czasie rzeczywistym, przekazując rekordy do systemu centralnego. Ten wzorzec jest używany przez Docker, Kubernetes i większość usług logowania w chmurze.

Kolektor logów JSONL w czasie rzeczywistym
import { createReadStream, watchFile } from 'node:fs';
import { createInterface } from 'node:readline';
import { stat } from 'node:fs/promises';
class JsonlLogCollector {
constructor(filePath, handlers = {}) {
this.filePath = filePath;
this.handlers = handlers;
this.position = 0;
this.running = false;
}
async start() {
this.running = true;
// Read existing content first
await this.readFrom(0);
// Watch for new appends
this.watch();
}
async readFrom(startPos) {
const rl = createInterface({
input: createReadStream(this.filePath, {
encoding: 'utf-8',
start: startPos,
}),
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) continue;
try {
const record = JSON.parse(line);
await this.dispatch(record);
} catch (err) {
this.handlers.onError?.(err, line);
}
}
const info = await stat(this.filePath);
this.position = info.size;
}
watch() {
watchFile(this.filePath, { interval: 500 }, async (curr, prev) => {
if (!this.running) return;
if (curr.size > this.position) {
await this.readFrom(this.position);
}
});
}
async dispatch(record) {
const level = record.level || 'info';
this.handlers[level]?.(record);
this.handlers.onRecord?.(record);
}
stop() {
this.running = false;
}
}
// Usage
const collector = new JsonlLogCollector('app.log.jsonl', {
error: (r) => console.error(`[ERROR] ${r.message}`),
warn: (r) => console.warn(`[WARN] ${r.message}`),
onRecord: (r) => forwardToElasticsearch(r),
onError: (err, line) => console.error('Bad line:', line),
});
await collector.start();

Ten kolektor śledzi plik logów JSONL w czasie rzeczywistym, wysyłając rekordy do handlerów na podstawie poziomu logowania. Śledzi pozycję w pliku, aby odczytywać tylko nową zawartość po początkowym przebiegu. W produkcji dodałbyś wykrywanie rotacji plików, płynne zamykanie i wsadowe przekazywanie, aby zmniejszyć liczbę wywołań sieciowych do scentralizowanego systemu logowania.

Wypróbuj nasze bezpłatne narzędzia JSONL

Chcesz sprawdzić lub przekonwertować dane JSONL przed zbudowaniem potoku strumieniowania? Użyj naszych bezpłatnych narzędzi online do przeglądania, konwertowania i formatowania plików JSONL bezpośrednio w przeglądarce. Całe przetwarzanie odbywa się lokalnie, więc Twoje dane pozostają prywatne.

Pracuj z plikami JSONL online

Przeglądaj, waliduj i konwertuj pliki JSONL do 1GB bezpośrednio w przeglądarce. Bez przesyłania, 100% prywatności.

Najczęściej zadawane pytania

Strumieniowanie JSONL — przetwarzanie JSON Lines w czasie...